Netty In Action(四)

引言

    网络应用程序的主要功能是传输数据,而数据传输的方式有多种,我们在实际应用中根据具体的需求来选择合适的方式。使用过Java网络库的人都知道,如果我们想将阻塞应用转变为非阻塞应用会非常困难,因为他们用的是两套完全不同的API。

    而Netty则提供了一套统一的网络API,如果我们想要改变网络传输方式,我们不需要修改大量的代码。这使得我们的工作效率大大提升。

案例研究:改变传输方式

    为了说明数据时如何传输的,我们写一个简单的服务器程序,这个程序接受新的连接并向客户写一个“Hi”字符串。

不使用Netty的IO/NIO

    首先我们先看下不使用Netty的阻塞IO程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class PlainOioServer {
public void serve(int port) throws IOException {
final ServerSocket socket = new ServerSocket(port); //1
try {
while (true) {
final Socket clientSocket = socket.accept(); //2
System.out.println("Accepted connection from " +
clientSocket);
new Thread(new Runnable() { //3
@Override
public void run() {
OutputStream out;
try {
out = clientSocket.getOutputStream();
out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); //4
out.flush();
clientSocket.close(); //5
} catch (IOException e) {
e.printStackTrace();
try {
clientSocket.close();
} catch (IOException ex) {
// ignore on close
}
} }
}).start(); //6
}
} catch (IOException e) {
e.printStackTrace();
} }
}
  1. 绑定服务器的端口
  2. 接受新的连接
  3. 创建新的线程处理连接
  4. 向客户端写消息
  5. 写完成后关闭连接
  6. 启动线程

    这段程序可以很好的工作,但一旦我们发现阻塞IO扩展性比较差而不能满足我们的需求时,我们会发现将其修改为异步程序是多么的困难,因为两套API是完全不同的。我们只有另写一套,如下代码所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class PlainNioServer {
public void serve(int port) throws IOException {
System.out.println("Listening for connections on port " + port);
ServerSocketChannel serverChannel;
Selector selector;
serverChannel = ServerSocketChannel.open();
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
ss.bind(address); //1
serverChannel.configureBlocking(false);
selector = Selector.open(); //2
serverChannel.register(selector, SelectionKey.OP_ACCEPT); //3
final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
while (true) {
try {
selector.select(); //4
} catch (IOException ex) {
ex.printStackTrace();
// handle in a proper way
break;
}
Set<SelectedKey> readyKeys = selector.selectedKeys(); //5
Iterator<SelectedKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) { //6
ServerSocketChannel server = (ServerSocketChannel)
key.channel();
SocketChannel client = server.accept();
System.out.println("Accept connection from " + client);
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_WRITE |
SelectionKey.OP_READ, msg.duplicate()); //7
}
if (key.isWritable()) { //8
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
while (buffer.hasRemaining()) {
if (client.write(buffer) == 0) { //9
break;
}
}
client.close(); //10
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
}
}
}
}
}
}
  1. 绑定服务器端口
  2. 打开Selector处理Channel
  3. 将ServerChannel注册到Selector上,并监听新连接到来事件
  4. 阻塞等待可执行的事件
  5. 获取所有可执行事件的实例
  6. 检测该事件是否是新连接事件
  7. 接受新的连接并将其注册到Selector上,监听该连接的读写事件
  8. 检测该事件是否是可写事件
  9. 向客户端写数据。当网络饱和的情况下一次可能写不完,当网络可写时将剩余的数据写入
  10. 关闭连接

    如上所示,新的程序和之前的完全不同,尽管他们的功能是一样的。下面我们看下使用Netty来转换传输方式。

使用Netty的IO/NIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class NettyOioServer {
public void server(int port) throws Exception {
final ByteBuf buf = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
EventLoopGroup group = new OioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); //1
b.group(group) //2
.channel(OioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() { //3
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new ChannelInboundHandlerAdapter() { //4
@Override
public void channelActive(
ChannelHandlerContext ctx) throws Exception {
ctx.write(buf.duplicate())
.addListener(ChannelFutureListener.CLOSE); //5
}
});
}
});
ChannelFuture f = b.bind().sync(); //6
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync(); //7
}
}
}
  1. 创建ServerBootstrap
  2. 使用OioEventLoopGroup阻塞模式
  3. 指定ChannelInitializer
  4. 向Pipline中新增handler用来处理新的连接
  5. 向客户端写数据,并增加Listener监听器,一旦数据写完就关闭连接
  6. 绑定Server开始接受连接
  7. 释放所有资源

    如果我们想要将其修改为非阻塞(NIO)网络应用,我们只需要修改少部分代码即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class NettyNioServer {
public void server(int port) throws Exception {
final ByteBuf buf = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); //1
b.group(group) //2
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() { //3
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
new ChannelInboundHandlerAdapter() { //4
@Override
public void channelActive(
ChannelHandlerContext ctx) throws Exception {
ctx.write(buf.duplicate())
.addListener(ChannelFutureListener.CLOSE); //5
}
});
}
});
ChannelFuture f = b.bind().sync(); //6
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync(); //7
}
}
}

    我们只需要将OioEventLoopGroup修改为NioEventLoopGroup并将OioServerSocketChannel修改为NioServerSocketChannel即可。

