Netty
1、概述
2、线程模型
3、核心组件
3.1 Channel
3.2 ChannelHandler和ChannelPipeline
3.3 EventLoop和EventLoopGroup
3.4 Future和Promise
4、创建过程
4.1 服务端创建过程
4.2 客户端创建过程
5、TCP粘包和拆包
6、序列化与反序列化
-
+
tourist
register
Sign in
Future和Promise
## 1 Future ### 1.1 功能介绍 1. 由于 Netty 的 Future 都是**与异步 I/O 操作相关**的,因此命名为**ChannelFuture**,代表他**与 Channel 操作相关**。 2. ChannelFuture 主要**为了解决在异步 I/O 操作中调用者无法获取异步操作的结果而专门设计的**,**有两种状态**,**分别为 `uncompleted` 和 `completed`**: 1. 当**开始一个 I/O 操作**时,**一个新的 ChannelFuture 被创建**,此时他**处于**`uncompleted`**状态**,非失败、非成功、非取消,因为**I/O 操作还没有完成**。 2. **一旦 I/O 操作完成**,**ChannelFuture 将会被设置成 `completed`**,他的结果有三种可能,分比为操作成功、操作失败、操作被取消。 3. ChannelFuture**可以同时增加一个或者多个 GenericFutureListener**,**也可以用 `remove()` 方法删除 GenericFutureListener**,**当 I/O 操作完成之后**,**I/O 线程将会回调 ChannelFuture 中 GenericFutureListener 的 `operationComplete()` 方法**,并**把 ChannelFuture 当做方法的入参**,**如果用户需要做上下文相关的操作**,**需要将上下文信息保存到对应的 ChannelFuture 中**。 4. 需要注意的是,**不要在 ChannelHandler 中调用 ChannelFuture 的 `await()` 方法**,这**会导致死锁**,因为**发起 I/O 操作之后**,**由 I/O 线程负责异步通知发起 I/O 操作的用户线程**,如果**I/O 线程和用户线程是同一个线程**,就**会导致 I/O 线程等待自己通知操作完成**,这**就导致了死锁**。 ```java /** * BAD - NEVER DO THIS */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ChannelFuture channelFuture = ctx.channel().close(); channelFuture.awaitUninterruptibly(); // Perform post-closure operation // ... } /** * GOOD */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ChannelFuture channelFuture = ctx.channel().close(); channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // Perform post-closure operation // ... } }) } ``` ## 2 Promise ### 2.1 功能介绍 1. Promise 是**可写的 Future**,Future 自身并没有写操作相关的接口,Netty**通过 Promise 对 Future 进行扩展**,**用于设置 I/O 操作的结果**: ```java /** * Marks this future as a success and notifies all * listeners. * * If it is success or failed already it will throw an {@link IllegalStateException}. */ Promise<V> setSuccess(V result); /** * Marks this future as a success and notifies all * listeners. * * @return {@code true} if and only if successfully marked this future as * a success. Otherwise {@code false} because this future is * already marked as either a success or a failure. */ boolean trySuccess(V result); /** * Marks this future as a failure and notifies all * listeners. * * If it is success or failed already it will throw an {@link IllegalStateException}. */ Promise<V> setFailure(Throwable cause); /** * Marks this future as a failure and notifies all * listeners. * * @return {@code true} if and only if successfully marked this future as * a failure. Otherwise {@code false} because this future is * already marked as either a success or a failure. */ boolean tryFailure(Throwable cause); ``` 2. Netty**发起 I/O 操作的时候**,**会创建一个新的 Promise 对象**,例如调用 ChannelHandlerCntext 的 `write()` 方法时,会创建一个新的 ChannelPromise: ```java @Override public ChannelFuture write(Object msg) { return write(msg, newPromise()); } ``` **当 I/O 操作发生异常或者完成时**,**设置 Promise 的结果**: ```java @Override public ChannelFuture write(Object msg, ChannelPromise promise) { Channel channel = channel(); if (!channel.isActive()) { // Mark the write request as failure if the channel is inactive. if (channel.isOpen()) { promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION); } else { promise.tryFailure(CLOSED_CHANNEL_EXCEPTION); } // Release message now to prevent resource-leak. ReferenceCountUtil.release(msg); } else { outBoundBuffer.addMessage(msg, promise); } } ``` ## 参考文献 1. 《Netty 权威指南 第 2 版》
ricear
Aug. 25, 2021, 12:19 p.m.
©
BY-NC-ND(4.0)
转发文档
Collection documents
Last
Next
手机扫码
Copy link
手机扫一扫转发分享
Copy link
Markdown文件
share
link
type
password
Update password