欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

ByteBuf

最编程 2024-01-14 14:18:55
...

ByteBuf

ByteBuf 是 Netty 的数据容器,所有网络通信中字节流的传输都是通过 ByteBuf 完成的。与JDK NIO 包中提供的 ByteBuffer 类类似。

我们首先介绍下 JDK NIO 的 ByteBuffer。下图展示了 ByteBuffer 的内部结构:

ByteBuffer 包含以下四个基本属性:

  • mark:为某个读取过的关键位置做标记,方便回退到该位置;
  • position:当前读取的位置;
  • limit:buffer 中有效的数据长度大小;
  • capacity:初始化时的空间容量。

以上四个基本属性的关系是:mark <= position <= limit <= capacity。

他有以下问题:

第一,ByteBuffer 分配的长度是固定的,无法动态扩缩容,所以很难控制需要分配多大的容量。如果分配太大容量,容易造成内存浪费;如果分配太小,存放太大的数据会抛出 BufferOverflowException 异常。在使用 ByteBuffer 时,为了避免容量不足问题,你必须每次在存放数据的时候对容量大小做校验,如果超出 ByteBuffer 最大容量,那么需要重新开辟一个更大容量的 ByteBuffer,将已有的数据迁移过去。整个过程相对烦琐,对开发者而言是非常不友好的。

第二,ByteBuffer 只能通过 position 获取当前可操作的位置,因为读写共用的 position 指针,所以需要频繁调用 flip、rewind 方法切换读写状态,开发者必须很小心处理 ByteBuffer 的数据读写,稍不留意就会出错。

ByteBuffer 作为网络通信中高频使用的数据载体,显然不能够满足 Netty 的需求,Netty 重新实现了一个性能更高、易用性更强的 ByteBuf,相比于 ByteBuffer 它提供了很多非常酷的特性:

  • 容量可以按需动态扩展,类似于 StringBuffer;
  • 读写采用了不同的指针,读写模式可以随意切换,不需要调用 flip 方法;
  • 通过内置的复合缓冲类型可以实现零拷贝;
  • 支持引用计数;
  • 支持缓存池。

 

 

从图中可以看出,ByteBuf 包含三个指针:读指针 readerIndex、写指针 writeIndex、最大容量 maxCapacity,根据指针的位置又可以将 ByteBuf 内部结构可以分为四个部分:

第一部分是废弃字节,表示已经丢弃的无效字节数据。

第二部分是可读字节,表示 ByteBuf 中可以被读取的字节内容,可以通过 writeIndex - readerIndex 计算得出。从 ByteBuf 读取 N 个字节,readerIndex 就会自增 N,readerIndex 不会大于 writeIndex,当 readerIndex == writeIndex 时,表示 ByteBuf 已经不可读。

第三部分是可写字节,向 ByteBuf 中写入数据都会存储到可写字节区域。向 ByteBuf 写入 N 字节数据,writeIndex 就会自增 N,当 writeIndex 超过 capacity,表示 ByteBuf 容量不足,需要扩容。

第四部分是可扩容字节,表示 ByteBuf 最多还可以扩容多少字节,当 writeIndex 超过 capacity 时,会触发 ByteBuf 扩容,最多扩容到 maxCapacity 为止,超过 maxCapacity 再写入就会出错。

由此可见,Netty 重新设计的 ByteBuf 有效地区分了可读、可写以及可扩容数据,解决了 ByteBuffer 无法扩容以及读写模式切换烦琐的缺陷。