传输API

    传输API的核心是Channel接口,所有的输出流操作都用到了传输API,如下图所示Channel接口的层次接口图:

Channel的层次结构图

    如上图所示,每个Channel都会分配一个ChannelPipeline和ChannelConfig。ChannelConfig负责存储配置信息并允许在运行期间修改。传输有特殊的配置而仅仅实现了ChannelConfig接口。ChannelPipeline则持有所有用于处理输入(inbound)和输出(outboud)流的ChannelHandler。这些Handler使我们能够响应网络状态变化或者处理数据。

到目前为止,我们可以用ChannelHandler处理以下事情:

  • 将数据从一种类型转换为另一种类型
  • 异常提醒
  • Channel激活/暂停提醒
  • Channel注册/注销EventLoop提醒
  • 用户特性事件提醒

    这些ChannelHandler在ChannelPipeline中像链条一样一个一个执行。如果我们使用过Servlet,我们会发现他们很相似。

ChannelPipeline

ChannelPipeline使用了拦截过滤器模式,这意味着你可以加入不同的CHannelHandler并且拦截ChannelPipeline中的数据或事件。这个和Unix中的管道(pipes,pipes可以将不同的命令串起来)很相似

    我们可以在程序运行期间修改ChannelPipeline,可以在任何需要时刻向其添加/移除ChannelHandler。这样我们就运用Netty构造高度灵活的应用程序。

    除了可以获得ChannelPipeline和ChannelConfig外,Channel本身还提供了很多方法,如下表列出了一些重要的方法。

方法名 描述
eventLoop() 返回该Channel注册的EventLoop
popline() 返回该Channel所在的ChannelPipeline
isActive() 判断该CHannel是否激活,也就是是否连接到另一端
localAddress() 返回本地绑定的SocketAddress
remoteAddress() 返回远端绑定的SocketAddress
write() 向另一端写数据。写的数据通过ChannelPipeline传输

    为了向远端写数据,我们调用Channel的write()方法,如下面代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
Channel channel = ...
ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8); //1
ChannelFuture cf = channel.write(buf); //2
cf.addListener(new ChannelFutureListener() { //3
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) { //4
System.out.println(“Write successful“);
} else {
System.err.println(“Write error“); //5
future.cause().printStacktrace();
} }
});
  1. 创建ByteBuf用来存储需要写的数据
  2. 写数据
  3. 添加ChannelFutureListener监听数据是否写完
  4. 正常写完数据
  5. 写数据异常

    需要注意的是Channel是线程安全的,也就是说其所有操作都是线程安全的。这样我们就可以存储一个Channel的引用然后在任何时刻都可以通过他向远端写数据。下面是Netty在多线程下写数据的代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final Channel channel = ...
final ByteBuf buf = Unpooled.copiedBuffer(„your data“,
CharsetUtil.UTF_8); //1
Runnable writer = new Runnable() { //2
@Override
public void run() {
channel.write(buf.duplicate());
}
};
Executor executor = Executors.newChachedThreadPool(); //3
// write in one thread
executor.execute(writer); //4
// write in another thread
executor.execute(writer); //5
...
  1. 创建ByteBuf用来存储需要写的数据
  2. 创建向Channel写数据的Runnable
  3. 获取线程执行器Executor
  4. 执行Runnable任务
  5. 在另一个线程中执行Runnable任务

Channel可以保证按照我们传数据的顺序来写数据。

Netty包含的Transport

    Netty自带了一些我们可以使用的传输协议,尽管不全,但也足够我们使用了。如下表所示为Netty中包含的传输

名称 所在包 描述
NIO io.netty.channel.socket.nio 以java.nio.channels包为基础,同时使用了选择器方法(selector)
OIO io.netty.channel.socket.oio 以java.net包为基础,使用了阻塞IO流
Local io.netty.channel.local 在虚拟机之间通信
Embedded io.netty.channel.embedded 专门供测试ChannelHandler使用,可以在无网状态下运行ChannelHandler

NIO-非阻塞IO

    NIO传输是目前使用最多的。它基于Selector模式提供了完全异步的IO操作。用户可以通过注册来立即获取chanel状态变化。Channel变化可以分为以下几种

  • 一个新的Channel准备好被接收(新的连接到来)
  • Channel建立连接成功
  • Channel接受到新的数据并且可读
  • CHannel可发送数据

    程序会对这些状态变更的事件做出相应的处理然后重置他们以便下次变更到来时重新相应。这里会有一个线程对Channel进行检测其状态是否变更,如果变更了就将其归类。

