Java SpringCloud

注册中心:Nacos
配置中心:Nacos
网关:Gateway
流量控制组件:Sentinel(网关统一限流,控制台sentinel-dashboard)、Hystrix(结合FeignClient方便使用)
HTTP客户端:OpenFeign

Nacos注册中心安装

单机模式、集群模式、多集群模式

其他注册中心框架:Eureka、Zookeeper等

安装JDK8配置JAVA_HOME

下载Nacos

https://github.com/alibaba/nacos/releases

启动nacos

windows cmd执行:startup.cmd -m standalone 或者 .\startup.cmd -m standalone
linux 执行:startup.sh -m standalone

windows下删除startup.cmd下面语句不然会报错

1
-XX:HeapDumpPath=%BASE_DIR%\logs\java_heapdump.hprof -XX:-UseLargePages

访问

输入 localhost:8848/nacos 用户名和密码 都是 nacos

nacos配置存储在MYSQL数据库中

数据库执行conf/mysql.sql,建立配置表
修改conf/application.properties,找到位置,放开相关注释,修改数据库连接信息

1
2
3
4
5
6
7
8
9
10
11
#*************** Config Module Related Configurations ***************#
### If use MySQL as datasource:
spring.datasource.platform=mysql

### Count of DB:
db.num=1

### Connect URL of DB:
db.url.0=jdbc:mysql://127.0.0.1:3306/it-config?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
db.user.0=abc
db.password.0=Abc@defg123

可能由于网络等原因出现数据库连接不上要把超时时间改大connectTimeout=10000&socketTimeout=30000 单位ms。connectTimeout 建立连接的时候参数在建立连接后不会再生效,socketTimeout 参数有设置的意义,在查询时间超过一定的阈值后,断开连接可以防止客户端的连接被一直占用。
当SocketServer返回数据的时候(类似于SQL结果集的返回)其流程是:服务端程序数据(数据库) -> 内核Socket Buffer -> 网络 -> 客户端Socket Buffer -> 客户端程序JDBC所在的JVM内存; 在数据库被突然停掉或是发生网络错误时由于TCP/IP的结构原因,socket没有办法探测到网络错误,因此应用也无法主动发现数据库连接断开。如果没有设置socket timeout的话,应用在数据库返回结果前会无期限地等下去,这种连接被称为dead connection。

可能对于带有排序、聚集函数的大查询,socketTimeout 应该设置很大的值
socket timeout必须高于statement timeout

从nacos中获取配置

pom增加依赖

1
2
3
4
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

项目中增加bootstrap.yml配置 ,name与active组合成dataid

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# Tomcat
server:
port: 9000

# Spring
spring:
application:
# 应用名称
name: auth
profiles:
# 环境配置
active: dev
main:
allow-bean-definition-overriding: true

cloud:
nacos:
discovery:
# 服务注册地址
server-addr: 127.0.0.1:8848
config:
# 配置中心地址
server-addr: 127.0.0.1:8848
# 配置文件格式
file-extension: yml
# 共享配置
shared-dataids: application-${spring.profiles.active}.${spring.cloud.nacos.config.file-extension}

nacos控制台中添加auth-dev.yml配置信息

Gateway

配置