继承关系图

 

  • ByteBuf 是一个抽象类,实现 ReferenceCounted 和 Comparable 两个接口,分别具有引用计数和两个 ByteBuf 比较的能力。
  • AbstractByteBuf 是重要的抽象类,它是实现一个 Buffer 的骨架。重写了绝大部分 ByteBuf 抽象类的抽象方法,封装了 ByteBuf 操作的共同逻辑,比如在获取数据前检查索引是否有效等等。在 AbstractByteBuf 中定义了两个重要的指针变量简化用户降低 ByteBuf 的难度:
    • readerIndex
    • writerIndex
  • AbstractReferenceCountedByteBuf 也是重要的抽象类,主要实现了引用计数相关逻辑。内部维护一个 volatile int refCnt 变量。但是所有对 refCnt 的操作都需要通过 ReferenceCountUpdater 实例。AbstractReferenceCountedByteBuf 的子类实现可太多了。重要的有
    • PooledByteBuf: 抽象类,它是拥有池化能力的 ByteBuf 的骨架,内部定义了许多关于池化相关的类和变量。这个后续会详细讲解。
    • UnpooledHeapByteBuf: 实现类,非池化堆内内存ByteBuf。内部使用 byte[] array 字节数组存储数据。底层使用 HeapByteBufUtil 静态方法完成对数组的操作。为了提高性能,使用位移方式处理数据,值得好好体会。
    • CompositeByteBuf: 实现类,可组合的ByteBuf。底层实现通过内部类 Component 包装 ByteBuf,然后使用 Component[] 存储多个 Component 对象从而实现组合模式。
    • UnpooledDirectByteBuf: 实现类,非池化堆外ByteBuf。底层是持有 java.nio.ByteBuffer 对象的引用。
    • FixedCompositeByteBuf: 实现类,固定的可组合ByteBuf。允许以只读模式包装 ByteBuf 数组。

 

ByteBuf 核心 API

// 立即「丢弃」所有已读数据(需要做数据拷贝,将未读内容复制到最前面)
// 即便只有一个字节剩余可写,也执行「丢弃动作」
public abstract ByteBuf discardReadBytes();

// 会判断 readerIndex 指针是否超过了 capacity的一半
// 如果超过了就执行「丢弃」动作
// 这个方法相比 discardReadBytes() 智能一点
public abstract ByteBuf discardSomeReadBytes();

// 确保 minWritableBytes 字节数可写
// 如果容量不够的话,会触发「扩容」动作
// 扩容后的容量范围[64Byte, 4MB]
public abstract ByteBuf ensureWritable(int minWritableBytes);

// 返回一个int类型的值
// 0: 当前ByteBuf有足够可写容量,capacity保持不变
// 1: 当前ByteBuf没有足够可写容量,capacity保持不变
// 2: 当前ByteBuf有足够的可写容量,capacity增加
// 3: 当前ByteBuf没有足够的可写容量,但capacity已增长到最大值
public abstract int ensureWritable(int minWritableBytes, boolean force);

/**
 * 通过set/get方法还是需要将底层数据看成一个个由byte组成的数组,
 * 索引值是根据基本类型长度而增长的。
 * set/get 并不会改变readerIndex和writerIndex的值,
 * 你可以理解为对某个位进行更改操作
 * 至于大端小端,这个根据特定需求选择的。现阶段的我对这个理解不是特别深刻
 */
public abstract int   getInt(int index);
public abstract int   getIntLE(int index);

 * 方法getBytes(int, ByteBuf, int, int)也能实现同样的功能。
 * 两者的区别是:
 *        「当前方法」会增加目标Buffer对象的「writerIndex」的值,
 *     getBytes(int, ByteBuf, int, int)方法不会更改。

/**
 * 从指定的绝对索引处开始,将此缓冲区的数据传输到指定的目标Buffer对象,直到目标对象变为不可写。
 * 
 * 「writerIndex」 「readerIndex」
 *   数据源: 都不修改
 * 目标对象: 增加「writerIndex」 
 * 
 * @param index  索引值
 * @param dst    目标对象
 * @return       源对象
 */
public abstract ByteBuf getBytes(int index, ByteBuf dst);

/**
 * 从指定的绝对索引处开始,将此缓冲区的数据传输到指定的目标Buffer对象,传输长度为length
 * 方法getBytes(int, ByteBuf, int, int)也能实现同样的功能。
 * 
 * 「writerIndex」 「readerIndex」
 *   数据源: 都不修改
 * 目标对象: 增加「writerIndex」 
 * @param index  索引值
 * @param dst    目标对象
 * @param length 拷贝长度
 * @return       源对象
 */
