Netty In Action(六-ChannelHandler)

引言

  一般来说,在网络应用中接收连接发送数据还不是最重要的,最重要的应该是如何处理数据。Netty提供了ChannelHandler接口用来处理数据。同时我们可以将ChannelHandler串起来形成一个链条(filter模式)来串行处理数据,这样每一个ChannelHandler就只需要实现一部分简单的功能即可。另外我们还可以在程序运行期间对ChannelHandler进行修改。

ChannelPipeline

  ChannelPipeline是Channel中处理输入和输出数据的ChannelHandler的集合。其运用过滤器模式来控制事件的执行以及多个ChannelHandler之间的连接。

  每创建一个新的Channel都会附带创建一个ChannelPipeline并将其添加到Channel中。一旦完成,Channel和其对应的ChannelPipeline就紧密联系在一起,Channel无法添加一个新的CHannelPipeline或者将老的ChannelPipeline移除。

ChannelPipeline示意图

  如上图所示描述了ChannelPipeline中ChannelHandler处理数据的流程。如果一个流入数据流事件被触发,数据将会从ChannelPipeline的起始位置传输到末尾,如果是流出事件,数据将从末尾位置传输到起始位置。ChannelPipeline可以根据ChannelHandler的类型来判断是否需要当前ChannelHandler处理数据,如果不需要则跳到下一个。

  ChannelPipeline可以在运行期间进行修改,所以我们可以在一个ChannelPipeline中修改另一个ChannelPipeline。如下图所示为ChannelPipeline提供的方法:

img2.png ChannelPipeline提供的方法

下面代码展示了如何修改一个CHannelPipeline

1
2
3
4
5
6
7
8
ChannelPipeline pipeline = ..;
FirstHandler firstHandler = new FirstHandler(); //1
pipeline.addLast("handler1", firstHandler); //2
pipeline.addFirst("handler2", new SecondHandler()); //3
pipeline.addLast("handler3", new ThirdHandler()); //4
pipeline.remove("handler3"); //5
pipeline.remove(firstHandler); //6
pipeline.replace("handler2", "handler4", new FourthHandler()); //7
  1. 创建FirstHandler实例
  2. 将FirstChanelHandler添加到piplene中
  3. 将SecondHandler添加到Pipline中的队列首位置,其在FirstHandler之前
  4. 将ThirdHandler添加到Pipline的队尾
  5. 通过名称移除Handler
  6. 通过实例移除Handler
  7. 通过名称将SecondHandler替换成FouthHandler

一般来说被添加到ChannelPipeline中的ChannelHandler都会处理传递给它的事件,所以任何一个ChannelHandler一定不要阻塞,否则会影响整个ChannelPipeline的性能

  ChannelPipline除了提供修改ChannelHandler的接口外,还提供额外的查询ChannelPipline中是否存在该ChannelHandler的接口,如下所示:

ChannelPipeline中Get操作

  由于ChannelPipline继承了ChannelInboundInvoker和ChannelOutboundInvoker,因此ChannelPipline暴露了其接口。如下图所示为ChannelPipline暴露的ChannelInboundInvoker的接口。除了ChannelPipline,ChannelHandlerContext也继承了ChannelInboundInvoker同时也包含下列接口。

img5.png ChannelPipeline中暴露的ChannelInboundInvoker方法

下图所示为ChannelPipline暴露的ChannelOutboundInvoker的方法

ChannelPipeline中暴露的ChannelOutboundInvoker方法

ChannelHandlerContext

  每次向ChannelPipline中新增ChannelHandler时都会为ChannelHandler新建一个ChannelHandlerContext,并且ChannelHandler通过ChannelHandlerContext与其他的ChannelHandler进行通信。

  ChannelHandlerContext一旦被创建就不会再改变,因此应用可以对其进行缓存。

  ChannelHandlerContext也实现了ChannelInboundInvoker和ChannelOutboundInvoker接口,因此它和ChannelPipline及Channel有许多相同的方法。不同的是在ChannelPipline或者Channel上调用方法数据会贯穿整个ChannelPipline,而在ChannelHandlerContext上调用则只会贯穿该ChannelHandler之后的ChannelHandler链路。