system/**开头的路径会转发给provider服务。

1
2
3
4
5
6
7
8
server.port=8888
spring.application.name=service-gateway
spring.cloud.nacos.discovery.server-addr=http://127.0.0.1:8848

spring.cloud.gateway.routes[0].id=route0
spring.cloud.gateway.routes[0].uri=lb://service-provider
spring.cloud.gateway.routes[0].predicates[0]=Path=/system/**
spring.cloud.gateway.routes[0].filters[0]=StripPrefix=1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
cloud:
gateway:
discovery:
locator:
# 是否和服务注册与发现组件结合,设置为 true 后可以直接使用应用名称调用服务
enabled: true
# 路由(routes:路由,它由唯一标识(ID)、目标服务地址(uri)、一组断言(predicates)和一组过滤器组成(filters)。filters 不是必需参数。)
routes:
# 路由标识(id:标识,具有唯一性) 截取请求
- id: route0
# 目标服务地址(uri:地址,请求转发后的地址) 或者lb://route0
uri: https://www.infotech.vip
# 路由条件(predicates:断言,匹配 HTTP 请求内容)
predicates:
## 转发地址格式为 uri/archive,/system 部分会被下面的过滤器给截取掉,相当于匹配到system前缀的请求就进行转发
- Path=/system/**
filters:
## 截取路径位数
- StripPrefix=1

代码主要内容

  • 权限过滤:定义filter校验token与通过策略放行部分路径进行权限过滤控制
  • 服务降级:Sentinel返回错误信息
  • 跨域控制

Ribbon配置

增加Ribbon依赖

1
2
3
4
5
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>

可以与Ribbon组合使用,实现负载均衡策略的配置。yml中通过NFLoadBalancerRuleClassName:com.netflix.loadbalancer.RoundRobinRule配置项目指定。
还可以自定义负载均衡策略。

Open Feign Client

Spring Cloud框架提供了RestTemplate和FeignClient两个方式完成服务间调用

作为Spring Cloud的子项目之一,Spring Cloud OpenFeign以将OpenFeign集成到Spring Boot应用中的方式,为微服务架构下服务之间的调用提供了解决方案。首先,利用了OpenFeign的声明式方式定义Web服务客户端;其次还更进一步,通过集成Ribbon或Eureka实现负载均衡的HTTP客户端。

Feign 是一个 Java 到 HTTP 的客户端绑定器,灵感来自于 Retrofit 和 JAXRS-2.0 以及 WebSocket。Feign 的第一个目标是降低将 Denominator 无变化的绑定到 HTTP APIs 的复杂性,而不考虑 ReSTfulness。

Feign 使用 Jersey 和 CXF 等工具为 ReST 或 SOAP 服务编写 java 客户端。此外,Feign 允许您在 Apache HC 等http 库之上编写自己的代码。Feign 以最小的开销将代码连接到 http APIs,并通过可定制的解码器和错误处理(可以写入任何基于文本的 http APIs)将代码连接到 http APIs。

Spring Cloud Ribbon是一个基于HTTP和TCP的客户端负载均衡工具,它基于Netflix Ribbon实现。
因为微服务间的调用,API网关的请求转发等内容,实际上都是通过Ribbon来实现的,Feign,它也是基于Ribbon实现的工具。

简单测试定义springclouddemo-provider springclouddemo-consumer springclouddemo-gateway三个微服务根据需要增加项目前缀容易识别,一般是项目名称,比如springclouddemo-,正常业务是一个服务可以同时是服务消费者与服务提供者,服务命名需要根据具体需求定义。

maven依赖

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

demo

消费者微服务定义调用其他微服务服务的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package vip.infotech.springclouddemoconsumer.feignclient;

/**
* 调用service-provider中的test接口
*/
@FeignClient(name="service-provider")
public interface TestProviderFeign {

@RequestMapping("test")
public String test();

@RequestMapping("update")
public String updateUserName(Integer id, Stirng userName);

}

消费者微服务业务实现调用TestService的test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package vip.infotech.springclouddemoconsumer.service.impl;

@Service
public class UserServiceImpl implements UserService {

@Autowired
private UserMapper userMapper;

@Autowired
private TestProviderFeign testProviderFeign;

@Override
@GlobalTransactional(rollbackFor = Exception.class)
// @Transactional(rollbackFor = Exception.class)
public boolean updateUserAge(Integer id, Integer age) {

// 通过当前服务修改数据
testUserMapper.updateUserAge(id, age);

// 通过 fegin 调用其他服务修改数据
testProviderFeign.updateUserName(id, "abc");

// 两个都执行成功时,这里发生异常,测试是否数据都会回滚
System.out.println(1/0);

return true;
}
}

服务提供者微服务Controler层定义updateUserAgetest提供其他服务调用。

feign配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 设置 feign 客户端超时时间, 默认 1s
feign:
client:
config:
default:
connectTimeout: 5000
readTimeout: 5000
compression:
request:
# 开启数据压缩请求
enabled: true
# 压缩数据类型
mime-types: text/xml, application/xml, application/json
# 数据压缩下限 2048表示传输数据大于2048 才会进行数据压缩(最小压缩值标准)
min-request-size: 2048
# 开启数据压缩响应
response:
enabled: true

