java网络编程之Netty

基础概念

BIO:同步阻塞式IO,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。
NIO:同步非阻塞式IO,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。
Java AIO(NIO.2) : 异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理
NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。
AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持

NIO采用轮询的方式,但是AIO不需要,AIO框架在windows下使用windows IOCP技术,在Linux下使用epoll多路复用IO技术模拟异步IO,向操作系统注册监听,操作系统数据准备好会主动通知应用程序(订阅、通知模式),不再需要selector轮询,由channel通道直接到操作系统注册监听。

NIO和AIO
NIO:会等数据准备好后,再交由应用进行处理,数据的读取/写入过程依然在应用线程中完成,只是将等待的时间剥离到单独的线程中去,节省数据准备时间,因为多路复用机制,Selector会得到复用,对于那些读写过程时间长的,NIO就不太适合。NIO适合读写连接多的小数据。数据量大的场景BIO也是很适合的。
AIO:读完(内核内存拷贝到用户内存)了系统再通知应用,使用回调函数,进行业务处理,AIO能够胜任读写过程长的任务。

NIO 同步非阻塞概念

Selector提供选择已经就绪的任务的能力:
Selector轮询注册在其上的Channel,如果某个Channel发生读写请求并且Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。(同步)
一个Selector可以同时轮询多个Channel,因为JDK使用了epoll()代替传统的select实现,所以没有最大连接句柄1024/2048的限制。所以,只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。(非阻塞)

网络编程注意事项

  1. 黏包、断包问题解决,网络上很多DEMO都不是很完善
  2. 断线重连 客户端读取数据出现异常间隔5秒重连,客户端心跳判断连接断开间隔5秒重连,发送数据失败重连(没有连上,间隔5秒重连)。
  3. 业务数据包补发。考虑到网络环境的不稳定性、多变性(比如从进入电梯、进入地铁、移动网络切换到wifi等),在消息发送的时候,发送失败的概率其实不小,这时消息重发机制就很有必要了。发送消息时,除了心跳消息、握手消息、状态报告消息外,消息都加入消息发送超时管理器,立马开启一个定时器,比如每隔5秒执行一次,共执行3次,在这个周期内,如果消息没有发送成功,会进行3次重发,达到3次重发后如果还是没有发送成功,那就放弃重发,移除该消息,同时通过消息转发器通知应用层,由应用层决定是否再次重发。如果消息发送成功,服务端会返回一个消息发送状态报告,客户端收到该状态报告后,从消息发送超时管理器移除该消息,同时停止该消息对应的定时器即可。客户端发送消息,如果服务端没有反馈还可以在客户端做补发,失败的地方把该消息加入到队列,连接成功后查看队列从队列中取出补发该消息。在用户握手认证成功时,应该检查消息发送超时管理器里是否有发送超时的消息,如果有,则全部重发。还可以在发送失败报异常的时候把该消息加入到重发队列,重连成功后补发。
  4. java原生的BIO开发,一般会专门启动一个线程负责读取数据,其实读写就是对socket的inputStream、outputStream进行操作。把读取到的完整包解析出来给业务线程池处理,把剩余的不完整包跟下一次读取到的数据合并。nio一般是通过类似事件通知机制实现不阻塞。
  5. 心跳包:主要是为了防止NAT超时(比如手机连接互联网,运营商的网关就做了NAT映射,把无线网跟因特网对接,隔一段时间没有通信链接会被网关释放。防火墙也会把一段时间内没通信的链接断开),探测连接是否断开,由客户端发送比较合理。链路断开, 没有写操作的TCP连接是感知不到的, 除非这个时候发送数据给服务器, 造成写超时, 否则TCP连接不会知道断开了。主动kill掉一方的进程, 另一方会关闭TCP连接, 是系统代进程给服务器发的FIN. TCP连接就是这样, 只有明确的收到对方发来的关闭连接的消息(收到RST也会关闭, 大家都懂), 或者自己意识到发生了写超时, 否则它认为连接还存在。但是网路复杂中途出现的问题也会比较常见,譬如网线被掐断,对象进程被杀掉,频繁丢包,对方这时候的TCP长连接是不可使用的,但是对于应用层并不知道。如果需要知道当前的网络状况则需要很复杂的超时进行了解,TCP底层就实现了这样的功能,心跳机制是TCP在一段时间间隔后发送确定连接是否存在,如果确定存在的话,就会回传一个包来确定连接是存在的,如果没有返回包的话,则应该通知上层,网络出现了问题,需要进行连接失败的操作了。手动关闭客户端进程,事实上并不能测试出想要的结果,因为进程是在应用层的,所以,这种测试方法不能保证网络驱动层也不发送数据报文给服务器。经过测试发现,当应用层强制结束进程时,对于TCP连接,驱动层会发送reset数据包!而服务器收到这个数据包就可以正常关闭了!那么,如果拔掉网线呢,服务器收不到这个数据包,就会导致死连接存在!所以,心跳包是必要的,或者应用TCP协议本身的Keep-alive来设置SO_KEEPALIVE。减轻服务端压力服务端可以不回复心跳包,服务端一定时间内没有收到心跳包就断开该连接。
  6. 分包的方式:包头+长度、固定长度、分隔符(回车、自定义字符串)

常用方法

分包封装

  • 分隔符:DelimiterBasedFrameDecoder
  • 固定长度:FixedLengthFrameDecoder
  • 按行分隔:LineBasedFrameDecoder 遇到一个换行符,则认为是一个完整的报文
  • 自定义长度帧解码器:LengthFieldBasedFrameDecoder 可以指定最大包长度、长度域大小与位置,new LengthFieldBasedFrameDecoder(1024,0,2),一个包最好不要超过2048
    1. lengthFieldOffset = 0;
    2. lengthFieldLength = 2;
    3. lengthAdjustment = 0;
    4. initialBytesToStrip = 0。

自带的解析器遇到错误包怎么处理?

