Netty 源码解析:ChannelOutboundBuffer 实现与 Flush 过程
前面文章说了,ChannelHandlerContext#write只是将数据缓存到ChannelOutboundBuffer,等到ChannelHandlerContext#flush时,再将ChannelOutboundBuffer缓存的数据写到Channel中。
本文分享Netty中ChannelOutboundBuffer的实现以及Flush过程。
源码分析基于Netty 4.1
每个Channel的AbstractUnsafe#outboundBuffer 都维护了一个ChannelOutboundBuffer。
ChannelOutboundBuffer,出站数据缓冲区,负责缓存
ChannelHandlerContext#write
的数据。通过链表管理数据,链表节点为内部类Entry。
关键字段如下
Entry tailEntry; // 链表最后一个节点,新增的节点添加其后。 Entry unflushedEntry; // 链表中第一个未刷新的节点 Entry flushedEntry; // 链表中第一个已刷新但数据未写入的节点 int flushed; // 已刷新但数据未写入的节点数
ChannelHandlerContext#flush操作前,需要先刷新一遍待处理的节点(主要是统计本次ChannelHandlerContext#flush操作可以写入多少个节点数据),从unflushedEntry开始。刷新完成后使用flushedEntry标志第一个待写入的节点,flushed为待写入节点数。
前面分享Netty读写过程的文章说过,AbstractUnsafe#write处理写操作时,会调用ChannelOutboundBuffer#addMessage将数据缓存起来
public void addMessage(Object msg, int size, ChannelPromise promise) { // #1 Entry entry = Entry.newInstance(msg, size, total(msg), promise); if (tailEntry == null) { flushedEntry = null; } else { Entry tail = tailEntry; tail.next = entry; } tailEntry = entry; if (unflushedEntry == null) { unflushedEntry = entry; } incrementPendingOutboundBytes(entry.pendingSize, false); }
#1
构建一个Entry,注意,这里使用了对象池RECYCLER,后面有文章详细解析。
主要是更新tailEntry和unflushedEntry
#2
如果当前缓存数量超过阀值WriteBufferWaterMark#high,更新unwritable标志为true,并触发 pipeline.fireChannelWritabilityChanged()
方法。
由于ChannelOutboundBuffer链表没有大小限制,不断累积数据可能导致 OOM,
为了避免这个问题,我们可以在unwritable标志为true时,不再继续缓存数据。
Netty只会更新unwritable标志,并不阻止数据缓存,我们可以根据需要实现该功能。示例如下
if (ctx.channel().isActive() && ctx.channel().isWritable()) { ctx.writeAndFlush(responseMessage); } else { ... }
addFlush方法负责刷新节点(ChannelHandlerContext#flush操作前调用该方法统计可写入节点数据数)
public void addFlush() { // #1 Entry entry = unflushedEntry; if (entry != null) { // #2 if (flushedEntry == null) { // there is no flushedEntry yet, so start with the entry flushedEntry = entry; } do { // #3 flushed ++; if (!entry.promise.setUncancellable()) { // Was cancelled so make sure we free up memory and notify about the freed bytes int pending = entry.cancel(); decrementPendingOutboundBytes(pending, false, true); } entry = entry.next; // #4 } while (entry != null); // All flushed so reset unflushedEntry // #5 unflushedEntry = null; } }
#1
从unflushedEntry节点开始处理
#2
赋值flushedEntry为unflushedEntry。
ChannelHandlerContext#flush写入完成后会置空flushedEntry
#3
增加flushed
设置节点的ChannelPromise不可取消
#4
从unflushedEntry开始,遍历后面节点
#5
置空unflushedEntry,表示当前所有节点都已刷新。
nioBuffers方法负责将当前缓存的ByteBuf转发为(jvm)ByteBuffer
public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) { assert maxCount > 0; assert maxBytes > 0; long nioBufferSize = 0; int nioBufferCount = 0; // #1 final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap); Entry entry = flushedEntry; while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) { if (!entry.cancelled) { ByteBuf buf = (ByteBuf) entry.msg; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; if (readableBytes > 0) { // #2 if (maxBytes - readableBytes nioBuffers.length) { nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount); NIO_BUFFERS.set(threadLocalMap, nioBuffers); } // #4 if (count == 1) { ByteBuffer nioBuf = entry.buf; if (nioBuf == null) { // cache ByteBuffer as it may need to create a new ByteBuffer instance if its a // derived buffer entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes); } nioBuffers[nioBufferCount++] = nioBuf; } else { ... } if (nioBufferCount == maxCount) { break; } } } entry = entry.next; } this.nioBufferCount = nioBufferCount; this.nioBufferSize = nioBufferSize; return nioBuffers; }
#1
从线程缓存中获取nioBuffers变量,这样可以避免反复构造ByteBuffer数组的性能损耗
#2
maxBytes,即本次操作最大的字节数。
maxBytes - readableBytes < nioBufferSize
,表示如果本次操作后将超出maxBytes,退出
#3
buf.nioBufferCount(),获取ByteBuffer数量,CompositeByteBuf可能有多个ByteBuffer组成。
neededSpace,即nioBuffers数组中ByteBuffer数量,nioBuffers长度不够时需要扩容。
#4
buf.internalNioBuffer(readerIndex, readableBytes)
,使用readerIndex, readableBytes构造一个ByteBuffer。
这里涉及ByteBuf相关知识,后面有文章详细解析。
ChannelHandlerContext#flush完成后,需要移除对应的缓存节点。
public void removeBytes(long writtenBytes) { for (;;) { // #1 Object msg = current(); if (!(msg instanceof ByteBuf)) { assert writtenBytes == 0; break; } final ByteBuf buf = (ByteBuf) msg; final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; // #2 if (readableBytes writtenBytes // #3 if (writtenBytes != 0) { buf.readerIndex(readerIndex + (int) writtenBytes); progress(writtenBytes); } break; } } clearNioBuffers(); }
#1
current方法返回flushedEntry节点缓存数据。
结果null时,退出循环
#2
当前节点的数据已经全部写入,
progress方法唤醒数据节点上ChannelProgressivePromise的监听者
writtenBytes减去对应字节数
remove()方法移除节点,释放ByteBuf,flushedEntry标志后移。
#3
当前节点的数据部分写入,它应该是本次ChannelHandlerContext#flush操作的最后一个节点
更新ByteBuf的readerIndex,下次从这里开始读取数据。
退出
移除数据节点
public boolean remove() { Entry e = flushedEntry; if (e == null) { clearNioBuffers(); return false; } Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; // #1 removeEntry(e); if (!e.cancelled) { // only release message, notify and decrement if it was not canceled before. // #2 ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); } // recycle the entry // #3 e.recycle(); return true; }
#1
flushed减1
当flushed为0时,flushedEntry赋值为null,否则flushedEntry指向后一个节点。
#2
释放ByteBuf
#3
当前节点返回对象池中,以便复用。
下面来看一下ChannelHandlerContext#flush操作过程。
ChannelHandlerContext#flush -> HeadContext#flush -> AbstractUnsafe#flush
public final void flush() { assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } // #1 outboundBuffer.addFlush(); // #2 flush0(); }
#1
刷新outboundBuffer中数据节点
#2
写入操作
flush -> NioSocketChannel#doWrite
protected void doWrite(ChannelOutboundBuffer in) throws Exception { SocketChannel ch = javaChannel(); int writeSpinCount = config().getWriteSpinCount(); do { // #1 if (in.isEmpty()) { clearOpWrite(); return; } // #2 int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); int nioBufferCnt = in.nioBufferCount(); switch (nioBufferCnt) { case 0: // #3 writeSpinCount -= doWrite0(in); break; case 1: { // #4 ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); final int localWrittenBytes = ch.write(buffer); if (localWrittenBytes 0); incompleteWrite(writeSpinCount < 0); }
#1
通过ChannelOutboundBuffer#flushed判断是否没有数据可以写,没有数据则清除关注事件OP_WRITE,直接返回。
#2
获取ChannelOutboundBuffer中ByteBuf维护的(jvm)ByteBuffer,并统计nioBufferSize,nioBufferCount。
#3
这时没有ByteBuffer,但是可能有其他类型的数据(如FileRegion类型),调用doWrite0继续处理,这里不再深入
#4
只有一个ByteBuffer,调用SocketChannel#write将数据写入Channel。
#5
如果写入数据数量小于等于0,说明数据没有被写出去(可能是因为套接字的缓冲区满了等原因),那么就需要关注该Channel上的OP_WRITE事件,方便下次EventLoop将Channel轮询出来的时候,能继续写数据。
#6
移除ChannelOutboundBuffer缓存数据节点。
#7
有多个ByteBuffer,调用 SocketChannel#write(ByteBuffer[] srcs, int offset, int length)
,批量写入,与上一种情况处理类似
回顾之前文章《事件循环机制实现原理》中对NioEventLoop#processSelectedKey方法的解析
... if ((readyOps & SelectionKey.OP_WRITE) != 0) { ch.unsafe().forceFlush(); }
这里会调用forceFlush方法,再次写入数据。
FlushConsolidationHandler
ChannelHandlerContext#flush是很昂贵的操作,可能触发系统调用,但数据又不能缓存太久,使用FlushConsolidationHandler可以尽量达到写入延迟与吞吐量之间的权衡。
FlushConsolidationHandler中维护了explicitFlushAfterFlushes变量,
在ChannelOutboundHandler#channelRead中调用flush,如果调用次数小于explicitFlushAfterFlushes, 会拦截flush操作不执行。
在channelReadComplete后调用flush,则不会拦截flush操作。
本文涉及ByteBuf组件,它是Netty中的内存缓冲区,后面有文章解析。
文章最后,附图一张