# OpenFeign 默认支持 Ribbon,也可以使用以下配置
ribbon:
# 建立连接所用时间,适用于网络正常的情况下,两端连接所用的时间
ConnectTimeout: 5000
# 建立连接后从服务器读取到可用资源所用的时间
ReadTimeout: 5000

日志

1
2
3
4
5
6
7
@Configuration
public class LogConfig {
@Bean
Logger.Level loggerLevel(){
return Logger.Level.FULL;
}
}
1
2
3
4
logging:
level:
# 以什么级别监控那个接口
com.software.controller: debug

Ribbon负载均衡算法:(可以在配置文件中配置使用的算法)
com.netflix.loadbalancer.RandomRule:随机算法实现;
RoundRobinRule:轮询负载均衡策略,依次轮询所有可用服务器列表,遇到第一个可用的即返回;
RetryRule :先按照RoundRobinRule策略获取服务,如果获取服务失败会在指定时间内重试;
AvaliabilityFilteringRule: 过滤掉那些因为一直连接失败的被标记为circuit tripped的后端server,并过滤掉那些高并发的的后端server(active connections 超过配置的阈值) ;
BestAvailableRule :会先过滤掉由于多次访问故障二处于断路器跳闸状态的服务,然后选择一个并发量最小的服务;
WeightedResponseTimeRule: 根据响应时间分配一个weight,响应时间越长,weight越小,被选中的可能性越低;
ZoneAvoidanceRule: 复合判断server所在区域的性能和server的可用性选择server

问题集

gateway问题1

Parameter 0 of method modifyRequestBodyGatewayFilterFactory in org.springframework.cloud.gateway.config.GatewayAutoConfiguration required a bean of type 'org.springframework.http.codec.ServerCodecConfigurer' that could not be found.

将pom.xml中关于spring-boot-start-web模块的jar依赖去掉
出现对象无法注入成功优先检查是否出现模块POM重复引用

gateway不需要数据源

在启动类增加排除数据源自动配置注解 SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})

feignclient远程调用服务404与401