比如LengthFieldBasedFrameDecoder,遇到错误包会进行丢弃,保证后续发送过来的正常包可以继续解析。比如指定包最大长度为1024,解析到的长度超过1024就会报错丢弃,如果有包头还可以initialBytesToStrip参数设置成0,在处理程序中校验包头。如果长度指定成65536,如果中间有包给的长度是错误的,比如设置成3万,就可能出现需要丢弃掉3万个字节后面的内容才可以继续正常解析。

如果想要保住万无一失,就必须增加包头、包尾、长度、MD5校验码,进行组合校验,可以保证业务包出错的情况还能比较正常的运行。一般情况下直接用框架自带的Decoder就够用了。

编解码

  • StringDecoderStringEncoder

网络编程可能问题

多进程通过数据库进行交互

通过数据库进行交互可能出现一个进程往数据库里写入了一条记录,通知另一个进程去读取。但是写入操作还未提交另一个进程就去读取数据了,读取到的数据为空,特别是在需要批量操作的场景容易出现。所以需要根据情况写入后进行提交操作,提交成功然后再通知另一个进程读取。

1
@Transactional(propagation=Propagation.NEVER) // java spring ibatis 提交的代码
1
<tx:annotation-driven transaction-manager="transactionManagerIT"/>

或者直接通过配置

1
2
3
4
5
6
7
8
9
10
11
12
<bean id="baseTransactionProxyIT" class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean" abstract="true">
<property name="transactionManager">
<ref bean="transactionManagerIT" />
</property>
<property name="transactionAttributes">
<props>
<prop key="save*">PROPAGATION_REQUIRED,-Exception</prop>
<prop key="send">PROPAGATION_NEVER,readOnly,-Exception</prop>
<prop key="*">PROPAGATION_SUPPORTS,-Exception</prop>
</props>
</property>
</bean>

Netty 主要功能介绍

心跳

使用空闲时间发送心跳。无消息交互时就算空闲, 客户端一直不发心跳,服务端无法通过心跳判断什么时候需要断开连接。

1
2
3
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
  • readerIdleTimeSeconds: 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.
  • writerIdleTimeSeconds: 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.
  • allIdleTimeSeconds: 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.

语法说明

1
f.channel().closeFuture().sync();//等待服务端关闭端口监听。主线程执行到这里就 wait 子线程结束,子线程才是真正监听和接受请求的,closeFuture()是开启了一个channel的监听器,负责监听channel是否关闭的状态,如果监听到channel关闭了,子线程才会释放

与spring集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
//  ==============================存在问题=======================================================
@Slf4j
@Component
public class EchoServer {
/**
* NioEventLoop并不是一个纯粹的I/O线程,它除了负责I/O的读写之外
* 创建了两个NioEventLoopGroup,
* 它们实际是两个独立的Reactor线程池。
* 一个用于接收客户端的TCP连接,
* 另一个用于处理I/O相关的读写操作,或者执行系统Task、定时任务Task等。
*/
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private Channel channel;
public ChannelFuture start(int port) throws Exception {
//绑定对应ip和端口,同步等待成功。一定要用端口的形式。网络上很多用.localAddress(new InetSocketAddress(ip,port))这种形式对监听IP进行了限制,如果设置成127.0.0.1外部客户端无法访问
ChannelFuture future = serverBootstrap.bind(port).sync();
LOGGER.info("rpc server 已启动,端口:{}", port);
}
public void destroy() {
}
}

