欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

理解Netty中的WaterMark水位线概念

最编程 2024-07-30 14:14:51
...

    Netty里write数据的时候,是有水位线处理的,只要我们设置了。

    Netty中write的时候,其实并没有真正将数据写到网络上,而是写到ChannelOutboundBuffer里面缓存起来的,writeAndFlush的时候则会写入网络。如果我们设置了高低水位线,比如低水位线L、高水位线H,如果ChannelOutboundBuffer里面的缓存size大小超过了高水位线H,那么channel.isWritable()是false,即逻辑上的不可写入;而ChannelOutboundBuffer里面缓存的size低于L则channel.isWritable()变为true,即逻辑上可写入。为什么说逻辑上,因为如果我们在写入之前不判断channel.isWritable,其实是可以写入的。

    如果不设置高低水位线限制,可能会有问题,比如网络慢,但是程序往写入数据快,导致很多数据堆积在ChannelOutboundBuffer中,使得内存爆炸。

    Abstract.AbstractUnsafe中,如下:

  1.     AbstraceUnsafe中有个ChannelOutboundBuffer,write()时就是存在这个buffer里面,数据结构是一维数组
  2.     filterOutboundMessage里面,AbstractNioByteChannel里面,会把ByteBuffer转换为DirectByteBuffer或者FileRegion,因为ByteBuffer是JVM内存,转换为DirectByteBuffer或者FileRegion后,减轻JVM GC的压力,同时可以使用zero copy的优点
  3.     estimatorHandle.size()里面返还ByteBuffer中的byte个数,即发送的数据是多少byte
  4.     addMessage()里面,如图1所示,在一维链表的最后添加Entry,这个新的Entry就是我们新write的数据
  5.     addMessage()里面,如List-2所示,TOTAL_PENDING_SIZE_UPDATER是一个AtomicLongFieldUpdater,把ChannelOutboundBuffer里面的totalPendingSize加上新write的数据大小,如果大小highWaterMark值,则设置unwritabble

    List-1

public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        // If the outboundBuffer is null we know the channel was closed and so
        // need to fail the future right away. If it is not null the handling of the rest
        // will be done in flush0()
        // See https://github.com/netty/netty/issues/2362
        safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
        // release message now to prevent resource-leak
        ReferenceCountUtil.release(msg);
        return;
    }

    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        ReferenceCountUtil.release(msg);
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);
}

    

图1

    List-2 

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }

    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

    AnstractUnsafe里面flush时就是把ChannelOutboundBuffer里面的Entry链表中未flush的Entry写入到网络中,即返回给调用方,所以writeAndFlush是写入得到Entry链表中后,又触发了flush操作,flush时写入网络后,会把totalPendingSize减去相应的大小,如果totalPengingSize的值小于lowWaterMark则将unwritable设置为false,适合及时响应的场景。