将SpringBoot配置文件里面 server.servlet.context-path 注释掉即可。
注释掉后报错401没有权限说明已经调用到服务了。oauth2.0增加放行的路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
security:
oauth2:
client:
client-id: to_server
client-secret: 123456
scope: server
resource:
loadBalanced: true
prefer-token-info: false
ignore:
urls:
- /v2/api-docs
- /actuator/**
- /user/info/*
- /operlog
- /logininfor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ResourceServerConfig extends ResourceServerConfigurerAdapter
{
@Override
public void configure(HttpSecurity http) throws Exception
{
http.csrf().disable();
ExpressionUrlAuthorizationConfigurer<HttpSecurity>.ExpressionInterceptUrlRegistry registry = http
.authorizeRequests();
// 不登录可以访问
authIgnoreConfig().getUrls().forEach(url -> registry.antMatchers(url).permitAll());
registry.anyRequest().authenticated();
}
}


@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
// 表示通过aop框架暴露该代理对象,AopContext能够访问
@EnableAspectJAutoProxy(exposeProxy = true)
// 指定要扫描的Mapper类的包的路径
@MapperScan("com.infotech.**.mapper")
// 开启线程异步执行
@EnableAsync
// 自动加载类
@Import({ SecurityImportBeanDefinitionRegistrar.class, OAuth2FeignConfig.class, ApplicationConfig.class })
public @interface EnableCustomConfig
{

}

public class SecurityImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar
{
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry)
{
Class<ResourceServerConfig> aClass = ResourceServerConfig.class; // 资源服务配置
String beanName = StringUtils.uncapitalize(aClass.getSimpleName());
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(ResourceServerConfig.class);
registry.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition());
}
}

@EnableCustomConfig //增加OAuth2.0相关配置
@SpringBootApplication
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class})
public class SystemApplication extends SpringBootServletInitializer {
}

POSTMAN测试方法

需要Authorization 选择basicAuth设置用户名密码或者其他传递参数

源码分析

gateway

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
// RoutePredicateHandlerMapping会遍历所有路由Route,并将获取到的route放入当前请求上下文的属性中
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {

}
}

public class DiscoveryClientRouteDefinitionLocator implements RouteDefinitionLocator {
// 走到DiscoveryClientRouteDefinitionLocatord的getRouteDefinitions()方法
@Override
public Flux<RouteDefinition> getRouteDefinitions() {

SpelExpressionParser parser = new SpelExpressionParser();
Expression includeExpr = parser
.parseExpression(properties.getIncludeExpression());
Expression urlExpr = parser.parseExpression(properties.getUrlExpression());

Predicate<ServiceInstance> includePredicate;
if (properties.getIncludeExpression() == null
|| "true".equalsIgnoreCase(properties.getIncludeExpression())) {
includePredicate = instance -> true;
}
else {
includePredicate = instance -> {
Boolean include = includeExpr.getValue(evalCtxt, instance, Boolean.class);
if (include == null) {
return false;
}
return include;
};
}

return serviceInstances.filter(instances -> !instances.isEmpty())
.map(instances -> instances.get(0)).filter(includePredicate)
.map(instance -> {
RouteDefinition routeDefinition = buildRouteDefinition(urlExpr,
instance); // 可以获取到lb://xxxxx

final ServiceInstance instanceForEval = new DelegatingServiceInstance(
instance, properties);

for (PredicateDefinition original : this.properties.getPredicates()) {
PredicateDefinition predicate = new PredicateDefinition();
predicate.setName(original.getName());
for (Map.Entry<String, String> entry : original.getArgs()
.entrySet()) {
String value = getValueFromExpr(evalCtxt, parser,
instanceForEval, entry);
predicate.addArg(entry.getKey(), value);
}
routeDefinition.getPredicates().add(predicate);
}

for (FilterDefinition original : this.properties.getFilters()) { // 过滤
FilterDefinition filter = new FilterDefinition();
filter.setName(original.getName());
for (Map.Entry<String, String> entry : original.getArgs()
.entrySet()) {
String value = getValueFromExpr(evalCtxt, parser,
instanceForEval, entry);
filter.addArg(entry.getKey(), value);
}
routeDefinition.getFilters().add(filter);
}

return routeDefinition;
});
}

}

public class RouteToRequestUrlFilter implements GlobalFilter, Ordered {

// 构造完整的负载均衡地址,会出现
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // 普通http请求每次都会进来,ws只建立链接进来一次
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
if (route == null) {
return chain.filter(exchange);
}
log.trace("RouteToRequestUrlFilter start");
URI uri = exchange.getRequest().getURI(); // http://localhost:9000/websocket/info?t=1618903996773
boolean encoded = containsEncodedParts(uri);
URI routeUri = route.getUri(); // lb://stomp

if (hasAnotherScheme(routeUri)) {
// this is a special url, save scheme to special attribute
// replace routeUri with schemeSpecificPart
exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR,
routeUri.getScheme());
routeUri = URI.create(routeUri.getSchemeSpecificPart());
}

if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
// Load balanced URIs should always have a host. If the host is null it is
// most
// likely because the host name was invalid (for example included an
// underscore)
throw new IllegalStateException("Invalid host: " + routeUri.toString());
}

URI mergedUrl = UriComponentsBuilder.fromUri(uri) // lb://stomp/websocket/info?t=1618903996773
// .uri(routeUri)
.scheme(routeUri.getScheme()).host(routeUri.getHost())
.port(routeUri.getPort()).build(encoded).toUri();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
return chain.filter(exchange);
}
}

// 负载均衡过滤器
public class LoadBalancerClientFilter implements GlobalFilter, Ordered {
@Override
@SuppressWarnings("Duplicates")
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
if (url == null
|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
// preserve the original url
addOriginalRequestUrl(exchange, url);

if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url before: " + url);
}

final ServiceInstance instance = choose(exchange); // Ribbon 负载均衡算法

if (instance == null) {
throw NotFoundException.create(properties.isUse404(),
"Unable to find instance for " + url.getHost());
}

URI uri = exchange.getRequest().getURI();

// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
// if the loadbalancer doesn't provide one.
String overrideScheme = instance.isSecure() ? "https" : "http";
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}

URI requestUrl = loadBalancer.reconstructURI(
new DelegatingServiceInstance(instance, overrideScheme), uri); // http://192.168.38.152:8082/websocket/info?t=1618904280775

if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
}

exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}

//负载均衡选择方法
protected ServiceInstance choose(ServerWebExchange exchange) {
return loadBalancer.choose(
((URI) exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR)).getHost());
}
}


public class RibbonLoadBalancerClient implements LoadBalancerClient {
@Override
public ServiceInstance choose(String serviceId) {
return choose(serviceId, null);
}

/**
* New: Select a server using a 'key'.
* @param serviceId of the service to choose an instance for
* @param hint to specify the service instance
* @return the selected {@link ServiceInstance}
*/
public ServiceInstance choose(String serviceId, Object hint) {
Server server = getServer(getLoadBalancer(serviceId), hint);
if (server == null) {
return null;
}
return new RibbonServer(serviceId, server, isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
}

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
// Use 'default' on a null hint, or just pass it on?
return loadBalancer.chooseServer(hint != null ? hint : "default"); // 调用默认负载均衡器中的chooseServer方法
}
}