/**
* 将Netty服务端从main函数启动方式改为交给Spring来管理启动和销毁的工作。本写法只支持jar包在命令行启动,在tomcat中启动会导致tomcat假死。
*/
@SpringBootApplication
public class SpringNettyApplication implements CommandLineRunner {

@Value("${netty.port}")
private int port;

@Value("${netty.url}")
private String url;

@Autowired
private EchoServer echoServer;

public static void main(String[] args) {
SpringApplication.run(SpringNettyApplication.class, args);
}

@Override
public void run(String... args) throws Exception { // CommandLineRunner run 等待Spring加载完成后执行nettyServer启动工作。tomcat中启动会有问题,新启线程执行也不行,需要异步处理,后面有例子说明
ChannelFuture future = echoServer.start(url,port);
Runtime.getRuntime().addShutdownHook(new Thread(){ // 这个方法的意思就是在jvm中增加一个关闭的钩子,当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁等操作
@Override
public void run() {
echoServer.destroy();
}
});
//服务端管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。不能写在echoServer.start方法里面,会导致springboot启动的时候阻塞住.主线程执行到这里就 wait 子线程结束,子线程才是真正监听和接受请求的,子线程就是Netty启动的监听端口的线程; 即closeFuture()是开启了一个channel的监听器,负责监听channel是否关闭的状态,如果未来监听到channel关闭了,子线程才会释放,syncUninterruptibly()让主线程同步等待子线程结果。.channel.close()才是主动关闭通道的方法。
future.channel().closeFuture().syncUninterruptibly();
}
}
/**
* 在springboot+netty+tomcat中启动,需要使用线程池新建线程启动netty,可以在Listener中启动
* 遇到tomcat假死排查步骤主要是:
* 1.查看网络问题
* 2.排查防火墙
* 3.查看springboot jdk tomcat netty版本与依赖
* 4.如果是在IDE中要判断是否IDE出问题
* 5.判断程序执行到哪里假死最终定位是netty问题,服务端绑定端口后异步监听,等待客户端连接,而这个过程线程会变为wait状态
* 解决思路:新启动线程启动netty服务端监听端口
*/
/**
* @ description 解决项目打war包发布在tomcat,端口被占用问题
* 解决思路:给NettyServer分配一个独立的线程用于加载
* 在IOC的容器的启动过程,当所有的bean都已经处理完成之后,spring ioc容器会有一个发布事件的动作,可以去做一些自己想做的事
* 让我们的bean实现ApplicationListener接口,这样当发布事件时,ioc容器就会以容器的实例对象作为事件源类,并从中找到事件的监听者,此时ApplicationListener接口实例中的onApplicationEvent(E event)方法就会被调用,我们的逻辑代码就会写在此处
* 一个是root application context ,另一个就是我们自己的 projectName-servlet context,这种情况下,就会造成onApplicationEvent方法被执行两次
* 我们可以只在root application context初始化完成后调用逻辑代码
* ContextStartedEvent(容器启动)、ContextRefreshedEvent(容器刷新也就是初始化完成,spring容器加载完毕做一件事情)等
* ContextRefreshedEvent详细说明:
* Published when the ApplicationContext is initialized or refreshed, for example, using the refresh() method on the ConfigurableApplicationContext interface. "Initialized" here means that all beans are loaded, post-processor beans are detected and activated, singletons are pre-instantiated, and the ApplicationContext object is ready for use. As long as the context has not been closed, a refresh can be triggered multiple times, provided that the chosen ApplicationContext actually supports such "hot" refreshes. For example, XmlWebApplicationContext supports hot refreshes, but GenericApplicationContext does not.
* 当容器中的bean初始化好的时候,会回调ContextRefreshedEvent事件一次,也可以手动调用容器提供的refresh()方法,去触发ContextRefreshedEvent事件
*/
// 例子一 =====================================================================================
@Slf4j
@Component
public class EchoServer implements Runnable{
...
@Override
public void run() { // 直接把上面demo启动方法拿过来,或者自己写启动与关闭方法进行调用
try {
EchoServer myself = this; // 注意:不需要
ChannelFuture future = startListener();
Runtime.getRuntime().addShutdownHook(new Thread() { // 注意:使用Listener在tomcat下启动不需要加这个方法,无法销毁
@Override
public void run() {
myself.destroy(); // tomcat下无法进行销毁
}
});
//服务端管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程
future.channel().closeFuture().syncUninterruptibly();
} catch (Exception e) {
e.printStackTrace();
}
}
/** 正确写法
@Override
public void run() {
try {
ChannelFuture future = startListener();
//服务端管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程
future.channel().closeFuture().syncUninterruptibly();
} catch (Exception e) {
e.printStackTrace();
}
}
*/
}
@Component
@Log
public class NettyServerListener implements ApplicationListener<ContextRefreshedEvent> {
@Autowired
private EchoServer nettyServer;
/**
* 当一个applicationContext被初始化或被刷新时触发
* @param event
*/
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().getParent() == null) { // root application context 没有parent
log.info("EchoServer Start Success");
//自己的NettyServer
// NettyServer nettyServer = new NettyServer(); // new 的方式不受spring监管、从配置文件注入的端口会无效
new Thread(nettyServer).start();
}
}
@PreDestroy
public void cleanup() {
nettyServer.destroy(); // tomcat可以进行销毁操作,类似于操作了一个falg标记使线程退出。但是感觉还有其他改进方案,应该终止线程,回收线程所有资源
// 使用线程的interrupt也无法正常结束
}
}
// ===============================可行方案======================================================
/**
* 方案二
* 可能初始化容器后调用CommandLineRunner的run方法太久导致后续无法执行,run方法如果有阻塞方法得用异步执行
* 修改方案:启动方法改成异步,也可以解决问题
*/
public class NettyServer {
@Override
public void start() { // 直接把上面demo启动方法拿过来,或者自己写启动与关闭方法进行调用
try {
ChannelFuture future = startListener();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
this.destroy();
}
});
//服务端管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程
future.channel().closeFuture().syncUninterruptibly();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class StartCommand implements CommandLineRunner {

@Resource
private NettyServer nettyServer;


@Override
public void run(String... args) throws Exception {
CompletableFuture.runAsync(() -> nettyServer.start()); // nettyServer 不需要实现Runnable,普通类调用就行
}

/**
// 通过@Async注解也能解决问题
@Override
@Async
public void run(String... args) throws Exception {
try {
ChannelFuture future = nettyServer.startListener();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
nettyServer.destroy();
}
});
//服务端管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程
future.channel().closeFuture().syncUninterruptibly();
} catch (Exception e) {
e.printStackTrace();
}
}
*/
}
// =====================================================================================
// 其他例子
@WebListener
public class NettyServerListener implements //ApplicationListener<ContextStartedEvent>
ServletContextListener // ServletContextListener用于监听tomcat启动或者关闭
{
private static final Logger log= LoggerFactory.getLogger(NettyServerListener.class);
private ExecutorService webSocketSinglePool;
private EchoServer echoServer= null;
@Override
public void contextInitialized(ServletContextEvent sce) {

}
@Override
public void contextDestroyed(ServletContextEvent sce) {

new Thread(()->echoServer.destroy()).run();
}

@PostConstruct
public void setup() {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("webSocketSinglePool-%d").build();
webSocketSinglePool = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
log.info("webSocketSinglePool init.");
}

/*@Override
public void onApplicationEvent(ContextStartedEvent event) {
log.info("监听到事件了");
runWebSocketServer(event.getApplicationContext());
}*/
private void runWebSocketServer(ApplicationContext applicationContext) {
final EchoServer echoServer = applicationContext.getBean(EchoServer.class);
new Thread(() -> {
try { //开始启动netty服务
log.info("ready to start Netty");
echoServer.run();
} catch (Exception e) {
log.error("webSocket listen and serve error.", e);
}
}).run();
}
@PreDestroy
public void cleanup(){
webSocketSinglePool.shutdown();
log.info("webSocketSinglePool destroyed.");
}
}
/**
* 另一个demo
*/
@Component
public class ApplicationRefreshListener implements ApplicationListener<ContextStartedEvent> {

private static final Logger LOG = LoggerFactory.getLogger(ApplicationRefreshListener.class);
private ExecutorService webSocketSinglePool;

@PostConstruct
public void setup() {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("webSocketSinglePool-%d").build();
webSocketSinglePool = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024),
namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
LOG.info("webSocketSinglePool init.");
}

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
runWebSocketServer(event.getApplicationContext());
}

