基础概念
BIO:同步阻塞式IO,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。
NIO:同步非阻塞式IO,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。
Java AIO(NIO.2) : 异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理
NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。
AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持
NIO采用轮询的方式,但是AIO不需要,AIO框架在windows下使用windows IOCP技术,在Linux下使用epoll多路复用IO技术模拟异步IO,向操作系统注册监听,操作系统数据准备好会主动通知应用程序(订阅、通知模式),不再需要selector轮询,由channel通道直接到操作系统注册监听。
NIO和AIO
NIO:会等数据准备好后,再交由应用进行处理,数据的读取/写入过程依然在应用线程中完成,只是将等待的时间剥离到单独的线程中去,节省数据准备时间,因为多路复用机制,Selector会得到复用,对于那些读写过程时间长的,NIO就不太适合。NIO适合读写连接多的小数据。数据量大的场景BIO也是很适合的。
AIO:读完(内核内存拷贝到用户内存)了系统再通知应用,使用回调函数,进行业务处理,AIO能够胜任读写过程长的任务。
NIO 同步非阻塞概念
Selector提供选择已经就绪的任务的能力:
Selector轮询注册在其上的Channel,如果某个Channel发生读写请求并且Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。(同步)
一个Selector可以同时轮询多个Channel,因为JDK使用了epoll()代替传统的select实现,所以没有最大连接句柄1024/2048的限制。所以,只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。(非阻塞)
网络编程注意事项
- 黏包、断包问题解决,网络上很多DEMO都不是很完善
- 断线重连 客户端读取数据出现异常间隔5秒重连,客户端心跳判断连接断开间隔5秒重连,发送数据失败重连(没有连上,间隔5秒重连)。
- 业务数据包补发。考虑到网络环境的不稳定性、多变性(比如从进入电梯、进入地铁、移动网络切换到wifi等),在消息发送的时候,发送失败的概率其实不小,这时消息重发机制就很有必要了。发送消息时,除了心跳消息、握手消息、状态报告消息外,消息都加入消息发送超时管理器,立马开启一个定时器,比如每隔5秒执行一次,共执行3次,在这个周期内,如果消息没有发送成功,会进行3次重发,达到3次重发后如果还是没有发送成功,那就放弃重发,移除该消息,同时通过消息转发器通知应用层,由应用层决定是否再次重发。如果消息发送成功,服务端会返回一个消息发送状态报告,客户端收到该状态报告后,从消息发送超时管理器移除该消息,同时停止该消息对应的定时器即可。客户端发送消息,如果服务端没有反馈还可以在客户端做补发,失败的地方把该消息加入到队列,连接成功后查看队列从队列中取出补发该消息。在用户握手认证成功时,应该检查消息发送超时管理器里是否有发送超时的消息,如果有,则全部重发。还可以在发送失败报异常的时候把该消息加入到重发队列,重连成功后补发。
- java原生的BIO开发,一般会专门启动一个线程负责读取数据,其实读写就是对socket的inputStream、outputStream进行操作。把读取到的完整包解析出来给业务线程池处理,把剩余的不完整包跟下一次读取到的数据合并。nio一般是通过类似事件通知机制实现不阻塞。
- 心跳包:主要是为了防止NAT超时(比如手机连接互联网,运营商的网关就做了NAT映射,把无线网跟因特网对接,隔一段时间没有通信链接会被网关释放。防火墙也会把一段时间内没通信的链接断开),探测连接是否断开,由客户端发送比较合理。链路断开, 没有写操作的TCP连接是感知不到的, 除非这个时候发送数据给服务器, 造成写超时, 否则TCP连接不会知道断开了。主动kill掉一方的进程, 另一方会关闭TCP连接, 是系统代进程给服务器发的FIN. TCP连接就是这样, 只有明确的收到对方发来的关闭连接的消息(收到RST也会关闭, 大家都懂), 或者自己意识到发生了写超时, 否则它认为连接还存在。但是网路复杂中途出现的问题也会比较常见,譬如网线被掐断,对象进程被杀掉,频繁丢包,对方这时候的TCP长连接是不可使用的,但是对于应用层并不知道。如果需要知道当前的网络状况则需要很复杂的超时进行了解,TCP底层就实现了这样的功能,心跳机制是TCP在一段时间间隔后发送确定连接是否存在,如果确定存在的话,就会回传一个包来确定连接是存在的,如果没有返回包的话,则应该通知上层,网络出现了问题,需要进行连接失败的操作了。手动关闭客户端进程,事实上并不能测试出想要的结果,因为进程是在应用层的,所以,这种测试方法不能保证网络驱动层也不发送数据报文给服务器。经过测试发现,当应用层强制结束进程时,对于TCP连接,驱动层会发送reset数据包!而服务器收到这个数据包就可以正常关闭了!那么,如果拔掉网线呢,服务器收不到这个数据包,就会导致死连接存在!所以,心跳包是必要的,或者应用TCP协议本身的Keep-alive来设置SO_KEEPALIVE。减轻服务端压力服务端可以不回复心跳包,服务端一定时间内没有收到心跳包就断开该连接。
- 分包的方式:包头+长度、固定长度、分隔符(回车、自定义字符串)
常用方法
分包封装
- 分隔符:
DelimiterBasedFrameDecoder
- 固定长度:
FixedLengthFrameDecoder
- 按行分隔:
LineBasedFrameDecoder
遇到一个换行符,则认为是一个完整的报文 - 自定义长度帧解码器:
LengthFieldBasedFrameDecoder
可以指定最大包长度、长度域大小与位置,new LengthFieldBasedFrameDecoder(1024,0,2),一个包最好不要超过2048- lengthFieldOffset = 0;
- lengthFieldLength = 2;
- lengthAdjustment = 0;
- initialBytesToStrip = 0。
自带的解析器遇到错误包怎么处理?
比如LengthFieldBasedFrameDecoder,遇到错误包会进行丢弃,保证后续发送过来的正常包可以继续解析。比如指定包最大长度为1024,解析到的长度超过1024就会报错丢弃,如果有包头还可以initialBytesToStrip参数设置成0,在处理程序中校验包头。如果长度指定成65536,如果中间有包给的长度是错误的,比如设置成3万,就可能出现需要丢弃掉3万个字节后面的内容才可以继续正常解析。
如果想要保住万无一失,就必须增加包头、包尾、长度、MD5校验码,进行组合校验,可以保证业务包出错的情况还能比较正常的运行。一般情况下直接用框架自带的Decoder就够用了。
编解码
StringDecoder
、StringEncoder
网络编程可能问题
多进程通过数据库进行交互
通过数据库进行交互可能出现一个进程往数据库里写入了一条记录,通知另一个进程去读取。但是写入操作还未提交另一个进程就去读取数据了,读取到的数据为空,特别是在需要批量操作的场景容易出现。所以需要根据情况写入后进行提交操作,提交成功然后再通知另一个进程读取。
1 | // java spring ibatis 提交的代码 |
1 | <tx:annotation-driven transaction-manager="transactionManagerIT"/> |
或者直接通过配置
1 | <bean id="baseTransactionProxyIT" class="org.springframework.transaction.interceptor.TransactionProxyFactoryBean" abstract="true"> |
Netty 主要功能介绍
心跳
使用空闲时间发送心跳。
1 | public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { |
- readerIdleTimeSeconds: 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.
- writerIdleTimeSeconds: 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.
- allIdleTimeSeconds: 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.
语法说明
1 | f.channel().closeFuture().sync();//等待服务端关闭端口监听。主线程执行到这里就 wait 子线程结束,子线程才是真正监听和接受请求的,closeFuture()是开启了一个channel的监听器,负责监听channel是否关闭的状态,如果监听到channel关闭了,子线程才会释放 |
与spring集成
1 | // ==============================存在问题======================================================= |
注意要点
- 服务端使用
serverBootstrap.bind(port).sync();
形式监听指定端口,不要用new InetSocketAddress(ip,port)
这种限制IP的形式 - Netty中ByteToMessageDecoder中的decode方法执行多次的问题,再查看decode方法的源码注释如下:This method will be called till either the input has nothing to read,意思是说:ByteBuf对象的数据没有读完的话,decode方法会一直调用。
readerIndex
会随着数据的读取而不断增加,所以保证每次decode读取一个完整包,如果不满足一个完整包就重置readerIndex
退出下次再进来读。 - 如果使用的是分隔符的数据包还可以先查找分隔符,如果没有找到分隔符却发现包里有内容可以进行跳过操作,丢弃这些错误数据,找到分隔符了就可以进行读取,验证长度,读取包内容等操作,读取完一个完整包退出等待系统自动调用decode继续后续包操作,当然也可以自己代码判断后面如果还有内容直接进行后续包处理。
- 注意不要把没用到的netty自带的decoder解码器加上去,要根据具体需求加,不然会导致解码错误。还要注意decoder是否是可共享的如果是不可共享的得用new
- write read 数据类型默认使用ByteBuf
- 应用开发的时候往往是异步的,可能要经过多个应用,所以需要整条链路的标识,可以使用sessionid、token、id等做标识,这样返回的时候才能找到到底是谁发送的这个请求,也可以各个子应用自己生成标识单独维护各自请求的对应关系
- 一般来说都是通过Decoder或者Encoder进行对设备上报与下发的协议内容的编解码转换成POJO,Handler中直接用POJO处理业务逻辑或者再进行分发到业务模块
功能扩展
启用一个端口解析不同设备多种协议
- 可以针对不同协议定义一个XXXHandler,定义一个入口Handler方法,在入口Handler判断协议头类型调用对应的XXXHandler处理类处理对应数据(可以保证handler的数据足够分析包类型,方案可行)
- 定义CustomDecoder,在Decoder中判断需要调用的Handler(可以保证Decode中的数据足够分析包类型,方案可行)。一般来说都是通过Decoder或者Encoder进行协议的编解码转换成POJO,Handler中直接用POJO处理业务逻辑,如果使用pojo下发命令还得有Encoder,如果直接用字节流处理了就不用。
- 说明:不同客户的连接过来发送的包是不会粘在一起的,客户端1的包不会跟客户的2的包粘在一起,只有可能客户端1的包1跟客户的1的包2粘在一起,如果多个客户端都共用一个内存存放包数据是会有问题的,会无法区分数据是谁的,就得在数据包中加标识说明是谁的包,正常的话底层框架都会分装好了,不会暴露这么原始的接口。很多c程序都是一个链接给一个独立buf。
- 其他方案:一个进程一个端口解析一种协议,一个进程多个端口解析不同协议
其他扩展框架
- SOFABolt:蚂蚁金融服务集团开发的一套基于 Netty 实现的网络通信框架
- smart-socket
- t-io
- mina
参考
- Netty实现心跳机制与断线重连
- 开源一个自用的Android IM库,基于Netty+TCP+Protobuf实现
- netty 例子 example
- netty 例子 Netty-study
- netty 例子 NettyChat
- netty 例子Jupiter
- 一起学Netty(十四)之 Netty生产级的心跳和重连机制
- 浅析 Netty 实现心跳机制与断线重连
- SpringBoot使用netty服务端和客户端-这个启动写法有问题再tomcat下无法使用,new出来的不受spring管理,而且只IP监听127.0.0.1,要用CommandLineRunner
- Http-Netty-Rpc-Service系统改造
- 【Netty】Springboot整合Netty
- 【Netty】心跳机制与断线重连
- Netty与Spring Boot的整合
- Netty之解决TCP粘包拆包(自定义协议)
- Python基础-Socket基础-1
- 【Netty】Netty之ByteBuf
- 什么是编解码器?
- java netty之ByteToMessageDecoder
- Netty5 Read事件处理过程_源码讲解
- 使用LengthFieldBasedFrameDecoder解码器及自定义
- 看懂通信协议:自定义通信协议设计之TLV编码应用
- Netty实战系列一之多协议并存
- JAVA 中BIO,NIO,AIO的理解