Netty In Action(五-Buffers)

引言

    当我们需要传输数据时,我们经常使用到Buffer。Java NIO实现了自己的Buffer类,但是功能相当有限并且效率低下。而JDK的ByteBuffer又相当笨重复杂。所以Netty实现了自己的ByteBuf来在pipeline中传输数据。

Buffer API

    Netty的buffer API包含两个接口:

  • ByteBuf
  • ByteBufHolder

    Netty通过引用计数(reference-counting)的方式来标记什么时候需要释放一个buf及其包含的资源。Netty的Buffer有以下几个优势:

  • 如果有必要,可以自定义buffer类型
  • 通过内置类型标识零拷贝
  • 不需要调用flip()即可实现读写模式切换
  • 读写index分离
  • 提供链式方法
  • 引用计数
  • 提供池(Pooling)服务

ByteBuf-字节数据容器

    ByteBuf主要用于字节缓存。ByteByf包含两个索引:读索引和写索引。这样我们可以通过调整读索引的位置来多次读取数据。

工作方式

    当我们向ByteBuf写数据时,写索引会增长写数据的字节数量。当我们读数据时,读索引也会增长。当读索引与写索引在同一个位置时说明数据已被读完。ByteBuf变为不可读状态,如果继续读就会抛出IndexOutOfBoundsException异常。

    当调用以“read”或“write”开头的方法时会增加读索引和写索引的位置。而调用“set”和“get”方法时则不会移动索引,但是会给出操作相对索引的位置。

    ByteBuf存储字节数有最大值,当超过这个值时会抛出异常,目前这个最大值时Integer.MAX_VALUE。

初始化readIndex和writeIndex置为0

ByteBuf不同类型