private void runWebSocketServer(ApplicationContext applicationContext) {
final WebSocketServer webSocketServer = applicationContext.getBean(WebSocketServer.class);
webSocketSinglePool.execute(() -> {
try {
webSocketServer.listenAndServe();
} catch (Exception e) {
LOG.error("webSocket listen and serve error.", e);
}
});
}

@PreDestroy
public void cleanup() {
webSocketSinglePool.shutdown();
LOG.info("webSocketSinglePool destroyed.");
}
}
// =====================================================================================

注意要点

  • 服务端使用serverBootstrap.bind(port).sync();形式监听指定端口,不要用new InetSocketAddress(ip,port)这种限制IP的形式
  • Netty中ByteToMessageDecoder中的decode方法执行多次的问题,再查看decode方法的源码注释如下:This method will be called till either the input has nothing to read,意思是说:ByteBuf对象的数据没有读完的话,decode方法会一直调用。readerIndex会随着数据的读取而不断增加,所以保证每次decode读取一个完整包,如果不满足一个完整包就重置readerIndex退出下次再进来读。
  • 如果使用的是分隔符的数据包还可以先查找分隔符,如果没有找到分隔符却发现包里有内容可以进行跳过操作,丢弃这些错误数据,找到分隔符了就可以进行读取,验证长度,读取包内容等操作,读取完一个完整包退出等待系统自动调用decode继续后续包操作,当然也可以自己代码判断后面如果还有内容直接进行后续包处理。
  • 注意不要把没用到的netty自带的decoder解码器加上去,要根据具体需求加,不然会导致解码错误。还要注意decoder是否是可共享的如果是不可共享的得用new
  • write read 数据类型默认使用ByteBuf
  • 应用开发的时候往往是异步的,可能要经过多个应用,所以需要整条链路的标识,可以使用sessionid、token、id等做标识,这样返回的时候才能找到到底是谁发送的这个请求,也可以各个子应用自己生成标识单独维护各自请求的对应关系
  • 一般来说都是通过Decoder或者Encoder进行对设备上报与下发的协议内容的编解码转换成POJO,Handler中直接用POJO处理业务逻辑或者再进行分发到业务模块

功能扩展

启用一个端口解析不同设备多种协议

  1. 可以针对不同协议定义一个XXXHandler,定义一个入口Handler方法,在入口Handler判断协议头类型调用对应的XXXHandler处理类处理对应数据(可以保证handler的数据足够分析包类型,方案可行)
  2. 定义CustomDecoder,在Decoder中判断需要调用的Handler(可以保证Decode中的数据足够分析包类型,方案可行)。一般来说都是通过Decoder或者Encoder进行协议的编解码转换成POJO,Handler中直接用POJO处理业务逻辑,如果使用pojo下发命令还得有Encoder,如果直接用字节流处理了就不用。
  3. 说明:不同客户的连接过来发送的包是不会粘在一起的,客户端1的包不会跟客户的2的包粘在一起,只有可能客户端1的包1跟客户的1的包2粘在一起,如果多个客户端都共用一个内存存放包数据是会有问题的,会无法区分数据是谁的,就得在数据包中加标识说明是谁的包,正常的话底层框架都会分装好了,不会暴露这么原始的接口。很多c程序都是一个链接给一个独立buf。
  4. 其他方案:一个进程一个端口解析一种协议,一个进程多个端口解析不同协议

其他扩展框架

  • SOFABolt:蚂蚁金融服务集团开发的一套基于 Netty 实现的网络通信框架
  • smart-socket
  • t-io
  • mina

多线程

一个线程读取多个连接的数据(考虑各个链接都只读取到部分数据的情况,数据处理线程需要考虑数据拼接)
每个连接对应一个读线程
多个线程随机读取不定连接的数据(考虑各个链接都只读取到部分数据的情况,数据处理线程需要考虑数据拼接)

不能直接使用read会阻塞线程,那后面的线程都无法执行了,可以先通过available判断是否可读。
任务分发可以建立一个主线程的阻塞队列,然后分发任务至子线程,子线程若处理完毕,则提交任务至主线程。

扩展

通过一个缓冲区就可以把同步转换成异步。

AIO(NIO2.0) 与NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。其中的read/write方法,会返回一个带回调函数的对象,当执行完读取/写入操作后,直接调用回调函数。主要在java.nio.channels包下增加了下面四个异步通道:
AsynchronousSocketChannel 客户端异步socket
AsynchronousServerSocketChannel 服务器异步socket
AsynchronousFileChannel 用于文件异步读写
AsynchronousDatagramChannel UDP

BIO是一个连接一个线程。
NIO是一个请求一个线程。
AIO是一个有效请求一个线程。
先来个例子理解一下概念,以银行取款为例: 

同步 : 自己亲自出马持银行卡到银行取钱(使用同步IO时,Java自己处理IO读写);
异步 : 委托一小弟拿银行卡到银行取钱,然后给你(使用异步IO时,Java将IO读写委托给OS处理,需要将数据缓冲区地址和大小传给OS(银行卡和密码),OS需要支持异步IO操作API);
阻塞 : ATM排队取款,你只能等待(使用阻塞IO时,Java调用会一直阻塞到读写完成才返回);
非阻塞 : 柜台取款,取个号,然后坐在椅子上做其它事,等号广播会通知你办理,没到号你就不能去,你可以不断问大堂经理排到了没有,大堂经理如果说还没到你就不能去(使用非阻塞IO时,如果不能读写Java调用会马上返回,当IO事件分发器会通知可读写时再继续进行读写,不断循环直到读写完成)

Epoll

epoll fd有一个私有的struct eventpoll,它记录哪一个fd注册到了epfd上。eventpoll 同样有一个等待队列,记录所有等待的线程。还有一个预备好的fd列表,这些fd可以进行读或写。

