netty案例,netty4.1源码分析篇五《一行简单的writeAndFlush的都做了哪些事》
前言介绍
对于使用netty的小伙伴来说,ctx.writeAndFlush()再熟悉不过了,它可以将我们的消息发送出去。那么它都执行了那些行为呢,是怎么将消息发送出去的呢。
源码分析
1、由一行简单发送消息开始
发送消息的代码非常简单,也是我们非常常用的发送消息的方式ctx.writeAndFlush
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //接收msg消息{与上一章节相比,此处已经不需要自己进行解码} System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg); //通知客户端链消息发送成功 String str = "客户端收到[微信公众号:bugstack虫洞栈]:" + new Date() + " " + msg + "\r\n"; ctx.writeAndFlush(str); }
2、跟进writeAndFlush | ChannelHandlerContext.writeAndFlush
AbstractChannelHandlerContext.java
@Override public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise()); } @Override public ChannelPromise newPromise() { return new DefaultChannelPromise(channel(), executor()); }
在这段代码中我们可以看到,writeAndFlush方法里提供了一个默认的newPromise()作为参数传递。{promise:v. 许诺;承诺;答应;保证;使很可能;预示}在Netty中发送消息是一个异步操作,那么可以通过往hannelPromise中注册回调监听listener来得到该操作是否成功。
在发送消息时添加监听
ctx.writeAndFlush("hi 微信公众号:bugstack虫洞栈 | 欢迎关注&获取专题源码", ctx.newProgressivePromise().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { future.isSuccess(); } }));
3、继续向下一层跟进代码 | AbstractChannelHandlerContext.invokeWriteAndFlush
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) { if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else { writeAndFlush(msg, promise); } }
3.1、首先通过invokeHandler()判断通道处理器已添加到管道
Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called yet. If not return {@code false} and if called or could not detect return {@code true}. If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event. This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
3.2、执行消息处理
invokeWrite0;首先将消息内容放入输出缓冲区中[ChannelOutboundBuffer]
invokeFlush0;然后将输出缓冲区中的数据通过socket发送到网络中
4、分析invokeWrite0执行内容 | AbstractChannelHandlerContext.invokeWrite0
private void invokeWrite0(Object msg, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).write(this, msg, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } }
((ChannelOutboundHandler) handler()).write是一个出站事件[ChannelOutboundHandler],会由ChannelOutboundHandlerAdapter处理;
/** * Calls {@link ChannelHandlerContext#write(Object, ChannelPromise)} to forward * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}. * * Sub-classes may override this method to change behavior. */ @Skip @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg, promise); }
接下来会走到ChannelPipeline中,来执行网络数据发送;| DefaultChannelPipeline.write
@Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { unsafe.write(msg, promise); }
5、unsafe.write执行分析
unsafe是我们构建NioServerSocketChannel或NioSocketChannel对象时,一并构建一个成员属性,它会完成底层真正的网络操作等。NioServerSocketChannel中持有的unsafe成员变量是NioMessageUnsafe对象,而NioSocketChannel中持有的unsafe成员变量是NioSocketChannelUnsafe对象。这里我们要看的是NioSocketChannel的write流程
@Override public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of th // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, newWriteException(initialCloseCause)); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; } int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } outboundBuffer.addMessage(msg, size, promise); }
- 获取该NioSocketChannel的ChannelOutboundBuffer成员属性。(确切地来说ChannelOutboundBuffer是NioSocketChannelUnsafe对象中的成员属性,而NioSocketChannelUnsafe才是NioSocketChannel的成员属性。)每一个NioSocketChannel会维护一个它们自己的ChannelOutboundBuffer,用于存储待出站写请求。
判断该outboundBuffer是否为null,如果为null则说明该NioSocketChannel已经关闭了,那么就会标志该异步写操作为失败完成,并释放写消息后返回。
AbstractNioByteChannel.java | filterOutboundMessage过滤待发送的消息:
@Override protected final Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (buf.isDirect()) { return msg; } return newDirectBuffer(buf); } if (msg instanceof FileRegion) { return msg; } throw new UnsupportedOperationException( "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); }
过滤待发送的消息,只有ByteBuf(堆 or 非堆)以及 FileRegion可以进行最终的Socket网络传输,其他类型的数据是不支持的,会抛UnsupportedOperationException异常。并且会把堆ByteBuf转换为一个非堆的ByteBuf返回。也就说,最后会通过socket传输的对象时非堆的ByteBuf和FileRegion。
[size = pipeline.estimatorHandle().size(msg);]估计待发送数据的大小:
DefaultMessageSizeEstimator.java | 通过ByteBuf.readableBytes()判断消息内容大小,估计待发送消息数据的大小,如果是FileRegion的话直接饭0,否则返回ByteBuf中可读取字节数。
private static final class HandleImpl implements Handle { private final int unknownSize; private HandleImpl(int unknownSize) { this.unknownSize = unknownSize; } @Override public int size(Object msg) { if (msg instanceof ByteBuf) { return ((ByteBuf) msg).readableBytes(); } if (msg instanceof ByteBufHolder) { return ((ByteBufHolder) msg).content().readableBytes() } if (msg instanceof FileRegion) { return 0; } return unknownSize; } }
ChannelOutboundBuffer.java | ChannelOutboundBuffer.addMessage将消息加入outboundBuffer中等待发送
/** * Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once * the message was written. */ public void addMessage(Object msg, int size, ChannelPromise promise) { Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; } else { Entry tail = tailEntry; tail.next = entry; } tailEntry = entry; if (unflushedEntry == null) { unflushedEntry = entry; } // increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 incrementPendingOutboundBytes(entry.pendingSize, false); }
6、ChannelOutboundBuffer出栈
一个内部的数据结构,被AbstractChannel用于存储它的待出站写请求。
ChannelOutboundBuffer中有两个属性private Entry unflushedEntry、private Entry flushedEntry。它们都是用Entry对象通过next指针来维护的一个单向链表。以及一个private Entry tailEntry;对象表示始终指向最后一个Entry对象(即,最后加入到该ChannelOutboundBuffer中的写请求的数据消息)
unflushedEntry表示还未刷新的ByteBuf的链表头;flushedEntry表示调用flush()操作时将会进行刷新的ByteBuf的链表头。
7、Entry对象
static final class Entry { private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() { @Override protected Entry newObject(Handle<Entry> handle) { return new Entry(handle); } }; private final Handle<Entry> handle; Entry next; Object msg; ByteBuffer[] bufs; ByteBuffer buf; ChannelPromise promise; long progress; long total; int pendingSize; int count = -1; boolean cancelled; private Entry(Handle<Entry> handle) { this.handle = handle; } static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) { Entry entry = RECYCLER.get(); entry.msg = msg; entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD; entry.total = total; entry.promise = promise; return entry; } int cancel() { if (!cancelled) { cancelled = true; int pSize = pendingSize; // release message and replace with an empty buffer ReferenceCountUtil.safeRelease(msg); msg = Unpooled.EMPTY_BUFFER; pendingSize = 0; total = 0; progress = 0; bufs = null; buf = null; return pSize; } return 0; }