public abstract ByteBuf getBytes(int index, ByteBuf dst, int length);

/**
 * 把数据拷贝到目标数组中
 *
 * 「writerIndex」 「readerIndex」
 *   数据源: 都不修改
 * 目标对象: 无
 */
public abstract ByteBuf getBytes(int index, byte[] dst);

/**
 * 把数据拷贝到目标数组中
 *
 * 「writerIndex」 「readerIndex」
 *   数据源: 都不修改
 * 目标对象: 无
 */
public abstract ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length);

/**
 * 把数据拷贝到目标数组中
 *
 * 「writerIndex」 「readerIndex」
 *   数据源: 都不修改
 * 目标对象: 增加「writerIndex」
 */
public abstract ByteBuf getBytes(int index, ByteBuffer dst);

/**
 * 把数据拷贝到目标对象
 * 以上关于将数据复制给ByteBuf对象的方法最终还是调用此方法进行数据复制
 *
 * 「writerIndex」 「readerIndex」
 *   数据源: 都不修改
 * 目标对象: 都不修改
 */
public abstract ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length);

/**
 * 把数据拷贝到目标流中
 *
 * 「writerIndex」 「readerIndex」
 *   数据源: 都不修改
 * 目标对象: 无
 */
public abstract ByteBuf getBytes(int index, OutputStream out, int length) throws IOException;

/**
 * 把数据拷贝到指定通道
 *
 * 「writerIndex」 「readerIndex」
 *   数据源: 都不修改
 * 目标对象: 无
 */
public abstract int getBytes(int index, GatheringByteChannel out, int length) throws IOException;

/**
 * 把数据拷贝到指定通道,不会修改通道的「position」
 *
 * 「writerIndex」 「readerIndex」
 *   数据源: 都不修改
 * 目标对象: 无
 */
public abstract int getBytes(int index, FileChannel out, long position, int length) throws IOException;

/**
 * 把对象src的 「可读数据(writerIndex-readerIndex)」 拷贝到this.ByteBuf对象中
 * 剩下的参数凡是带有ByteBuf对象的,都和这个处理逻辑类似。
 * 但是setBytes(int index, ByteBuf src, int srcIndex, int length)这个方法就有点与众不同
 * 这个方法都不会修改这两个指针变量的值。
 * 
 * 「writerIndex」 「readerIndex」
 *    src: 增加「readerIndex」的值
 *   this: 都不修改
 */
public abstract ByteBuf setBytes(int index, ByteBuf src);
public abstract ByteBuf setBytes(int index, ByteBuf src, int length);
public abstract ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length);
public abstract ByteBuf setBytes(int index, byte[] src);
public abstract ByteBuf setBytes(int index, byte[] src, int srcIndex, int length);
public abstract ByteBuf setBytes(int index, ByteBuffer src);
public abstract int setBytes(int index, InputStream in, int length) throws IOException;
public abstract int setBytes(int index, ScatteringByteChannel in, int length) throws IOException;
public abstract int setBytes(int index, FileChannel in, long position, int length) throws IOException;
// 使用 NUL(0x00)填充
public abstract ByteBuf setZero(int index, int length);

/**
 * 以下是read操作
 * readerIndex 会按照对应类型增长。
 * 比如readByte()对应readerIndex+1,readShort()对应readerIndex+2
 */
public abstract byte  readByte();
public abstract short readShort();
public abstract short readShortLE();
public abstract int   readUnsignedShort();
public abstract int   readUnsignedShortLE();
public abstract int   readMedium();

/**
 * 从当前的 readerIndex 开始,将这个缓冲区的数据传输到一个新创建的缓冲区,
 * 并通过传输的字节数(length)增加 readerIndex。
 * 返回的缓冲区的 readerIndex 和 writerIndex 分别为0 和 length。
 *
 * @return 一个新创建的ByteBuf对象
 */
public abstract ByteBuf readBytes(int length);