函数声明:int epoll_create(int size)
该函数生成一个epoll专用的文件描述符。它其实是在内核申请一空间,用来存放你想关注的socket fd上是否发生以及发生了什么事件。size就是你在这个epoll fd上能关注的最大socket fd数。创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
创建一个 epoll instance,实际上是创建了一个 eventpoll 实例,包含了红黑树以及一个双向链表。源码位置https://github.com/torvalds/linux/blob/master/fs/eventpoll.c#L177
红黑树的叶子节点都是 epitem 结构体。关于各项的解释,注释里已经说的比较清楚了。我们关心的应该是,当往这棵红黑树上添加、删除、修改节点的时候,我们从(用户态)程序代码中能操作的是一个 fd,即一个 socket 对应的 file descriptor,所以一个 epitem 实例与一个 socket fd 一一对应。另外还需要注意到的是 rdllink 这个变量,这个指向了上一步创建的 evnetpoll 实例中的成员变量 rdllist,也就是那个就绪链表。这里很重要,注意留意,后面会讲到。当然,我们还需要关注的是 event 这个变量,代表了我们针对这个 socket fd 关心的事件,比如 EPOLLIN、EPOLLOUT。通过上述的讲解应该大致明白了,当我们使用 socket() 或者 accept() 得到一个 socket fd 时,我们添加到这棵红黑树上的是一个结构体,与这个 socket fd 一一对应。
当我们通过 socket() 以及 accept() 获取到一个 socket 对象时,这个 socket 对象到底有哪些东西呢?可以看到,一个 socket 实例包含了一个 file 的指针,以及一个 socket_wq 变量。其中 socket_wq 中的 wait 表示等待队列,fasync_list 表示异步等待队列。
那么等待队列和异步等待队列中有什么呢?大致来说,等待队列和异步等待队列中存放的是关注这个 socket 上的事件的进程。区别是等待队列中的进程会处于阻塞状态,处于异步等待队列中的进程不会阻塞。

再简单总结一下收包以及触发的过程:

1
2
3
4
5
6
包从网卡进来
一路经过各个子系统到达内核协议栈(传输层)
内核根据包的 {src_ip:src_port, dst_ip:dst_port} 找到 socket 对象(内核维护了一份四元组和 socket 对象的一一映射表)
数据包被放到 socket 对象的接收缓冲区
内核唤醒 socket 对象上的等待队列中的进程,通知 socket 事件
进程唤醒,处理 socket 事件(read/write)

epoll触发:
上面其实提到了等待队列,每当我们创建一个 socket 后(无论是 socket()函数 还是 accept() 函数),socket 对象中会有一个进程的等待队列,表示某个或者某些进程在等待这个 socket 上的事件。
但是当我们往 epoll 红黑树上添加一个 epitem 节点(也就是一个 socket 对象,或者说一个 fd)后,实际上还会在这个 socket 对象的 wait queue 上注册一个 callback function,当这个 socket 上有事件发生后就会调用这个 callback function。这里与上面讲到的不太一样,并不会直接 wake up 一个等待进程,需要注意一下。
简单讲就是,这个 socket 在添加到这棵 epoll 树上时,会在这个 socket 的 wait queue 里注册一个回调函数,当有事件发生的时候再调用这个回调函数(而不是唤醒进程)。
很简单,这个回调函数会把这个 socket 添加到创建 epoll instance 时对应的 eventpoll 实例中的就绪链表上,也就是 rdllist 上,并唤醒 epoll_wait,通知 epoll 有 socket 就绪,并且已经放到了就绪链表中,然后应用层就会来遍历这个就绪链表,并拷贝到用户空间,开始后续的事件处理(read/write)。
所以这里其实就体现出与 select 的不同, epoll 把就绪的 socket 给缓存了下来,放到一个双向链表中,这样当唤醒进程后,进程就知道哪些 socket 就绪了,而 select 是进程被唤醒后只知道有 socket 就绪,但是不知道哪些 socket 就绪,所以 select 需要遍历所有的 socket。
另外,应用程序遍历这个就绪链表,由于就绪链表是位于内核空间,所以需要拷贝到用户空间,这里要注意一下,网上很多不靠谱的文章说用了共享内存,其实不是。由于这个就绪链表的数量是相对较少的,所以由内核拷贝这个就绪链表到用户空间,这个效率是较高的。
上面可以看到,这里确确实实是从内核复制 rdllist 到用户空间,非共享内存。应用程序调用 epoll_wait 返回后,开始遍历拷贝回来的内容,处理 socket 事件。
至此,从注册一个 file descriptor(socket fd) 到 epoll 红黑树,到这个 socket 上有数据包从网卡进来,再到如何触发 epoll,再到应用程序的用户空间,由应用程序开始 read/write 事件的整个过程就理顺了。

accept 的惊群效应。
先解释一下什么是惊群,如果一个 socket 上有多个进程在同时等待事件,当事件触发后,内核可能会唤醒多个或者所有在等待的进程,然而只会有一个进程成功获取该事件,其他进程都失败,这种情况就叫惊群,会一定程度浪费 cpu,影响性能。

流程:
epoll 在内核开辟了一块缓存,用来创建 eventpoll 对象,并返回一个 file descriptor 代表 epoll instance
这个 epoll instance 中创建了一颗红黑树以及一个就绪的双向链表(当然还有其他的成员)
红黑树用来缓存所有的 socket,支持 O(log(n)) 的插入和查找,减少后续与用户空间的交互
socket 就绪后,会回调一个回调函数(添加到 epoll instance 上时注册到 socket 的)
这个回调函数会把这个 socket 放到就绪链表,并唤醒 epoll_wait
应用程序拷贝就绪 socket 到用户空间,开始遍历处理就绪的 socket
如果有新的 socket,再添加到 epoll 红黑树上,重复这个过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// include/linux/rbtree.h
struct rb_node {
unsigned long __rb_parent_color;
struct rb_node *rb_right;
struct rb_node *rb_left;
} __attribute__((aligned(sizeof(long)))); // 可以指定特殊属性 字节对齐空间
/* The alignment might seem pointless, but allegedly CRIS needs it */