SelectionKey中定义的可订阅事件如下:

  • OP_ACCEPT:提醒有新连接到来,建立Channel
  • OP_CONNECT:提醒连接完成
  • OP_READ:Channel中数据可读
  • OP_WRITE:提醒Channel中可写数据。大部分情况下没问题,但是有些系统会出现网络堵塞问题,当远端消费数据的速度小于我们写数据的速度时会出现这种问题

    Netty的NIO传输内部使用了这种模型来接受和发送数据,外部封装成统一的API给用户使用,掩盖了内部所有的实现细节。下图为Netty内部流程图。

Selector逻辑流程图

  1. 新建的Channel注册到Selector上
  2. 选择处理器的变化通知
  3. 已经注册过的Channel
  4. Selector.select()方法会一直堵塞直到有Channel状态变化或者超时
  5. 检测是否有状态变化
  6. 处理所有发生状态变化的Channel
  7. 在同一个线程中处理其他任务

    这种模式的传输方式在处理任务的时候比起来阻塞传输(OIO)传输多少会有点延迟,这是由于Selector工作模式引起的,因为其收到状态变更的消息通知需要一定得时间。但这仅仅是毫秒级的,我们可以通过增加网络宽带来减少延迟。

    目前NIO传输适用的一个重要功能是“零文件拷贝”(zero-file-copy)。这个特性可以让我们快速有效地将文件系统的数据传输到网络栈中而不需要将数据先从内存空间拷贝到用户空间。

    但是并不是所有的操作系统都支持这种特性,另外,这种特性不适用于传输加密/压缩的数据,要传输这样的数据必须先将其传输到用户空间进行加工处理。所以只有传输原始数据才能使用这种特性。Ftp或者Http服务器下载大文件时可以使用这个特性。

OIO-Old blocking IO

    在传统的Java net中,我们为每一个socket分配一个线程处理任务。或者多个socket共享一个线程,但是当一个socket堵塞时间比较长时可能导致该线程共享的所有其他socket堵塞。

    Netty的NIO和OIO使用的是同一套API。之所以能够实现一套统一的API,Netty利用了socket上的SO_TIMEOUT属性,如果socket上的操作在一定得时间内没有完成,将会跑出SocketTimeoutException,Netty捕获该异常然后继续执行工作。然后再下一次EventLoop中,继续尝试执行。这是唯一有效的办法,但问题是抛出SocketTimeoutException是需要消耗资源的,因为它要执行StrackTrace等操作。

OIO逻辑流程图

  1. 线程分配socket
  2. socket连接远端服务器
  3. 读可能阻塞的socket
  4. 读操作完成
  5. 读操作完成并且获得读的数据然后处理数据
  6. 执行该socket提交的其他任务
  7. 继续循环

Local-虚拟机内通信

    Netty可以提供本机内通信的API,该API和前面提供的API相同,并且类似于NIO是异步的。Client可以通过SocketAddress连接到本机服务器(类似于连接远程服务器一样)。需要注意的是我们在使用时客户端和服务器必须同时开着。

Embedded transport

    Embedded主要用于测试ChannelHandler或者嵌入其他的ChannelHandler当做帮助类使用。

合理选择传输类型

    在实际应用中,我们需要根据实际情况选择不同的传输方式,下表是不同的协议支持的传输方式:

OIO逻辑流程图

    下面让我们总结下传输类型的使用场景

  • 低并发连接:如果并发量较低,可以先使用OIO传输模式,因为是低并发,我们不需要担心分配线程的限制,并且资源也不是问题。这时我们可能会产生什么才是低并发场景的疑问,这个很难界定。但只要记住NIO传输类型可以支持万级以及十万级的并发量。所以任何低于一千的并发量都可以称作低并发。

    这种情况下OIO运行的会很好,但有时NIO在低并发下也会很合适。当连接非常”活”跃时,OIO的上下文切换可能会比较耗时。

  • 高并发连接:当应用连接比较多时就需要选择NIO传输方式。

  • 低延迟:如果对实时性要求较高,首先使用OIO。但仍然需要权衡下,因为低延迟意味着要消耗更多的线程

  • 阻塞代码:如果你正在改造老代码,而老代码是线程阻塞的。对Netty来说,将阻塞的IO修改为非阻塞的代价较大,或许只能重写代码。因此可以先将其转换为OIO,然后在需要的时候修改为NIO

  • 本机内通信:利用Local实现本机内通信

  • 测试ChannelHandler:利用Embedded可以进行ChannelHandler测试

程序需求 推荐的传输类型
低并发 OIO
高并发 NIO
低延迟 OIO
阻塞代码改造 OIO
本机通信 Local
测试ChannelHandler Embedded