// 默认负载均衡器
public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> {
@Override
public Server chooseServer(Object key) { // key=default
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key); // 执行该方法
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
if (triggeringLoad == null) {
triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);
}

if (triggeringBlackoutPercentage == null) {
triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
"ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);
}
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Exception e) {
logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}
}
public class BaseLoadBalancer extends AbstractLoadBalancer implements
PrimeConnections.PrimeConnectionListener, IClientConfigAware {
/*
* Get the alive server dedicated to key
*
* @return the dedicated server
*/
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key); // rule.roundRobinRule rule.lb=ZoneAwareLoadBalancer
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}
}

public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
/**
* Get a server by calling {@link AbstractServerPredicate#chooseRandomlyAfterFiltering(java.util.List, Object)}.
* The performance for this method is O(n) where n is number of servers to be filtered.
*/
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); // 返回服务,key=default
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
}

public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {
/**
* Get servers filtered by this predicate from list of servers.
*/
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
if (loadBalancerKey == null) {
return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
} else {
List<Server> results = Lists.newArrayList();
for (Server server: servers) {
if (this.apply(new PredicateKey(loadBalancerKey, server))) { // 返回default服务
results.add(server);
}
}
return results;
}
}

/**
* Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key.
*/
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) { // loadBalancerKey=default
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

/**
* Referenced from RoundRobinRule
* Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
*
* @param modulo The modulo to bound the value of the counter.
* @return The next value.
*/
private int incrementAndGetModulo(int modulo) { // 如果是websocket改方法只会进来一次,除非断线重连了。
for (;;) {
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next) && current < modulo) // 也就是循环,如果有3个服务就是 0-1-2 循环请求
return current;
}
}
}

public class ZoneAvoidancePredicate extends AbstractServerPredicate {
@Override
public boolean apply(@Nullable PredicateKey input) {
if (!ENABLED.get()) {
return true;
}
String serverZone = input.getServer().getZone();
if (serverZone == null) {
// there is no zone information from the server, we do not want to filter
// out this server
return true;
}
LoadBalancerStats lbStats = getLBStats();
if (lbStats == null) {
// no stats available, do not filter
return true;
}
if (lbStats.getAvailableZones().size() <= 1) {
// only one zone is available, do not filter
return true; // 一个实例的情况
}
Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
if (!zoneSnapshot.keySet().contains(serverZone)) {
// The server zone is unknown to the load balancer, do not filter it out
return true;
}
logger.debug("Zone snapshots: {}", zoneSnapshot);
Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null) {
return availableZones.contains(input.getServer().getZone());
} else {
return false;
}
}
}