Netty提供三种不同类型的ByteBuf,我们可以根据需要选择不同的类型

  1. 堆内存Buffer(HEAP BUFFERS)

        堆缓存是使用最广泛的,它将数据缓存在JVM的堆内存空间。Netty通过一个隐式数组来保存数据。如果我们没用池缓存(pool)技术,那么这种类型在分配和销毁内存时是非常快的.

    1
    2
    3
    4
    5
    6
    7
    ByteBuf heapBuf = ...;
    if (heapBuf.hasArray()) { //1
    byte[] array = heapBuf.array(); //2
    int offset = heapBuf.arrayOffset() + heapBuf.position(); //3
    int length = heapBuf.readableBytes(); //4
    YourImpl.method(array, offset, length); //5
    }

    //1 检测ByteBuf是否由数组支持(是否是HEAP BUFFER)

    //2 获取数组引用

    //3 计算数组第一个字节的偏移量

    //4 计算可读的字节数

    //5 自己业务逻辑实现

        通过非堆(noneheap)ByteBuf获取数组会抛UnsupportedOperationException异常,所以在使用数组时最好通过hasArray方法先进行检测。

  2. 直接内存Buffer(DIRECT BUFFERS)

        直接内存意味着Buffer处于堆外内存。当我们需要通过网络传输数据时直接内存会很方便。实际上,当我们没有使用直接内存时,在通过socket传输数据时JVM会拷贝一份Buffer中的数据,而直接内存则免去了这个步骤。但是直接内存的缺点是分配和销毁太过昂贵,这也是Netty使用Pool的原因。直接内存的另一个缺点是我们不能通过一个Array直接访问,当我们需要访问数据时需要获取数据的一份拷贝。

    下面代码示例了如何访问直接内存:

    1
    2
    3
    4
    5
    6
    ByteBuf directBuf = ...;
    if (!directBuf.hasArray()) { //1
    int length = directBuf.readableBytes(); //2
    byte[] array = new byte[length]; //3
    directBuf.getBytes(array); //4
    YourImpl.method(array, 0, array.length); //5

    // 1判断byffer是否由数组支持(否表示直接内存)

    //2 获取可读数据长度len

    //3 分配一个容量为len的数组

    //4 将数据督导数组中

    //5 处理数据

  3. 复合Buffer(COMPOSITE BUFFER)

        复合缓冲区可以构造多个ByteBuf实例,并且提供一份组合视图。Netty提供了ByteBuf的子类CompositeByteBuf来处理复合缓冲区,CompositeByteBuf只是一个视图。CompositeByteBuf.hasArray永远返回false,因为她有可能包含堆缓存也可能包含直接内存缓存。

        例如一条信息可能由两部分组成:header和body,在一个应用中body可能相同而header不同,因此我们可以复用body而只改变header即可。

    CompositeByteBuf

        而使用JDK的ByteBuffer很难这样实现,除非我们再构建一个数组来存储header和body,如下代码所示:

    1
    2
    3
    4
    5
    6
    7
    8
    // Use an array to composite them
    ByteBuffer[] message = new ByteBuffer[] { header, body };
    // Use copy to merge both
    ByteBuffer message2 = ByteBuffer.allocate(
    header.remaining()+ body.remaining();
    message2.put(header);
    message2.put(body);
    message2.flip();

    下列代码是通过CompositeByteBuf的改进版本

    1
    2
    3
    4
    5
    6
    7
    8
    9
    CompositeByteBuf compBuf = ...;
    ByteBuf heapBuf = ...;
    ByteBuf directBuf = ...;
    compBuf.addComponent(heapBuf, directBuf); //1
    .....
    compBuf.removeComponent(0); //2
    for (ByteBuf buf: compBuf) { //3
    System.out.println(buf.toString());
    }

    //1 将heapBuf和directBuf添加到复合缓冲Buffer中

    //2 移除第一个ButeBuf(heapBuf)

    //3 循环处理复合缓冲Buffer中的ByteBuf

ByteBuf字节级别的操作

    ByteBuf提供了很多读写操作,并且比JDK的ByteBuffer更加简单易用且性能更好

随机访问索引

    ByteBuf使用以0起始的索引,这意味着ByteBuf的第一个字节索引是0最后一个字节索引是capacity-1。

1
2
3
4
5
ByteBuf buffer = ...;
for (int i = 0; i < buffer.capacity(); i ++) {
byte b = buffer.getByte(i);
System.out.println((char) b);
}

    需要注意的是根据index访问数据并不会增加读索引和写索引的值。如果有需要可以通过readIndex(index)和write(index)来增加其索引值。

顺序访问索引

    ByteBuf通过readIndex来提供顺序读数据,通过writeIndex提供顺序写数据。与jdk的ByteBuffer不同的是我们不需要调用flip()来切换读写模式。如下图所示ByteBuf通过两个指针切分成三个区域。

ByteBuf分割区域

//1 已读数据(可以被回收)

//2 未读数据

//3 可写数据

可被回收的字节

    可以被回收的字节区域表示ByteBuf已经执行读操作所以可以被回收。初始化时,这段区域大小为0,当执行读操作时这个区域大小增加。需要注意的是只有执行”read”操作时才会增加该区域大小,”get”操作不会移动readerIndex。可以通过调用discardReadBytes()函数来回收该区域。

discardReadBytes后区域图

    当调用discardReadBytes()后,可被回收的区域大小为0,readerIndex为0。因为discardReadBytes()方法会执行内存复制操作将可读的数据复制到起始位置,所以我们最好不要频繁地调用该函数。

可读字节(ByteBuf中实际内容)

    任何以”read”或者”skip”开头的操作都会增长readerIndex。

1
2
3
4
5
// Iterates the readable bytes of a buffer.
ByteBuf buffer = ...;
while (buffer.readable()) {
System.out.println(buffer.readByte());
}

可写字节

    任何以write开头的操作都会增加wirerIndex的值。如果没有更多写空间,操作会抛出IndexOutOfBoundException异常。

1
2
3
4
5
// Fills the writable bytes of a buffer with random integers.
ByteBuf buffer = ...;
while (buffer.writableBytes() >= 4) {
buffer.writeInt(random.nextInt());
}

清除索引

    我们可以通过调用clear()函数来重置readerIndex和writerIndex。这个操作不会清除buffer内容,而是重置指针到0.

调用clear后区域图

查询操作

    indexOf()方法可以帮助我们定位一个值的索引位置。更加复杂的操作可以通过ByteBufProcessor实现。

衍生缓冲区

    为了创建一个已存在buffer的视图,可以调用duplicate(),slice(),slice(int, int),readOnly()或者order(ByteOrder)。衍生缓冲区拥有自己独立的writerIndex和ReaderIndex以及索引标志,但却共享内部存储的数据。

    如果不想共享数据的话,可以调用copy()或者copy(int, int)函数来实现。如下示例表示ByteBuf的slice是如何工作的:

1
2
3
4
5
6
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); //1
ByteBuf sliced = buf.slice(0, 14); //2
System.out.println(sliced.toString(utf8); //3
buf.setByte(0, (byte) 'J'); //4
assert buf.get(0) == sliced.get(0); //5

//1 创建一个ByteBuf

//2 通过slice创建一个索引从0到14的ByteBuf

//3 输出结果为”Netty in Action”

//4 更新索引为0的字节

//5 返回true,因为两个ByteBuf共享同一套数据

下列代码创建了ByteBuf的一份拷贝数据

1
2
3
4
5
6
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); //1
ByteBuf copy = buf.copy(0, 14); //2
System.out.println(copy.toString(utf8); //3
buf.setByte(0, (byte) 'J'); //4
assert buf.get(0) != copy.get(0); //5

//1 创建一个ByteBuf

//2 copy索引从0到14的数据

//3 输出”Netty in Action”

//4 更新Index=0的字节值

//5 不会失败,因为两个ByteBuf不共用底层是数据

读写操作

ByteBuf有两种读写操作

  • get/set开头的操作需要传入一个给定的index

  • Read/write操作不但读取当前索引的数据,还会增加readerIndex和WriterIndex

    get操作

    set操作

    set操作没有无符号操作

1
2
3
4
5
6
7
8
9
10
11
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); //1
System.out.println((char) buf.getByte(0)); //2
// 3
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();
buf.setByte(0, (byte) 'B'); //4
System.out.println((char) buf.getByte(0)); //5
//6
assert readerIndex = buf.readerIndex();
assert writerIndex = buf.writerIndex();

//1 创建ByteBuf

//2 输出首个字节”N”

//3 储存readerIndex和writerIndex

//4 更新index=0的字节为“B”

//5 应该输出更新后的”B”

//6 true,因为get和set操作并不会修改索引

下面看下read和write操作

read操作

write操作

write操作

1
2
3
4
5
6
7
8
9
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); //1
System.out.println((char) buf.readByte()); //2
//
int readerIndex = buf.readerIndex(); //3
int writerIndex = buf.writerIndex();
buf.writeByte( (byte) '?'); //4
assert readerIndex = buf.readerIndex(); //5
assert writerIndex != buf.writerIndex();

