理解Netty中的WaterMark水位线概念
Netty里write数据的时候,是有水位线处理的,只要我们设置了。
Netty中write的时候,其实并没有真正将数据写到网络上,而是写到ChannelOutboundBuffer里面缓存起来的,writeAndFlush的时候则会写入网络。如果我们设置了高低水位线,比如低水位线L、高水位线H,如果ChannelOutboundBuffer里面的缓存size大小超过了高水位线H,那么channel.isWritable()是false,即逻辑上的不可写入;而ChannelOutboundBuffer里面缓存的size低于L则channel.isWritable()变为true,即逻辑上可写入。为什么说逻辑上,因为如果我们在写入之前不判断channel.isWritable,其实是可以写入的。
如果不设置高低水位线限制,可能会有问题,比如网络慢,但是程序往写入数据快,导致很多数据堆积在ChannelOutboundBuffer中,使得内存爆炸。
Abstract.AbstractUnsafe中,如下:
- AbstraceUnsafe中有个ChannelOutboundBuffer,write()时就是存在这个buffer里面,数据结构是一维数组
- filterOutboundMessage里面,AbstractNioByteChannel里面,会把ByteBuffer转换为DirectByteBuffer或者FileRegion,因为ByteBuffer是JVM内存,转换为DirectByteBuffer或者FileRegion后,减轻JVM GC的压力,同时可以使用zero copy的优点
- estimatorHandle.size()里面返还ByteBuffer中的byte个数,即发送的数据是多少byte
- addMessage()里面,如图1所示,在一维链表的最后添加Entry,这个新的Entry就是我们新write的数据
- addMessage()里面,如List-2所示,TOTAL_PENDING_SIZE_UPDATER是一个AtomicLongFieldUpdater,把ChannelOutboundBuffer里面的totalPendingSize加上新write的数据大小,如果大小highWaterMark值,则设置unwritabble
List-1
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
图1
List-2
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
AnstractUnsafe里面flush时就是把ChannelOutboundBuffer里面的Entry链表中未flush的Entry写入到网络中,即返回给调用方,所以writeAndFlush是写入得到Entry链表中后,又触发了flush操作,flush时写入网络后,会把totalPendingSize减去相应的大小,如果totalPengingSize的值小于lowWaterMark则将unwritable设置为false,适合及时响应的场景。
下一篇: Netty那点事(一)概述