public class CachingRouteLocator implements Ordered, RouteLocator,
ApplicationListener<RefreshRoutesEvent>, ApplicationEventPublisherAware { // 定时从nacos刷新路由规则
@Override
public void onApplicationEvent(RefreshRoutesEvent event) { // RefreshRoutesEvent ,heartbeat
try {
fetch().collect(Collectors.toList()).subscribe(list -> Flux.fromIterable(list)
.materialize().collect(Collectors.toList()).subscribe(signals -> {
applicationEventPublisher
.publishEvent(new RefreshRoutesResultEvent(this));
cache.put(CACHE_KEY, signals);
}, throwable -> handleRefreshError(throwable)));
}
catch (Throwable e) {
handleRefreshError(e);
}
}
}

// --转发过程--
public class WebsocketRoutingFilter implements GlobalFilter, Ordered {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
changeSchemeIfIsWebSocketUpgrade(exchange);

URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme();

if (isAlreadyRouted(exchange)
|| (!"ws".equals(scheme) && !"wss".equals(scheme))) {
return chain.filter(exchange);
}
setAlreadyRouted(exchange);

HttpHeaders headers = exchange.getRequest().getHeaders();
HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);

List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
if (protocols != null) {
protocols = headers.get(SEC_WEBSOCKET_PROTOCOL).stream().flatMap(
header -> Arrays.stream(commaDelimitedListToStringArray(header)))
.map(String::trim).collect(Collectors.toList());
}

return this.webSocketService.handleRequest(exchange, new ProxyWebSocketHandler(
requestUrl, this.webSocketClient, filtered, protocols)); // 设置代理
}


private static class ProxyWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// pass headers along so custom headers can be sent through
return client.execute(url, this.headers, new WebSocketHandler() {
@Override
public Mono<Void> handle(WebSocketSession proxySession) {
// Use retain() for Reactor Netty
Mono<Void> proxySessionSend = proxySession
.send(session.receive().doOnNext(WebSocketMessage::retain));// 处理websocket消息
// .log("proxySessionSend", Level.FINE);
Mono<Void> serverSessionSend = session.send(
proxySession.receive().doOnNext(WebSocketMessage::retain));
// .log("sessionSend", Level.FINE);
return Mono.zip(proxySessionSend, serverSessionSend).then();
}

/**
* Copy subProtocols so they are available downstream.
* @return
*/
@Override
public List<String> getSubProtocols() {
return ProxyWebSocketHandler.this.subProtocols;
}
});
}
}
}

public class WebSocketMessage {
public WebSocketMessage retain() { // 读取数据
DataBufferUtils.retain(this.payload);
return this;
}
}

websocket 连接建立才进行路由,后续直接走代理不重复进行路由。

扩展

dubbo

(读音[ˈdʌbəʊ])是阿里巴巴公司开源的一个高性能优秀的服务框架,使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和Spring框架无缝集成。

它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现。核心部件:Remoting、RPC、Registry。跟ZK组合使用,dubbo就是动物园的动物,zookeeper是动物园。我们通过dubbo 建立service这个服务,并且到zookeeper上面注册,填写对应的zookeeper服务所在 的IP及端口号。管理中心(动物园)和供给者(各种动物),消费者(万千游客)。通过dubbo-admin可以对消费者和提供者进行管理,是一个war包,可以在tomcat中启动。

Dubbo 提供了4种负载均衡实现,分别是基于权重随机算法的 RandomLoadBalance、基于最少活跃调用数算法的 LeastActiveLoadBalance、基于 hash 一致性的 ConsistentHashLoadBalance,以及基于加权轮询算法的 RoundRobinLoadBalance。

Dubbo使用dubbo缺省协议,基于netty+hessian(二进制序列化),采用单一长连接和NIO异步通讯,适合于小数据量大并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况,不适合传送大数据量的服务,比如传文件,传视频等,除非请求量很低。还可以支持RMI、Hessian、Http、Webservice、Thrift、memcached、redis、RestFull协议。可以不同服务配置不同服务。使用的是netty,主流通信框架包括netty,mina,Grizzly都可以选用。

源码分析

nacos

asyncContext 实现long polling。 req.startAsync();asyncContext.setTimeout(0L);
LongPollingService.java onEvent() 配置修改的时候触发返回 默认等待30-0.5=29.5秒

参考