struct rb_root {
struct rb_node *rb_node;
};

// 你可以惊奇的发现红黑树的节点rb_node没有提供额外的属性字段去存储实际数据,所以它的用法和我们常见的红黑树有点不一样。
// 例子
struct binder_proc {
//红黑树的根节点
struct rb_root nodes; // binder_proc进程内的binder实体组成的红黑树(关联binder_node->rb_node)
};

//nodes这个红黑树的节点的存储对象
struct binder_node {
struct rb_node rb_node; // 如果这个Binder实体还在使用,则将该节点链接到proc->nodes中。
};

// 遍历方法
struct rb_node *n;
for (n = rb_first(&proc->nodes); n != NULL; n = rb_next(n)) {
//将红黑树对象转化为binder_node ,rb_entry函数类似上文的getParentAddr
struct binder_node *node = rb_entry(n, struct binder_node,
rb_node);
}

// 同理 epi = rb_entry(rbp, struct epitem, rbn); 红黑树也是这样把rbp转换成epitem

// /linux/rbtree.h

#define rb_entry(ptr, type, member) container_of(ptr, type, member)
// 重要 它的作用是:设ptr是某个type类型结构中的member成员变量的地址,该宏的结果是得到该type的地址。即有结构中成员变量的地址得到结构的地址。非常高级的样子。

// include/linux/kernel.h
#define container_of(ptr, type, member) ({ \
void *__mptr = (void *)(ptr); \
((type *)(__mptr - offsetof(type, member))); })

// include/linux/stddef.h
#undef offsetof
#ifdef __compiler_offsetof
#define offsetof(TYPE, MEMBER) __compiler_offsetof(TYPE, MEMBER)
#else
#define offsetof(TYPE, MEMBER) ((size_t)&((TYPE *)0)->MEMBER)
#endif
// ------------------------------------------------------------------------------


/*
* This structure is stored inside the "private_data" member of the file
* structure and represents the main data structure for the eventpoll
* interface.
*/
struct eventpoll {
/* Lock which protects rdllist and ovflist */
spin_lock_t lock; // 对本数据结构的访问
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
struct mutex mtx; // 防止使用时被删除
/*
* 等待队列可以看作保存进程的容器,在阻塞进程时,将进程放入等待队列;
* 当唤醒进程时,从等待队列中取出进程
*/
/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq; // sys_epoll_wait() 使用的等待队列
/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait; // file->poll()使用的等待队列
/* List of ready file descriptors */
struct list_head rdllist; // 就绪链表
/* RB tree root used to store monitored fd structs */
struct rb_root_cached rbr; // 用于管理所有fd的红黑树(树根)
/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
* holding ->lock.
*/
struct epitem *ovflist; // 将事件到达的fd进行链接起来发送至用户空间
...
}

struct epitem {
union { // struts union 嵌套使用,可以理解为在同一层
/* RB tree node links this structure to the eventpoll RB tree */
struct rb_node rbn; // 重点:用于主结构管理的红黑树
/* Used to free the struct epitem */
struct rcu_head rcu;
};
/* List header used to link this structure to the eventpoll ready list */
struct list_head rdllink; // 重点:事件就绪链表
/*
* Works together "struct eventpoll"->ovflist in keeping the
* single linked chain of items.
*/
struct epitem *next; // 用于主结构体中的链表
/* The file descriptor information this item refers to */
struct epoll_filefd ffd; // 这个结构体对应的被监听的文件描述符信息
/* List containing poll wait queues */
struct list_head *pwqlist; // 双向链表,保存着被监视文件的等待队列
/* The "container" of this item */
struct eventpoll *ep; // 该项属于哪个主结构体(多个epitm从属于一个eventpoll)
/* List header used to link this item to the "struct file" items list */
struct list_head fllink; // --重点: 双向链表,用来链接被监视的文件描述符对应的struct
/* wakeup_source used when EPOLLWAKEUP is set */
struct wakeup_source __rcu *ws;
/* The structure that describe the interested events and the source fd */
struct epoll_event event; // 重点:注册的感兴趣的事件,也就是用户空间的epoll_event
}
// 从上面可知道,最核心的就是这两个结构体。红黑树直接用系统里面的那个,还有双向链表也类似。比如下面这段代码:

static void unlist_file(struct epitems_head *head)
{
struct epitems_head *to_free = head;
struct hlist_node *p = rcu_dereference(hlist_first_rcu(&head->epitems));
if (p) {
struct epitem *epi= container_of(p, struct epitem, fllink);
spin_lock(&epi->ffd.file->f_lock);
if (!hlist_empty(&head->epitems))
to_free = NULL;
head->next = NULL;
spin_unlock(&epi->ffd.file->f_lock);
}
free_ephead(to_free);
}
// 还可以重点关注一下 ep_insert ep_remove ep_rbtree_insert INIT_LIST_HEAD等方法看是如何新增删除元素的

函数声明:int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
该函数用于控制某个epoll文件描述符上的事件,可以注册事件,修改事件,删除事件。
参数:
epfd:由 epoll_create 生成的epoll专用的文件描述符;
op:要进行的操作例如注册事件,可能的取值EPOLL_CTL_ADD 注册、EPOLL_CTL_MOD 修 改、EPOLL_CTL_DEL 删除
fd:关联的文件描述符;
event:指向epoll_event的指针;
如果调用成功返回0,不成功返回-1