通知下一个ChannelHandler

  通过调用ChannelInboundInvoker和ChannelOutboundInvoker的方法我们可以通知最近的Handler,事件触发的位置取决于我们的设置。

如下图展示了ChannelHandlerContext和ChannelHandler以及ChannelPipline的关系

ChannelHandlerContext、ChannelHandler与ChannelPipline关系图
  1. ChannelPipline绑定的Channel
  2. ChannelPipline绑定到Chanel并持有增加到其中的ChannelHandler实例
  3. ChannelHandler是ChannelPipline的一部分
  4. 当向ChannelPipline添加ChannelHandler时创建ChannelHandlerContext

如果我们想触发一个贯穿ChannelPipline的事件流,我们有两种不同的方法可以达成

  • 调用Channel的方法
  • 调用ChannelPipline的方法

如下代码所示,分别通过Channel和ChannelPipline触发一个写操作

1
2
3
4
ChannelHandlerContext ctx = ..;
Channel channel = ctx.channel(); //A
channel.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8)); //B
1
2
3
4
ChannelHandlerContext ctx = ..;
ChannelPipeline pipeline = ctx.pipeline(); //A
pipeline.write(Unpooled.copiedBuffer("Netty in Action",
CharsetUtil.UTF_8)); //B

如下图所示为通知事件在ChannelPipline中传输过程

通知事件在ChannelPipline中传输过程
  1. 事件传输到第一个ChannelHandler
  2. 通过赋值给ChannelHandlerContext,ChannelPipline将事件传输给下一个ChannelHandler
  3. 同上

  有时候我们想将事件从一个特定的位置进行触发然后传输并屏蔽该位置之前的Handler,那么我们可以通过ChannelHandlerContext来完成。

1
2
ChannelHandlerContext ctx = ..; //1
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8)); //2
  1. 获取ChannelHandlerContext引用
  2. 触发写事件

使用上述代码,事件将从ChannelHandlerContext指定的下一个ChannelHandler开始传输,如下示意图

通知事件在ChannelPipline中传输过程
  1. 通过ChannelHandlerContext将事件传输给特定的ChannelHandler
  2. 事件传输
  3. Handler执行完毕后数据输出

修改ChannelPipline

  我们可以通过调用ChannelHandler的pipline()方法获取ChannelPipline引用。然后我们就可以对其中的ChannelHandler进行操作。

我们可以将ChannelHandlerContext存储起来,在其他方法或者线程中来发送数据

如下代码所示:

1
2
3
4
5
6
7
8
9
10
public class WriteHandler extends ChannelHandlerAdapter {
private ChannelHandlerContext ctx;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
this.ctx = ctx; //1
}
public void send(String msg) { //2
ctx.write(msg);
}
}
  1. 存储ChannelHandlerContext引用,方便后边使用
  2. 通过之前存储的ChannelHandlerContext发送数据

  需要注意的是,如果ChannelHandler实例被@Sharable注解,那么它可以被添加到多个ChannelPipline中。这就意味着这个ChannelHandler拥有多个ChannelHandlerContext,也就是说单一实例的ChannelHandler可以绑定不同的ChannelHandlerContext被调用。因此,当我们想要使用@Sharable注解一个ChannelHandler时,我们必须要保证该Handler是线程安全的。

状态模型

  Netty有一套简单的状态模型和ChannelInboundHandler的方法一一对应。如下图所示四中不同的状态:

img11.png channel的周期状态

Channel在生命周期中会处于不同的状态,如下图所示:

