Netty in Action(一)

netty是什么

    Nettt是一个基于Java NIO的client-server网络服务框架,人们可以利用netty快速地开发网络应用。同时netty相对于其他网络框架更加简单并且扩展性更强,这主要得益于其提供的简单易用的api将业务逻辑和网络处理代码解耦开来。能够使你更加专注于业务的实现而不需要太多关心网络底层实现。

异步设计

    netty所有的api都是异步的。异步处理已经不是什么新鲜事了,众所周知,IO已经变为一个应用的瓶颈,而异步处理正是为了解决这个问题出现的。

CallBacks机制

    CallBacks机制经常应用于异步处理,人们可以指定方法执行完后的回调函数,在JavaScript中,回调机制是其语言的核心。下面代码展示了如何利用回调机制处理接受数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface Fetcher {
void fetchData(FetchCallback callback);
}
public interface FetchCallback {
void onData(Data data);
void onError(Throwable cause);
}
public class Worker {
public void doWork() {
Fetcher fetcher = ...
fetcher.fetchData(new FetchCallback() {
@Override
public void onData(Data data) { #1
System.out.println("Data received: " + data);
}
@Override
public void onError(Throwable cause) { #2
System.err.println("An error accour: " + cause.getMessage());
}
});
... }
}

#1 没有出现错误,调用onData

#2 出现错误信息,调用onError

    你可以将回调函数从当前线程移植到其他线程,但是并不能保证回调函数被执行。当你将多个异步回调函数串起来的时候会形成spaghetti code(管式代码),有些人认为这样的代码很难读,但JavaScript以及Node.js都是这种风格。

Futures机制

    异步处理使用的第二个机制是Future机制。一个Future对象只有在特定情况下才会有值,Future对象要么是调用者的返回结果,要么是一个异常。Java在java.util.concurrent包中提供了供其线程池机制使用的Future接口,例如当你使用ExecutorService.submit()提交一个Runable任务时,就可以返回一个Future对象,利用Future对象可以判断该任务是否完成。如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ExecutorService executor = Executors.newCachedThreadPool();
Runnable task1 = new Runnable() {
@Override
public void run() {
doSomeHeavyWork();
}
... }
Callable<Interger> task2 = new Callable() {
@Override
public Integer call() {
return doSomeHeavyWorkWithResul();
}
... }
Future<?> future1 = executor.submit(task1);
Future<Integer> future2 = executor.submit(task2);
while (!future1.isDone() || !future2.isDone()) {
...
// do something else
...
}

    CallBacks和Future是异步处理中最常用的两种机制,实际上无法判断两种机制的优劣,而Netty则会两种都提供,你可以自由选择使用哪种机制。

JVM中的阻塞与非阻塞比较

    随着web应用的持续增长,如何提升网络应用的效率变得尤为重要。幸运的是从1.4版本开始,java提供了NIO API来供我们编写更有效率的网络应用。Java 7中又引入的NIO.2不仅仅是之前api的升级,同时也允许我们更加高效方便地编写异步代码。

New or non-blocking?

The N in NIO is typically thought to mean “non-blocking” rather than “new.”NIO has beenaround for so long now that nobody calls it “new” IO anymore. Most people refer to it as “non-blocking” IO—

阻塞IO

    上图所示为典型的阻塞IO模式,一个线程处理一个网络连接,因此应用能够处理连接的个数是由JVM上允许建立的线程个数决定的。

非阻塞IO
    再来看下非阻塞IO模式,上图运用selector机制来处理多个连接。下面通过一个回显服务器示例来讲解非阻塞及阻塞IO的区别。

