Netty
1、概述
2、线程模型
3、核心组件
3.1 Channel
3.2 ChannelHandler和ChannelPipeline
3.3 EventLoop和EventLoopGroup
3.4 Future和Promise
4、创建过程
4.1 服务端创建过程
4.2 客户端创建过程
5、TCP粘包和拆包
6、序列化与反序列化
-
+
游客
注册
登录
ChannelHandler和ChannelPipeline
## 1 ChannelHandler ### 1.1 功能说明 1. ChannelHandler**类似于 Servlet 的 Filter 过滤器**,**负责对 I/O 事件或者 I/O 操作进行拦截和处理**。 2. 他**可以选择性地拦截和处理自己感兴趣的事件**,**也可以透传和终止事件的传递**。 > 透传,即透明传输(Pass Through),指的是在通讯中不管传输的业务内容如何,只负责将传输的内容由源地址传输到目的地址,而不对业务数据内容做任何改变。 3. 基于 ChannelHandler 接口,用户**可以方便地进行业务逻辑定制**,例如打印日志、统一封装异常消息、性能统计和消息编解码等。 4. ChannelHandler**支持注解**,目前支持的注解有两种: 1. `Shareable`:**多个 ChannelPipeline 共用同一个 ChannelHandler**。 2. `Skip`:**被 `Skip` 注解的方法不会被调用**,**直接被忽略**。 5. 对于大多数的 ChannelHandler**会选择性地拦截和处理某个或某些事件**,**其他的事件会忽略**,**由下一个 ChannelHandler 进行拦截和处理**,这就**会导致一个问题**,**用户 ChannelHandler 必须要实现 ChannelHandler 的所有接口**,**包括他不关心的那些事件处理接口**,这**会导致用户代码的冗余和臃肿**,**代码的可维护性也会变差**,为了解决这个问题,Netty 提供了 ChannelHandlerAdapter 基类,他的**所有接口实现都是事件透传**,**如果用户 ChannelHandler 关心某个事件**,**只需要覆盖 ChannelHandlerAdapter 对应的方法即可**,对于不关心的,可以直接继承使用父类的方法,这样子类的代码就会非常简洁和清晰,ChannelHandlerAdapter 部分代码实现如下所示,我们可以发现这些透传方法被 `@Skip` 注解了,这些方法在执行的过程中被忽略,直接跳到下一个 ChannelHandler 中执行对应的方法: ```java @Skip @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.fireExceptionCaught(cause); } ``` ### 1.2 部分类继承关系 ![](/media/202108/2021-08-24_145140_169106.png) ## 2 ChannelPipeline ### 2.1 功能说明 ChannelPipeline**是 ChanelHandler 的容器**,他**负责 ChannelHandler 的管理和事件拦截与调度**。 #### 2.1.1 事件处理 下图展示了**一个消息被 ChannelPipeline 的 ChannelHandler 链拦截和处理的全过程**: ![](/media/202108/2021-08-24_150034_070629.png) 1. 底层的 SocketChannel 的 `read()` 方法**读取 ByteBuf**,**触发 ChannelRead 事件**,**由 IO 线程 NioEventLoop 调用 ChannelPipeline 的 `fireChannelRead()` 方法**,**将消息传到 ChannelPipeline 中**。 2. **消息依次被 HeadHandler**、**ChannelHandler1**、**ChannelHandler2**、...、**TailHandler 拦截和处理**,在这个过程中,**任何 ChannelHandler 都可以中断当前的流程**,**结束消息的传递**。 3. **调用 ChannelHandlerContext 的 `write()` 方法发送消息**,**消息从 TailHandler 开始**,**途径 ChannelHandlerN**、...、**ChannelHandler1**、**HeadHandler**,**最终被添加到消息发送缓冲区等待刷新和发送**,在此过程中**也可以中断消息的传递**,例如当编码失败时,就需要中断流程,构造异常的 Future 返回。 #### 2.1.2 事件分类 > 如无特殊说明,以下方法均在 `ChannelHandlerContext` 类中。 Netty 中的事件分为 `inbound`**事件和 `outbound` 事件**: 1. `inbound` 事件通常**由 I/O 线程触发**,例如**TCP 链路建立事件**、**链路关闭事件**、**读事件**、**异常通知事件**等,对应于上图的左半部分,触发 `inbound` 事件的方法如下: 1. `fireChannelRegistered()`:**Channel 注册事件**。 2. `fireChannelActive()`:**TCP 链路建立成功**,**Channel 激活事件**。 3. `fireChannelRead()`:**读事件**。 4. `fireChannelReadComplete()`:**读操作完成通知事件**。 5. `fireExceptionCaught()`:**异常通知事件**。 6. `fireUserEventTriggered()`:**用户自定义事件**。 7. `fireChannelWritabilityChanged()`:**Channel 的可写状态变化通知事件**。 8. `fireChannelInactive()`:**TCP 连接关闭**,**链路不可用通知事件**。 2. `outbound` 事件通常是**由用户主动发起的网络 I/O 操作**,例如**用户发起的连接操作**、**绑定操作**、**消息发送等操作**,对应于上图的右半部分,触发 `outbound` 事件的方法如下: 1. `bind()`:**绑定本地地址事件**。 2. `connect()`:**连接服务端事件**。 3. `write()`:**发送事件**。 4. `flush()`:**刷新时间**。 5. `read()`:**读事件**。 6. `disconnect()`:**断开连接事件**。 7. `close()`:**关闭当前 Channel 事件**。 #### 2.1.3 自定义拦截器 1. ChannelPipeline**通过 ChannelHandler 接口来实现事件的拦截和处理**,由于 ChannelHandler 中的事件种类繁多,不同的 ChannelHandler 可能只需要关心其中的某一个或者某几个事件,所以,**通常 ChannelHandler 只需要继承相应的 ChannelHandlerAdapter 类覆盖自己关心的方法即可**。 2. 具体的示例如下: 1. **拦截 `Channel Active` 事件**,**打印 TCP 链路建立成功日志**: ```java public class MyInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("TCP connected!"); ctx.fireChannelActive(); } } ``` 2. **拦截 `Channel Close` 事件**,**在链路关闭时释放资源**: ``` public class MyOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { System.out.println("TCP closing ..."); ctx.close(promise); } } ``` #### 2.1.3 构建 ChannelPipeline 1. 事实上,**用户不需要自己创建 ChannelPipeline**,因为**使用 ServerBootstrap 或者 Bootstrap 启动服务端或者客户端时**,**Netty 会为每个 Channel 连接创建一个独立的 ChannelPipeline**,对于使用者而言,只**需要将自定义的拦截器加入到 ChannelPipeline 中即可**,例如: ```java channelPipeline = ch.pipeline(); channelPipeline.addLast("decoder", new MyProtocolDecoder()); channelPipeline.addLast("encoder", new MyProtocolEncoder()); ``` 2. 对于类似**编解码**这样的 ChannelHandler,他**存在先后顺序**,例如 `MessageToMessageDecoder`,在这之前往往**需要有 `ByteToMessageDecoder` 将 ByteBuf 解码为对象**,**然后对对象做二次解码得到最终的 POJO 对象**,ChannelPipeline**支持使用 `addBefore()` 和 `addAfter()` 在指定位置添加或者删除拦截器**。 #### 2.1.4 主要特性 1. ChannelPipeline**支持运行态动态的添加或者删除 ChannelHandler**,在某些场景下这个特性非常实用,例如当业务高峰期需要对系统做拥塞保护时,就可以根据当前的系统时间进行判断,如果处于业务高峰期,则动态地将系统拥塞保护 ChannelHandler 添加到当前的 ChannelPipeline 中,当高峰期过去之后,就可以动态删除拥塞保护 ChannelHandler 了。 2. **ChannelPipeline 是线程安全的**,这意味着 $N$**个业务线程可以并发地操作 ChannelPipeline 而不存在多线程并发问题**,但是**ChannelHandler 不是线程安全的**,这意味着**尽管 ChannelPipeline 是线程安全的**,但是**用户仍然需要自己保证 ChannelHandler 的线程安全**。 ## 参考文献 1. 《Netty 权威指南 第 2 版》 2. [透传](https://baike.baidu.com/item/%E9%80%8F%E4%BC%A0/5630676)。
ricear
2021年8月24日 16:35
©
BY-NC-ND(4.0)
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
分享
链接
类型
密码
更新密码