//1 创建ByteBuf

//2 打印第一个字节”N”

//3 存储目前的readerIndex和writeIndex

//4 写一个字节

//5 readerIndex不变,而writerIndex增加

其他有用的操作

其他操作

其他操作

ByteBufHolder

    一般来说,我们在实际应用中除了存储字节数据外,还需要存储一些其他属性。例如Http返回值中,除了返回的数据还有状态,cookie等信息。这种情况,Netty提供了一个抽象接口ByteBufHolder。ByteBufHolder还支持Netty的高级功能,例如其存储的数据可以直接从缓存池中获取,如果需要还可以自动释放。ByteBufHolder仅提供了几个方法:

ByteBufHolder提供的方法

ByteBufAllocator

    之前提过,Netty支持池类缓存ByteBuf。ByteBufAllocator正是为实现池类缓存而设计的,当然用户可以自由选择是否使用池缓存

ByteBufAllocator提供的方法

    ByteBufAllocator允许传入标识ByteBuf初始和最大容量的参数。并且ByteBuf是可以自动扩容的,直到扩容到允许的最大值。

    获取ByteBufAllocator的引用很简单,我们可以通过channel(一般来说,channel还可以保存另一个ByteBufAllocator)或者绑定到ChannelHandler的ChannelHandlerContext获取。如下代码所示为如何获取ByteBufAllocator引用

1
2
3
4
5
6
Channel channel = ...;
ByteBufAllocator allocator = channel.alloc(); //1
....
ChannelHandlerContext ctx = ...;
ByteBufAllocator allocator2 = ctx.alloc(); //2
...

//1 从channel获取ByteBufAllocator

//2 从ChannelHandlerContext中获取ByteBufAllocator

    Netty有两种不同的ByteBufAllocator实现方式,一种通过池子缓存ByteBuf的实例来减少分配和销毁ByteBuf实例的开销,同时尽量减少内存碎片的产生。另外一种不缓存ByteBuf,每次都返回一个新的ByteBuf实例。Netty默认使用PooledByteBufAllocator(带有池缓存),当然可以通过修改ChannelConfig配置或者在启动服务器时指定另一种ByteBufAllocator来使用不使用池缓存的模式。

Unpooled

    有时候我们无法获取ByteBufAllocator的实例来声明ByteBuf,对于这种情况,Netty提供了一个工具类Unpooled,该类包含一些静态方法来创建无池缓存的ByteBuf。

Unpooled提供的方法

当然Unpooled也可以在不使用Netty时用来获取ByteBuf

ByteBufUtil

    ByteBufUtil同样也提供了一些静态方法,尤其在操作ByteBuf时相当有用。其中最有用的应当是hexdump()方法,其主要用于将ByteBuf的内容以十六进制数据输出。