阻塞IO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class PlainEchoServer {
public void serve(int port) throws IOException {
final ServerSocket socket = new ServerSocket(port); #1
try {
while (true) {
final Socket clientSocket = socket.accept(); #2
System.out.println("Accepted connection from " +
clientSocket);
new Thread(new Runnable() { #3
@Override
public void run() {
try {
BufferedReader reader = new BufferedReader(
new
InputStreamReader(clientSocket.getInputStream()));
PrintWriter writer = new PrintWriter(clientSocket
.getOutputStream(), true);
while (true) { #4
writer.println(reader.readLine());
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
try {
clientSocket.close();
} catch (IOException ex) {
// ignore on close
}
}
}
}).start(); #5
}
} catch (IOException e) {
e.printStackTrace();
}
}
}

# 1 绑定监听端口

# 2 阻塞至有新连接进来

# 3 新建线程用来处理客户端连接

# 4 从客户端读取数据并回写

# 5 启动线程

    上述服务器代码要求每次连接进来一个请求就需要创建一个新的线程,即使使用线程池也仅能解决一时问题,不能再根本上解决问题:客户端的连接数取决于后台处理线程的个数。当连接数多时则会带来大问题。

非阻塞IO

    在介绍NIO之前,我们先了解一些NIO的基本知识

BYTEBUFFER

    ByteBuffer在Netty中即为重要,其主要是用来缓存数据的。ByteBuffer既可以分配到堆内存中也可以分配到堆外内存。一般来说,堆外内存能够更加快速地传递给channel,但分配和释放会更耗时。新旧的NIO API对ByteBuffer提供了统一的管理。ByteBuffer能够实现无拷贝地在各个实例之间共享,同时允许对可见数据进行切片和其他操作处理。

Slicing

Slicing a ByteBuffer allows to create a new ByteBuffer that share the same data as the intialByteBuffer but only expose a sub-region of it. This is useful to minimize memory copies whilestill only allow access to a part of the data

ByteBuffer有以下几个重要的操作

  • 将数据写进ByteBuffer
  • 调用ByteBuffer.flip()切换到读模式
  • 从ByteBuffer中读取数据
  • 调用ByteBuffer.clear()或者ByteBuffer.compact()来整理ByteBuffer内存

    当往ByteBuffer中写数据时,ByteBuffer会通过更新buffer中write index的位置来跟踪buffer中的数据(也可以手动更新)。当需要从ByteBuffer中读取数据时,需要调用flip()来切换到读模式,flip()会将buffer的读起始位置设置为0,这样就可以读取buffer中所有数据了。

    为了能够再次向ByteBuffer中写数据,可以将buffer模式切换到写模式并调用任意下列两个方法。

  • ByteBuffer.clear():清除ByteBuffer
  • ByteBuffer.compact():通过内存拷贝清除已经读过的数据

    ByteBuffer.compact()会将所有未读的数据拷贝到buffer的起始位置。如下所示为ByteBuffer的使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Channel inChannel = ....;
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = -1;
do {
bytesRead = inChannel.read(buf); #1
if (bytesRead != -1) {
buf.flip(); #2
while(buf.hasRemaining()){
System.out.print((char) buf.get()); #3
}
buf.clear(); #4
}
} while (bytesRead != -1);
inChannel.close();

#1 从channel中读取数据到ByteBuffer

#2 切换模式至读模式

#3 读取buffer中的数据,每次调用一个get()会将buffer当前位置更新+1

#4 切换buffer至写模式,使其可以重新写

使用Selector模式

    Selector可以监听多个IO是否可以读/写,这样一个Selector就可以用来处理多个连接,相比于阻塞IO每个连接占用一个线程,Selector模式更加高效。

通过以下几个操作就可以轻松运用Selector

  1. 在channels上创建一个或多个Selector
  2. 在channel上注册需要监听的事件,目前支持四种事件
    • OP_ACCEPT:socket-accept事件
    • OP_CONNECT:socket-connect事件
    • OP_READ:可读事件
    • OP_WRITE:可写事件
  3. channel注册后,调用Selector.select()方法阻塞直到上述注册的一个事件发生
  4. 当Selector.select()返回时,可以通过SelectionKey实例获取所有可操作的事件

    下面EchoServer是基于非阻塞Selector的服务器代码,运用这个版本的Server可以运用一个线程处理上千个连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class PlainNioEchoServer {
public void serve(int port) throws IOException {
System.out.println("Listening for connections on port " + port);
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
ss.bind(address); #1
serverChannel.configureBlocking(false);
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT); #2
while (true) {
try {
selector.select(); #3
} catch (IOException ex) {
ex.printStackTrace();
// handle in a proper way
break;
}
Set readyKeys = selector.selectedKeys(); #4
Iterator iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
iterator.remove(); #5
try {
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel client = server.accept(); #6
System.out.println("Accepted connection from" + client);
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, ByteBuffer.allocate(100)); #7
}
if (key.isReadable()) { #8
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
client.read(output); #9
}
if (key.isWritable()) { #10
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
output.flip();
client.write(output); #11
output.compact();
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
}
}
}
}
}
}