/**
 * 返回一个新的ByteBuf对象。它是一个包装对象,里面有一个指向源Buffer的引用。
 * 该对象只是一个视图,只不过有几个指针独立源Buffer
 * 但是readerIndex(0)和writerIndex(=length)的值是初始的。
 * 另外,需要注意的是当前方法并不会调用 retain()去增加引用计数
 * @return 一个新创建的ByteBuf对象
 */
public abstract ByteBuf readSlice(int length);
public abstract ByteBuf readRetainedSlice(int length);


/**
 * 读取数据到 dst,直到不可读为止。
 *
 * 「writerIndex」 「readerIndex」
 *    dst: 增加「writerIndex」的值
 *   this: 增加「readerIndex」
 * @return 一个新创建的ByteBuf对象
 */
public abstract ByteBuf readBytes(ByteBuf dst);
public abstract ByteBuf readBytes(ByteBuf dst, int length);

/**
 * 读取数据到 dst,直到不可读为止。
 *
 * 「writerIndex」 「readerIndex」
 *    dst: 都不修改
 *   this: 都不修改
 * @return 一个新创建的ByteBuf对象
 */
public abstract ByteBuf readBytes(ByteBuf dst, int dstIndex, int length);

public abstract CharSequence readCharSequence(int length, Charset charset);
public abstract int readBytes(FileChannel out, long position, int length) throws IOException;
public abstract ByteBuf skipBytes(int length);


/**
 * 写入下标为 writerIndex 指向的内存。
 * 如果容量不够,会尝试扩容
 *
 * 「writerIndex」 「readerIndex」
 *    dst: 无
 *   this: 「writerIndex」 + 1
 * @return 一个新创建的ByteBuf对象
 */
public abstract ByteBuf writeByte(int value);

/**
 * 写入下标为 writerIndex 指向的内存。
 * 如果容量不够,会尝试扩容
 *
 * 「writerIndex」 「readerIndex」
 *    dst: 无
 *   this: 「writerIndex」 + 1
 * @return 一个新创建的ByteBuf对象
 */
public abstract ByteBuf writeBytes(ByteBuf src);
public abstract ByteBuf writeBytes(ByteBuf src, int length);
public abstract ByteBuf writeBytes(ByteBuf src, int srcIndex, int length);
public abstract ByteBuf writeBytes(byte[] src);
public abstract ByteBuf writeBytes(byte[] src, int srcIndex, int length);
public abstract ByteBuf writeBytes(ByteBuffer src);
public abstract int writeBytes(FileChannel in, long position, int length) throws IOException;
public abstract ByteBuf writeZero(int length);
public abstract int writeCharSequence(CharSequence sequence, Charset charset);

/**
 * 从「fromIndex」到「toIndex」查找value并返回索引值
 * @return 首次出现的位置索引,-1表示未找到
 */
public abstract int indexOf(int fromIndex, int toIndex, byte value);

/**
 * 定位此缓冲区中指定值的第一个匹配项。
 * 搜索范围[readerIndex, writerIndex)。
 * 
 * @return -1表示未找到
 */
public abstract int bytesBefore(byte value);

/**
 * 搜索范围[readerIndex,readerIndex + length)
 *
 * @return -1表示未找到
 *
 * @throws IndexOutOfBoundsException
 */
public abstract int bytesBefore(int length, byte value);

/**
 * 搜索范围[index, idnex+length)
 *
 * @return -1表示未找到
 *
 * @throws IndexOutOfBoundsException
 */
public abstract int bytesBefore(int index, int length, byte value);

/**
 * 使用指定的处理器按升序迭代该缓冲区的「可读字节」
 *
 * @return -1表示未找到; 如果ByteProcessor.process(byte)返回false,则返回上次访问的索引值
 */
public abstract int forEachByte(ByteProcessor processor);

/**
 * 迭代范围[index, index+length-1)
 */
public abstract int forEachByte(int index, int length, ByteProcessor processor);
public abstract int forEachByteDesc(ByteProcessor processor);
public abstract int forEachByteDesc(int index, int length, ByteProcessor processor);

/**
 * 返回此缓冲区可读字节的副本。两个ByteBuf内容独立。
 * 类似 buf.copy(buf.readerIndex(), buf.readableBytes());
 * 源ByteBuf的指针都不会被修改
 */