int epoll_ctl(int epfd, intop, int fd, struct epoll_event*event); 
epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
第一个参数是epoll_create()的返回值 ,
第二个参数表示动作,用三个宏来表示 :
EPOLL_CTL_ADD:              注册新的fd到epfd中;
EPOLL_CTL_MOD:            修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:                从epfd中删除一个fd;
第三个参数 是需要监听的fd ,
第四个参数 是告诉内核需要监听什么事件
events可以是以下几个宏的集合:
EPOLLIN:                        触发该事件,表示对应的文件描述符上有可读数据。(包括对端SOCKET正常关闭);
EPOLLOUT:                  触发该事件,表示对应的文件描述符上可以写数据;
EPOLLPRI:                      表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:                表示对应的文件描述符发生错误;
EPOLLHUP:                表示对应的文件描述符被挂断;
EPOLLET:                      将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:    只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。
函数声明:int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)
该函数用于轮询I/O事件的发生;
参数:
epfd:由epoll_create 生成的epoll专用的文件描述符;
epoll_event:用于回传代处理事件的数组;
maxevents:每次能处理的事件数;
timeout:等待I/O事件发生的超时值(单位我也不太清楚);-1相当于阻塞,0相当于非阻塞。一般用-1即可
返回发生事件数。
epoll_wait运行的原理是
等侍注册在epfd上的socket fd的事件的发生,如果发生则将发生的sokct fd和事件类型放入到events数组中。
并 且将注册在epfd上的socket fd的事件类型给清空,所以如果下一个循环你还要关注这个socket fd的话,则需要用epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)来重新设置socket fd的事件类型。这时不用EPOLL_CTL_ADD,因为socket fd并未清空,只是事件类型清空。这一步非常重要。

epoll使用的资料网上一大把,EPOLLIN(读)监听事件的类型,大家一般使用起来一般没有什么疑问,无非是监听某个端口,一旦客户端连接有数据发送,它马上通知服务端有数据,一般用一个回调的读函数,从这个相关的socket接口读取数据就行了。但是有关EPOLLOUT(写)监听的使用,网上的资料却讲得不够明白,理解起来有点麻烦。因为监听一般都是被动操作,客户端有数据上来需要读写(被动的读操作,EPOLIN监听事件很好理解,但是服务器给客户发送数据是个主动的操作,写操作如何监听呢?

如果将客户端的socket接口都设置成 EPOLLIN | EPOLLOUT(读,写)两个操作都设置,那么这个写操作会一直监听,有点影响效率。经过查阅大量资料,我终于明白了EPOLLOUT(写)监听的使用场,一般说明主要有以下三种使用场景:

1: 对客户端socket只使用EPOLLIN(读)监听,不监听EPOLLOUT(写),写操作一般使用socket的send操作
2:客户端的socket初始化为EPOLLIN(读)监听,有数据需要发送时,对客户端的socket修改为EPOLLOUT(写)操作,这时EPOLL机制会回调发送数据的函数,发送完数据之后,再将客户端的socket修改为EPOLL(读)监听
3:对客户端socket使用EPOLLIN 和 EPOLLOUT两种操作,这样每一轮epoll_wait循环都会回调读,写函数,这种方式效率不是很好。

程序思路:

  1. 服务端循环调用epoll_wait等待并且获取活动事件,返回文件描述符的个数。如果 timeout为0,则表示 epoll_wait在 rdllist链表中为空,立刻返回,不会等待。(epoll_wait系统调用,如果没有事件,所以需要睡眠(阻塞)。当有事件到来时,睡眠会被ep_poll_callback函数唤醒,ep_poll_callback唤醒等待队列中的进程)
  2. 循环步骤一返回的描述符个数。判断事件对应的状态。根据事件状态读取对应的数据,或者写入数据。通过epoll_ctl更新红黑树中sockfd注册监听的事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
for( ; ; )
{
nfds = epoll_wait(epfd,events,20,500); // epoll_wait系统调用获取活跃事件文件描述符个数
for(i=0; i<nfds; ++i)
{
if(events[i].data.fd==listenfd) // 有新的连接;我们可以注册多个FD, 如果内核发现事件,就会载入events,如果有我们要的描述符也就是listenfd,说明某某套接字监听描述符所对应的事件发生了变化。每次最多监测20个fd数。
{
connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); // accept 这个连接
ev.data.fd = connfd;
ev.events = EPOLLIN|EPOLLET; // LT
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); // 将新的fd添加到epoll的监听中,EPOLLIN|EPOLLET
}
else if( events[i].events&EPOLLIN ) // 接收到数据,读socket,数据可读标志EPOLLIN
{
n = read(sockfd, line, MAXLINE)) < 0 // 读
ev.data.ptr = md; // md为自定义类型,添加数据
ev.events = EPOLLOUT|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓
}
else if(events[i].events&EPOLLOUT) // 有数据待发送,写socket
{
struct myepoll_data* md = (myepoll_data*)events[i].data.ptr; // 取数据
sockfd = md->fd;
send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); // 发送数据
ev.data.fd = sockfd;
ev.events = EPOLLIN|EPOLLET;
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); // 修改标识符,等待下一个循环时接收数据
}
else
{
//其他的处理
}
}
}

epoll_wait ,timeout,详细使用epoll因为源码里对timeout不为0的情况下,还有些额外处理,引起其他耗时。如果使用epoll_wait()如果明确知道这次能取到东西下次直接把timeout设置为0,其实是可以稍微提升点性能的。
https://github.com/torvalds/linux/blob/master/fs/eventpoll.c#L1759

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
struct timespec64 to;

return do_epoll_wait(epfd, events, maxevents,
ep_timeout_to_timespec(&to, timeout));
}