#1 绑定Server的port

#2 注册channel的OP_ACCEPT Selector事件,监听新连接

#3 阻塞直到有新的连接事件到来

#4 获取所有可操作的SelectedKey实例

#5 遍历SelectedKey实例,将遍历过的去除

#6 获取新的连接

#7 将新的连接注册到Selector中,并监听读/写事件

#8 检查SelectKey是否可读

#9 读数据

#10 检测是否可写

#11 写数据

    上述代码实现起来比较繁琐,新的NIO API去掉了大部分繁琐的过程,使实现起来更加简单明了

基于NIO.2的EchoServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public class PlainNio2EchoServer {
public void serve(int port) throws IOException {
System.out.println("Listening for connections on port " + port);
final AsynchronousServerSocketChannel serverChannel =
AsynchronousServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(port);
serverChannel.bind(address); #1
final CountDownLatch latch = new CountDownLatch(1);
serverChannel.accept(null, new
CompletionHandler<AsynchronousSocketChannel, Object>() { #2
@Override
public void completed(final AsynchronousSocketChannel channel, Object attachment) {
serverChannel.accept(null, this); #3
ByteBuffer buffer = ByteBuffer.allocate(100);
channel.read(buffer, buffer,
new EchoCompletionHandler(channel)); #4
@Override
public void failed (Throwable throwable, Object attachment){
try {
serverChannel.close(); #5
} catch (IOException e) {
// ingnore on close
} finally {
latch.countDown();
}
}
}); try
{
latch.await();
} catch(
InterruptedException e)
{
Thread.currentThread().interrupt();
}
}
private final class EchoCompletionHandler implements
CompletionHandler<Integer, ByteBuffer> {
private final AsynchronousSocketChannel channel;
EchoCompletionHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
channel.write(buffer, buffer, new CompletionHandler<Integer, #6
ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
channel.write(buffer, buffer, this); #7
} else {
buffer.compact();
channel.read(buffer, buffer,
EchoCompletionHandler.this); #8
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
// ingnore on close
}
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
// ingnore on close
}
}
}
}

#1 绑定Server的port

#2 监听新连接到来,一旦有新的连接接入则会调用CompletionHandler

#3 重新监听连接接入事件

#4 在channel上触发读操作,一单有数据可读EchoCompletionHandler将会被触发

#5 出现错误时关闭channel

#6 注册写回调事件,通#4

#7 当buffer中还有数据时再次注册写事件

#8 同#4,注册CompletionHandler回调读事件

    上述代码看起来要比之前的更加复杂,按NIO2.0自己实现了loop事件,我们在使用的时候只需要简单地注册自己感兴趣的事件即可。

非阻塞应用存在的问题以及Netty是如何解决的

跨平台及兼容性问题

    非阻塞应用一般都会有跨平台问题,一个NIO应用在Linux上可以运行但在Window无法运行,同时还需要对低版本的兼容。NIO2.0只能在java7之后的版本运行,但它提供了一套统一的管理api,使其也能够在更低完本的jdk上运行,只不过有些功能受到了限制。

修复e-poll bug

    在Linux系统上,Java的NIO的Selector运用的是较为高效的e-poll机制,但是当连接较少时会存在一个很严重的bug导致cpu占用率很高。

1
2
3
4
5
6
7
8
9
10
11
...
while (true) {
int selected = selector.select(); #1
Set<SelectedKeys> readyKeys = selector.selectedKeys();
Iterator iterator = readyKeys.iterator(); #2
while (iterator.hasNext()) { #3
... #4
}
}
...
...

    在Linux系统中,#1处并没有阻塞,而是返回0,这样while循环会到时cpu上升到100%,即使到现在,这个问题仍然存在,不过幸运的是Netty避免了这个问题。