public abstract ByteBuf copy();
public abstract ByteBuf copy(int index, int length);

/**
 * 返回该缓冲区可读字节的一个片段。
 * 修改返回的缓冲区或这个缓冲区的内容会影响彼此的内容,同时它们维护单独的索引和标记。
 * 此方法与 buf.slice (buf.readerIndex () ,buf.readableBytes ()相同。
 * 此方法不修改此缓冲区的 readerIndex 或 writerIndex。
 */
public abstract ByteBuf slice();

/**
 * 与 slice().retain() 行为一样
 */
public abstract ByteBuf retainedSlice();
public abstract ByteBuf slice(int index, int length);
public abstract ByteBuf retainedSlice(int index, int length);

/**
 * 内容共享。各自维护独立的索引的标记。
 * 新的ByteBuf的可读内容是和slice()方法返回的一样。但是由于共享底层的ByteBuf对象,
 * 所以底层的所有内容都是可见的。
 * read和write标志并不是复制的。同时也需要注意此方法并不会调用retain()给引用计数+1
 */
public abstract ByteBuf duplicate();
public abstract ByteBuf retainedDuplicate();

/**
 * 返回组成这个缓冲区的 NIO bytebuffer 的最大数目。一般默认是1,对于组合的ByteBuf则计算总和。
 * 
 * @return -1 表示底层没有ByteBuf
 * @see #nioBuffers(int, int)
 */
public abstract int nioBufferCount();

/**
 * 将该缓冲区的可读字节作为 NIO ByteBuffer 公开。共享内容。
 * buf.nioBuffer(buf.readerIndex(), buf.readableBytes()) 结果一样。
 * 请注意,如果这个缓冲区是一个动态缓冲区并调整了其容量,那么返回的NIO缓冲区将不会看到这些变化
 */
public abstract ByteBuffer nioBuffer();
public abstract ByteBuffer nioBuffer(int index, int length);

/**
 * 仅内部使用: 公开内部 NIO 缓冲区。
 */
public abstract ByteBuffer internalNioBuffer(int index, int length);
public abstract ByteBuffer[] nioBuffers();
public abstract ByteBuffer[] nioBuffers(int index, int length);

/**
 * 如果当前ByteBuf拥有支持数据则返回true
 */
public abstract boolean hasArray();
public abstract byte[] array();

/**
 * 返回此缓冲区的支撑字节数组中第一个字节的偏移量。
 */
public abstract int arrayOffset();

/**
 * 当且仅当此缓冲区具有指向「backing data」的低级内存地址的引用时才返回true
 */
public abstract boolean hasMemoryAddress();
public abstract long memoryAddress();

/**
 * 如果此 ByteBuf 内部为单个内存区域则返回true。复合类型的缓冲区必须返回false,即使只包含一个ByteBuf对象。
 */
public boolean isContiguous() {
    return false;
}

public abstract String toString(Charset charset);

public abstract String toString(int index, int length, Charset charset);

@Override
public abstract int hashCode();

@Override
public abstract boolean equals(Object obj);

@Override
public abstract int compareTo(ByteBuf buffer);

@Override
public abstract String toString();

@Override
public abstract ByteBuf retain(int increment);

@Override
public abstract ByteBuf retain();

@Override
public abstract ByteBuf touch();

@Override
public abstract ByteBuf touch(Object hint);

boolean isAccessible() {
    return refCnt() != 0;
}

getXX() 从源 Buffer 复制数据到目标 Buffer。可能会修改目标 Buffer 的 writerIndex。
setXX() 将目标 Buffer 中的数据复制到源 Buffer。可能会修改目标 Buffer 的 readerIndex。
readXX() 表示从 Buffer 中读取数据,会根据基本类型增长源 Buffer 的 readerIndex。
get 和 set 都是相对于 this 而言,比如 this.getXX() 意味着获取 this.buffer 的信息并复制到目标ByteBuf对象中。而 this.setXX() 表示从目标ByteBuf对象中复制数据到 this.buffer。