状态转换模型

  在一些其他情况下,我们可能发现其他的状态之间的转换,这是因为用户可以手动从EventLoop中注销Channel然后再次注册该Channel。在这种情况下,我们可能会见到多个channelRegistered和channelUnregistered的状态转换,但是channelActive和channelInactive状态变换只会出现一次,因为一个channel只能存活于一个连接生命周期然后就会被回收,如果要重新连接,则需要重新创建一个Channle。

状态转换模型

常用ChannelHandler及其类型

  通过使用ChannelHandler,Netty支持拦截操作以及状态变更通知,这样我们能够更加方便简单地写可重复运用的业务逻辑。

Netty支持两种类型的ChannelHandler,如下图所示:

ChannelHandler类型

ChannelHandler——所有ChannelHandler的父类

  Netty的handler类型相对来说定义明确且层次接口清晰。所有Handler的父类都是ChannelHandler。当ChannelHandler被添加或者从ChannelPipline中移除时,其提供相应的回调函数。

ChannelHandler提供回调函数

  Netty为ChannelHandler实现了一个适配器类ChannelHandlerAdapter,我们在使用时只需要复写我们感兴趣的方法即可,其最基本的作用是将事件传递给ChannelPipline中的下一个ChannelHandler。

Inbound handlers

ChannelInboundHandler提供Channel状态变化或者接受到数据时调用的方法,如下所示为其提供的方法

img17.png ChannelInboundHandler提供的回调函数

需要特殊说明的是上面所有的方法都是ChannelInboundInvoker方法的一个副本。

  需要特别注意的是,ChannelInboundHandler用于接受数据,所以需要复写channelRead(…)方法,而该方法需要手动释放资源。这一点很重要,因为Netty使用池缓存ByteBuf对象,如果我们忘记释放资源就会造成资源浪费甚至资源枯竭。如下代码所示:

1
2
3
4
5
6
7
8
@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter { //1
@Override
public void channelRead(ChannelHandlerContext ctx,
Object msg) {
ReferenceCountUtil.release(msg); //2
}
}
  1. 继承ChannelInboundHandlerAdapter
  2. 释放资源

  忘记释放资源会造成资源枯竭,幸运的而是Netty会在忘记释放资源的地方打印Warn日志,这让我们比较容易发现在哪边释放忘记释放资源。

同时Netty为我们提供了SimpleChannelInboundHandler类,该类帮助我们自动释放资源,因此我们一定不能缓存资源供后续使用,如下代码所示:

1
2
3
4
5
6
7
8
@Sharable
public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> { //1
@Override
public void channelRead(ChannelHandlerContext ctx,
Object msg) {
// No need to do anything special //2
}
}
  1. 继承SimpleChannelInboundHandler
  2. 不需要手动释放资源

如果我们队其他状态改变感兴趣,可以复写其他函数。

Outbound handlers

ChannelOutboundHandler与ChannelInboundHandler类似,如下所示为其提供的方法:

img19.png ChannelOutboundHandler提供的回调函数

  类似于ChannelInboundHandler,Netty也提供了ChannelOutboundHandlerAdapter适配器类,我们可以只实现自己感兴趣的方法。如果继承ChannelOutboundHandlerAdapter,在我们使用时仍然需要手动释放资源.

1
2
3
4
5
6
7
8
9
10
11
@Sharable
public class DiscardOutboundHandler
extends ChannelOutboundHandlerAdapter { //1
@Override
public void write(ChannelHandlerContext ctx,
Object msg, ChannelPromise promise) {
ReferenceCountUtil.release(msg); //2
promise.setSuccess(); //3
}
}
  1. 继承自ChannelOutboundHandlerAdapter
  2. 释放资源
  3. 通知ChannelPromise

  需要注意的是我们需要手动释放资源并且通知ChannelPromise,如果ChannelPromise没有被通知那么有可能与之对应的ChannelFutureListener不会被触发。

当消息被消费并且不会再传递到下一个ChannelOutboundHandler时,我们需要手动释放资源。一旦Channel被关闭或者数据传输到真正的传输层(网络模型),资源会被自动释放