static struct timespec64 *ep_timeout_to_timespec(struct timespec64 *to, long ms)
{
struct timespec64 now;

// 如果配-1,是会返回NULL的,这个NULL指针在后续是表示block的
if (ms < 0)
return NULL;

// timeout配0
if (!ms) {
to->tv_sec = 0;
to->tv_nsec = 0;
return to;
}

...// 配其他超时时间,一些处理
return to;
}
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, struct timespec64 *to)
{
...
/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, to);
...
}

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, struct timespec64 *timeout)
{
1、初始化超时相关... //timed_out为超时flag,初始值为0
lockdep_assert_irqs_enabled();

// 用户设置timeout不为0,配了某个超时时间
if (timeout && (timeout->tv_sec | timeout->tv_nsec)) {
...
}
// 用户设置timeout为0
else if (timeout) {
//则置超时flag为true,避免空跑等待
timed_out = 1;
}

// 这里是关键:只要有事件响应,都会立刻返回,与你配的timeout无关
/*
* This call is racy: We may or may not see events that are being added
* to the ready list under the lock (e.g., in IRQ callbacks). For cases
* with a non-zero timeout, this thread will check the ready list under
* lock and will add to the wait queue. For cases with a zero
* timeout, the user by definition should not care and will have to
* recheck again.
*/
eavail = ep_events_available(ep);

while (1) {
if (eavail) {
2. 进入循环,如果有事件要处理// 如果这里有事件响应,则可以返回用户态
/*
* Try to transfer events to user space. In case we get
* 0 events and there's still timeout left over, we go
* trying again in search of more luck.
*/
res = ep_send_events(ep, events, maxevents);
if (res)
return res;
}
3. 没有事件、且不等待
// 如果没有事件响应,但用户的timeout为0,立刻返回
if (timed_out)
return 0;
... //timeout为非0的处理

4. 没有事件、进入忙碌的loop...
// 待着是否block的flag去轮询
eavail = ep_busy_loop(ep, timed_out);
if (eavail)
continue;
// 有信号发生,中断
if (signal_pending(current))
return -EINTR;
...

5. 没有事件、使用ep->wq等待...

// wait的唤醒,有严格的实现,但竞争状态太复杂了,先看怎么用。
/*
* Internally init_wait() uses autoremove_wake_function(),
* thus wait entry is removed from the wait queue on each
* wakeup. Why it is important? In case of several waiters
* each new wakeup will hit the next waiter, giving it the
* chance to harvest new event. Otherwise wakeup can be
* lost. This is also good performance-wise, because on
* normal wakeup path no need to call __remove_wait_queue()
* explicitly, thus ep->lock is not taken, which halts the
* event delivery.
*/
init_wait(&wait);
write_lock_irq(&ep->lock);
__set_current_state(TASK_INTERRUPTIBLE);

// 最后检查一下
eavail = ep_events_available(ep);

// 真的没有事件可以做
// 则将当前进程加入epoll的等待队列
if (!eavail)
__add_wait_queue_exclusive(&ep->wq, &wait);

write_unlock_irq(&ep->lock);
...
6. 加到等待队列之后,主动让出CPU...
if (!eavail)
timed_out = !schedule_hrtimeout_range(to, slack,
HRTIMER_MODE_ABS);
__set_current_state(TASK_RUNNING);

7. 被唤醒
eavail = 1; // 默认下一轮要处理事件

// 如果被唤醒了但还在wait list里,这里有很多竞争状态,需要重新检查
if (!list_empty_careful(&wait.entry)) {
write_lock_irq(&ep->lock);
// 说明是超时唤醒的,但现在已经为不在等待队列里,则这里也需要让下一轮while处理事情
if (timed_out)
eavail = list_empty(&wait.entry);
__remove_wait_queue(&ep->wq, &wait);
write_unlock_irq(&ep->lock);
}
}
}

epoll_wait()只要里边还有事件响应,这句调用都会立刻给你返回,而与timeout配多少没关系,不会因为不同的timeout有额外开销

init_wait(&wait) 比较复杂,涉及操作系统里面的等待队列设计进程、线程状态、进程模型等。java中wait不占用cpu 会释放锁,sleep不会释放锁,对于CPU资源来说,不管是哪种方式暂停的线程,都表示它暂时不再需要CPU的执行时间。OS会将执行时间分配给其它线程。c语言#include<sys/wait.h> wait()会暂时停止进程的执行,直到有信号来到或子进程结束。

疑问

epoll到底用没用到mmap?
select poll epoll 都是对poll机制的封装。epoll里面没有mmap相关代码。并没有用到内核态内存映射到用户态的技术。但是这个技术是存在的。dpdk,跟netmap绕过内核的tcp/ip协议栈,在用户态协议栈处理。减少中断,上下文切换等开销)就有用到内核态内存映射到用户态技术。要提升性能可以使用基于DPDK的Redis 回环口测试性能提高明显。

libevent:事件驱动,高性能;轻量级,专注于网络;跨平台,支持Windows、Linux、Mac Os等;支持多种 I/O多路复用技术,epoll、poll、dev/poll、select 和kqueue 等; 支持I/O,定时器和信号等事件;

mv与rename区别?
mv is a basic command line designed to do one thing and do it well (Unix philosophy) : move file(s) or directorie(s).
mv is a standard utility to move one or more files to a given target. It can be used to rename a file, if there’s only one file to move. If there are several, mv only works if the target is directory, and moves the files there.
So mv foo bar will either move the file foo to the directory bar (if it exists), or rename foo to bar (if bar doesn’t exist or isn’t a directory). mv foo1 foo2 bar will just move both files to directory bar, or complain if bar isn’t a directory.
mv will call the rename() C library function to move the files, and if that doesn’t work (they’re being moved to another filesystem), it will copy the files and remove the originals.
If all you have is mv and you want to rename multiple files, you’ll have to use a shell loop. There are a number of questions on that here on the site, see e.g. this, this, and others.

rename() only works on the same device, it just changes its name(or “moves” the name to another directory). rename() cannot move the file data from one location to another.
If you want to copy or move the file, you need to do it yourself:

  1. open the source and destination file
  2. read() from the source file, write to the destination file in a loop until the end.
  3. unlink() the source file (only if you want to move it.)

复制文件的话sendfile也比较快,调用shell的mv方法不安全,性能也不高。

问题

netty导致的too many files open、linux系统句柄超负荷?
升级netty版本,或升级springboot和cloud的版本试试。
重连的时候释放资源试试:

1
2
3
4
5
this.workerGroup.shutdownGracefully();
this.bossGroup.shutdownGracefully();

// ---
bootstrap.releaseExternalResources()

参考