ByteBuf demo

public class ByteBufTest {

    public static void main(String[] args) {

        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(6, 10);

        printByteBufInfo("ByteBufAllocator.buffer(5, 10)", buffer);

        buffer.writeBytes(new byte[]{1, 2});

        printByteBufInfo("write 2 Bytes", buffer);

        buffer.writeInt(100);

        printByteBufInfo("write Int 100", buffer);

        buffer.writeBytes(new byte[]{3, 4, 5});

        printByteBufInfo("write 3 Bytes", buffer);

        byte[] read = new byte[buffer.readableBytes()];

        buffer.readBytes(read);

        printByteBufInfo("readBytes(" + buffer.readableBytes() + ")", buffer);

        printByteBufInfo("BeforeGetAndSet", buffer);

        System.out.println("getInt(2): " + buffer.getInt(2));

        buffer.setByte(1, 0);

        System.out.println("getByte(1): " + buffer.getByte(1));

        printByteBufInfo("AfterGetAndSet", buffer);

    }

    private static void printByteBufInfo(String step, ByteBuf buffer) {

        System.out.println("------" + step + "-----");

        System.out.println("readerIndex(): " + buffer.readerIndex());

        System.out.println("writerIndex(): " + buffer.writerIndex());

        System.out.println("isReadable(): " + buffer.isReadable());

        System.out.println("isWritable(): " + buffer.isWritable());

        System.out.println("readableBytes(): " + buffer.readableBytes());

        System.out.println("writableBytes(): " + buffer.writableBytes());

        System.out.println("maxWritableBytes(): " + buffer.maxWritableBytes());

        System.out.println("capacity(): " + buffer.capacity());

        System.out.println("maxCapacity(): " + buffer.maxCapacity());

    }

}

ReferenceCounted

定义和引用计数相关的接口。方法的实现一般是在抽象类 io.netty.util.AbstractReferenceCounted中完成。

AbstractByteBuf

它是 ByteBuf 的基本实现骨架,实现了 io.netty.buffer.ByteBuf 大部分的抽象方法,子类只需根据特定功能实现对应抽象方法即可。在 io.netty.buffer.AbstractByteBuf 抽象类中做了以下事情:

  • 定义并维护 5 个指定变量。分别是 readerIndex 、writerIndex 、markedReaderIndex、markedWriterIndex 和 maxCapacity。因此,此抽象类的主要工作也是维护这 5 个变量。比如在 getXX() 方法前判断一下是否满足等等。
  • 初始化 ResourceLeakDetector<ByteBuf> 内存泄漏检测对象。它记录 Netty 各种的 ByteBuf 使用情况,能对占用资源的对象进行监控,无论是否池化、无论堆外堆内。有 4 种级别可选: DISABLED、SIMPLE、ADVANCED 和 PARANOID。监控级别也由低到高,级别越高,可监控的 ByteBuf 数量越多,可获得的信息也越多,但是性能影响也越大。一般建议在 DEBUG 模式下可使用 ADVANCED 或 PARANOID,生产环境使用 SMPLE。其实实现逻辑也是比较简单的,就是对 ByteBuf 对象进行包装,在执行相关API 时记录必要的数据,然后根据这些数据分析哪里出现了内存泄漏,并通过日志告知用户需要进行排查。
// io.netty.buffer.AbstractByteBuf
public abstract class AbstractByteBuf extends ByteBuf {
    static final ResourceLeakDetector<ByteBuf> leakDetector =
        ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ByteBuf.class);
    int readerIndex;
    int writerIndex;
    private int markedReaderIndex;
    private int markedWriterIndex;
    private int maxCapacity;
    
    @Override
    public ByteBuf setByte(int index, int value) {
        checkIndex(index);
        _setByte(index, value);
        return this;
    }

    protected abstract void _setByte(int index, int value);

    @Override
    public byte getByte(int index) {
        checkIndex(index);
        return _getByte(index);
    }

    protected abstract byte _getByte(int index);

    @Override
    public byte readByte() {
        checkReadableBytes0(1);
        int i = readerIndex;
        byte b = _getByte(i);
        readerIndex = i + 1;
        return b;
    }
    // ...
}

 

此外 Netty 可以利用引用计数的特点实现内存泄漏检测工具。JVM 并不知道 Netty 的引用计数是如何实现的,当 ByteBuf 对象不可达时,一样会被 GC 回收掉,但是如果此时 ByteBuf 的引用计数不为 0,那么该对象就不会释放或者被放入对象池,从而发生了内存泄漏。Netty 会对分配的 ByteBuf 进行抽样分析,检测 ByteBuf 是否已经不可达且引用计数大于 0,判定内存泄漏的位置并输出到日志中,你需要关注日志中 LEAK 关键字。

AbstractByteBuf.leakDetector

泄露检测器,用来帮助检测内存泄露。

static final ResourceLeakDetector<ByteBuf> leakDetector =
            ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ByteBuf.class);

AbstractReferenceCountedByteBuf

抽象类,实现了引用计数器接口,内部使用 ReferenceCountUpdater 对象对变量 refCnt 进行增/减操作,操作 refCnt 的唯一入口就是 updater 对象。内部实现还是比较简洁的,因为所有的操作都委派给 ReferenceCountedByteBuf 对象来完成。

public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
    private static final long REFCNT_FIELD_OFFSET = //refCnt属性的内存偏移地址
            ReferenceCountUpdater.getUnsafeOffset(AbstractReferenceCountedByteBuf.class, "refCnt");
    private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> AIF_UPDATER =//原子更新器
            AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
 // 核心对象,操作变量refCnt的唯一入口
    private static final ReferenceCountUpdater<AbstractReferenceCountedByteBuf> updater =
            new ReferenceCountUpdater<AbstractReferenceCountedByteBuf>() {
        @Override
        protected AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater() {
            return AIF_UPDATER;
        }
        @Override
        protected long unsafeOffset() {
            return REFCNT_FIELD_OFFSET;
        }
    };

    // Value might not equal "real" reference count, all access should be via the updater
    @SuppressWarnings("unused")
    private volatile int refCnt = updater.initialValue();

    protected AbstractReferenceCountedByteBuf(int maxCapacity) {
        super(maxCapacity);
    }

    @Override
    boolean isAccessible() {
        // Try to do non-volatile read for performance as the ensureAccessible() is racy anyway and only provide
        // a best-effort guard.
        return updater.isLiveNonVolatile(this);//是否还能用,释放了就不能用了
    }

    @Override
    public int refCnt() {
        return updater.refCnt(this);//获取真实引用计数
    }

    /**
     * An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly
     */
    protected final void setRefCnt(int refCnt) {
        updater.setRefCnt(this, refCnt);//直接设置真实引用计数
    }

    /**
     * An unsafe operation intended for use by a subclass that resets the reference count of the buffer to 1
     */
    protected final void resetRefCnt() {
        updater.resetRefCnt(this);//重新设置真实计数
    }

    @Override
    public ByteBuf retain() {
        return updater.retain(this);//真实计数+1
    }

    @Override
    public ByteBuf retain(int increment) {
        return updater.retain(this, increment);//真实计数+increment
    }

    @Override
    public ByteBuf touch() {
        return this;//获取当前对象
    }

    @Override
    public ByteBuf touch(Object hint) {
        return this;
    }

    @Override
    public boolean release() {
 // 首先通过 updater 更新「refCnt」的值,refCnt=refCnt-2
 // 如果旧值「refCnt」==2,则update.release(this)会返回true,表示当前「ByteBuf」引用计数为0了,是时候需要释放了
 // 释放内存
return handleRelease(updater.release(this));//外部可以调用的尝试释放资源,内部是用引用更新器来判断的 } @Override public boolean release(int decrement) { return handleRelease(updater.release(this, decrement)); } //真正返回才去释放 private boolean handleRelease(boolean result) { if (result) { deallocate(); } return result; } /**

上一篇: 深入了解Netty:高级内置解码器、编码器和ByteBuf

下一篇: 【Netty】Netty 4重大变动及特性(官方文档翻译)