解析Netty中关于ByteBuf内存泄露及释放
近日在使用Netty框架开发程序中出现了内存泄露的问题,百度加调试了一番,做个整理。
直接看解决方法请移步Netty内存泄漏解决ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected
1. ByteBuf分类、回收及使用场景
Netty中按是否使用了池化技术,ByteBuf分为两类,一类是非池化的ByteBuf,包括UnpooledHeapByteBuf、UnpooledDirectByteBuf等等,每次I/O读写都会创建一个新ByteBuf,频繁进行大块内存的分配和回收对性能有一定影响,非池化的ByteBuf可以通过JVM GC自动回收,也推荐手动回收UnpooledDirectByteBuf等使用堆外内存的ByteBuf;另一类是池化的ByteBuf,包括pooledHeapByteBuf、pooledDirectByteBuf等等,其先申请一块大内存池,在内存池中分配空间,对于这种应用级别的内存二次分配,就需要手动对池化的ByteBuf进行释放,否则就有可能出现内存泄露的问题。
ByteBuf 该如何选择: 一般业务数据的内存分配选用Java堆内存UnpooledHeapByteBuf,其实现简单,回收快,不会出现内存管理问题;对于I/O数据的内存分配一般选用池化的直接内存PooledDirectByteBuf,避免Java堆内存到直接内存的拷贝,但使用池化ByteBuf时切记自己分配的内存一定要在用完后手动释放。
Netty的接收和发送ByteBuf采用的DirectBuffers,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(heap buffers)进行socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入socket中。相比堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
2. ByteBuf的计数器实现
那对于池化的ByteBuf在使用完释放回池中时,如何知道自己被引用多少次,并且在没有其他引用的时候被释放呢?ByteBuf的具备实现类都继承了AbstractReferenceCountedByteBuf类,该类实现了对计数器的操作功能。当某一操作使得ByteBuf的引用增加时,调用retain()函数,使计数器的值原子增加,当某一操作使得ByteBuf的引用减少时,调用release()函数,使计数器的值原子减少,减少到0便会触发回收操作。关于AbstractReferenceCountedByteBuf类的源码分析,请见Netty框架AbstractReferenceCountedByteBuf 源码分析。
3. ByteBuf计数器的调用场景
(1)当一个ByteBuf新建时,或从另一个ByteBuf创建出独立个体时(比如copy(),readBytes(int length)),新的ByteBuf在初始化的时候,自己的计数器也会初始化。这种ByteBuf使用结束后,要主动释放
(2)有些ByteBuf从另一个ByteBuf衍生出来时(比如decode(),retainedSlice(index, length)),底层共用同一个buffer,也会调用retain()函数,来使得计数器增加。使用完毕也要主动释放。
(3)有些ByteBuf从另一个ByteBuf衍生出来时(比如duplicate(), slice(), order()),底层与父类ByteBuf共用一个buffer,其没有自己的计数器,共用父类ByteBuf的计数器,计数器也不会增加。因此,如果要把这些衍生ByteBuf传递给其他函数时,必须要主动调用retain()函数,并在本函数释放父类ByteBuf,在下一个函数里释放衍生ByteBuf。如下面代码
ByteBuf parent = ctx.alloc().directBuffer(512); parent.writeBytes(...); try { while (parent.isReadable(16)) { ByteBuf derived = parent.readSlice(16); derived.retain(); process(derived); } } finally { parent.release(); } ... public void process(ByteBuf buf) { ... buf.release(); }
以我遇到的内存泄漏的场景为例
(1)readBytes(int length)函数可能会调用到如下代码,新生成一个ByteBuf
@Override protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) { if (HAS_UNSAFE) { return PooledUnsafeDirectByteBuf.newInstance(maxCapacity); } else { return PooledDirectByteBuf.newInstance(maxCapacity); } }
(2)ByteBuf in = (ByteBuf) super.decode(ctx,inByteBuf) 调用decode函数时,会调用到buffer.retainedSlice(index, length)函数,会返回原ByteBuf的一个片段,同时使原ByteBuf的计数器增加。它们底层共用同一个buffer,修改一个会影响另一个。
final <U extends AbstractPooledDerivedByteBuf> U init( AbstractByteBuf unwrapped, ByteBuf wrapped, int readerIndex, int writerIndex, int maxCapacity) { wrapped.retain(); // Retain up front to ensure the parent is accessible before doing more work. parent = wrapped; rootParent = unwrapped; ...... }
4. 谁来释放ByteBuf
最基本的规则是谁最后访问ByteBuf,谁最后负责释放。需注意的是:
(1)发送组件将ByteBuf传递给接收组件,发送组件一般不负责释放,由接收组件释放;
(2)如果一个组件除了接收处理ByteBUf,而不做其他操作(比如再传给其他组件),这个组件负责释放ByteBuf。
例如
Action | Who should release? |
Who released? |
1. main() creates buf
|
buf →main()
|
main () releases buf
|
2. main() calls a() with buf
|
buf →a()
|
a() releases buf
|
3. a() returns buf merely. |
buf →main()
|
main () releases buf
|
4. main() calls b() with buf
|
buf →b()
|
b() releases buf
|
5. b() returns the copy of buf
|
buf →b()
|
b() releases buf,
|
6. main() calls c() with copy
|
copy →c()
|
c() releases copy
|
7. c() swallows copy
|
copy →c()
|
c() releases copy
|
5. 在 ChannelHandler负责链中,如何释放
(1)在Inbound messages中
a. 如果ChannelHandler中,只有处理ByteBuf的操作,不会调ctx.fireChannelRead(buf)把ByteBuf传递下去,那就要在这个ChannelHandler中释放ByteBuf。
b. 如果ChannelHandler中,会调ctx.fireChannelRead(buf)把ByteBuf传递给下一个ChannelHandler,那在当前ChannelHandler中不需要释放ByteBuf,由最后一个使用该ByteBuf的ChannelHandler释放。
c. 如果处理的ByteBuf是由decode()等会增加计数器的操作生成的,不再传递时,ByteBuf也要释放。
d. 如果不确定要不要释放,或者简化释放的过程,可以调用ReferenceCountUtil.release(ByteBuf)函数。
e. 也可以把ChannelHandler都承继自SimpleChannelInboundHandler虚类,该类会在channelRead函数中调用ReferenceCountUtil.release(msg)来帮助释放ByteBuf,如下代码所示,channelRead0(ctx, imsg)是一个虚函数,子类实现channelRead0函数用来完成处理逻辑。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } }
(2)在Outbound messages中
Outbound messages中的ByteBuf都是由应用程序产生的,由Netty负责释放。
6. 内存泄露检测
使用引用计数的缺点在于容易产生内存泄露,因为JVM不知道引用计数的存在。当一个对象不可达时,JVM可能会收回该对象,但这个对象的引用计数可能还不是0,这就导致该对象从池里分配的空间不能归还到池里,从而导致内存泄露。
Netty提供了一种内存泄露检测机制,可以通过配置参数不同选择不同的检测级别,参数设置为java -Dio.netty.leakDetection.level=advanced
-
DISABLED
:完全禁用内存泄露检测,不推荐 -
SIMPLE
:抽样1%的ByteBuf,提示是否有内存泄露 -
ADVANCED
:抽样1%的ByteBuf,提示哪里产生了内存泄露 -
PARANOID
:对每一个ByteBu进行检测,提示哪里产生了内存泄露
我在测试时,直接提示了ByteBuf内存泄露的位置,如下,找到自己程序代码,看哪里有新生成的ByteBuf对象没有释放,主动释放一下,调用对象的release()函数,或者用工具类帮助释放ReferenceCountUtil.release(msg)。
2020-06-12 17:04:41.242 [nioEventLoopGroup-2-1] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information. Recent access records: Created at: io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:363) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:123) io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:872) com.spring.netty.twg.service.TwgMessageDecoder.formatDecoder(TwgMessageDecoder.java:176) com.spring.netty.twg.service.TwgMessageDecoder.getMessageBody(TwgMessageDecoder.java:90) com.spring.netty.twg.service.TwgMessageDecoder.decode(TwgMessageDecoder.java:76) io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:332) io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
2020-06-12 17:04:45.460 [nioEventLoopGroup-2-1] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information. Recent access records: Created at: io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:143) io.netty.buffer.SimpleLeakAwareByteBuf.retainedSlice(SimpleLeakAwareByteBuf.java:57) io.netty.handler.codec.LengthFieldBasedFrameDecoder.extractFrame(LengthFieldBasedFrameDecoder.java:498) io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:437) com.spring.netty.twg.service.TwgMessageDecoder.decode(TwgMessageDecoder.java:31) io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:332)
参考资料
https://netty.io/wiki/reference-counted-objects.html#who-destroys-it
https://blog.****.net/u013202238/article/details/93383887
推荐阅读
-
netty源码解解析(4.0)-23 ByteBuf内存管理:分配和释放
-
【Netty】「萌新入门」(七)ByteBuf 的性能优化-堆内存的分配和释放都是由 Java 虚拟机自动管理的,这意味着它们可以快速地被分配和释放,但是也会产生一些开销。 直接内存需要手动分配和释放,因为它由操作系统管理,这使得分配和释放的速度更快,但是也需要更多的系统资源。 另外,直接内存可以映射到本地文件中,这对于需要频繁读写文件的应用程序非常有用。 此外,直接内存还可以避免在使用 NIO 进行网络传输时发生数据拷贝的情况。在使用传统的 I/O 时,数据必须先从文件或网络中读取到堆内存中,然后再从堆内存中复制到直接缓冲区中,最后再通过 SocketChannel 发送到网络中。而使用直接缓冲区时,数据可以直接从文件或网络中读取到直接缓冲区中,并且可以直接从直接缓冲区中发送到网络中,避免了不必要的数据拷贝和内存分配。 通过 ByteBufAllocator.DEFAULT.directBuffer 方法来创建基于直接内存的 ByteBuf: ByteBuf directBuf = ByteBufAllocator.DEFAULT.directBuffer(16); 通过 ByteBufAllocator.DEFAULT.heapBuffer 方法来创建基于堆内存的 ByteBuf: ByteBuf heapBuf = ByteBufAllocator.DEFAULT.heapBuffer(16); 注意: 直接内存是一种特殊的内存分配方式,可以通过在堆外申请内存来避免 JVM 堆内存的限制,从而提高读写性能和降低 GC 压力。但是,直接内存的创建和销毁代价昂贵,因此需要慎重使用。 此外,由于直接内存不受 JVM 垃圾回收的管理,我们需要主动释放这部分内存,否则会造成内存泄漏。通常情况下,可以使用 ByteBuffer.clear 方法来释放直接内存中的数据,或者使用 ByteBuffer.cleaner 方法来手动释放直接内存空间。 测试代码: public static void testCreateByteBuf { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(16); System.out.println(buf.getClass); ByteBuf heapBuf = ByteBufAllocator.DEFAULT.heapBuffer(16); System.out.println(heapBuf.getClass); ByteBuf directBuf = ByteBufAllocator.DEFAULT.directBuffer(16); System.out.println(directBuf.getClass); } 运行结果: class io.netty.buffer.PooledUnsafeDirectByteBuf class io.netty.buffer.PooledUnsafeHeapByteBuf class io.netty.buffer.PooledUnsafeDirectByteBuf 池化技术 在 Netty 中,池化技术指的是通过对象池来重用已经创建的对象,从而避免了频繁地创建和销毁对象,这种技术可以提高系统的性能和可伸缩性。 通过设置 VM options,来决定池化功能是否开启: -Dio.netty.allocator.type={unpooled|pooled} 在 Netty 4.1 版本以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现; 这里我们使用非池化功能进行测试,依旧使用的是上面的测试代码 testCreateByteBuf,运行结果如下所示: class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeDirectByteBuf class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeDirectByteBuf 可以看到,ByteBuf 类由 PooledUnsafeDirectByteBuf 变成了 UnpooledUnsafeDirectByteBuf; 在没有池化的情况下,每次使用都需要创建新的 ByteBuf 实例,这个操作会涉及到内存的分配和初始化,如果是直接内存则代价更为昂贵,而且频繁的内存分配也可能导致内存碎片问题,增加 GC 压力。 使用池化技术可以避免频繁内存分配带来的开销,并且重用池中的 ByteBuf 实例,减少了内存占用和内存碎片问题。另外,池化技术还可以采用类似 jemalloc 的内存分配算法,进一步提升分配效率。 在高并发环境下,池化技术的优点更加明显,因为内存的分配和释放都是比较耗时的操作,频繁的内存分配和释放会导致系统性能下降,甚至可能出现内存溢出的风险。使用池化技术可以将内存分配和释放的操作集中到预先分配的池中,从而有效地降低系统的内存开销和风险。 内存释放 当在 Netty 中使用 ByteBuf 来处理数据时,需要特别注意内存回收问题。 Netty 提供了不同类型的 ByteBuf 实现,包括堆内存(JVM 内存)实现 UnpooledHeapByteBuf 和堆外内存(直接内存)实现 UnpooledDirectByteBuf,以及池化技术实现的 PooledByteBuf 及其子类。 UnpooledHeapByteBuf:通过 Java 的垃圾回收机制来自动回收内存; UnpooledDirectByteBuf:由于 JVM 的垃圾回收机制无法管理这些内存,因此需要手动调用 release 方法来释放内存; PooledByteBuf:使用了池化机制,需要更复杂的规则来回收内存; 由于池化技术的特殊性质,释放 PooledByteBuf 对象所使用的内存并不是立即被回收的,而是被放入一个内存池中,待下次分配内存时再次使用。因此,释放 PooledByteBuf 对象的内存可能会延迟到后续的某个时间点。为了避免内存泄漏和占用过多内存,我们需要根据实际情况来设置池化技术的相关参数,以便及时回收内存; Netty 采用了引用计数法来控制 ByteBuf 对象的内存回收,在博文 「源码解析」ByteBuf 的引用计数机制 中将会通过解读源码的形式对 ByteBuf 的引用计数法进行深入理解; 每个 ByteBuf 对象被创建时,都会初始化为1,表示该对象的初始计数为1。 在使用 ByteBuf 对象过程中,如果当前 handler 已经使用完该对象,需要通过调用 release 方法将计数减1,当计数为0时,底层内存会被回收,该对象也就被销毁了。此时即使 ByteBuf 对象还在,其各个方法均无法正常使用。 但是,如果当前 handler 还需要继续使用该对象,可以通过调用 retain 方法将计数加1,这样即使其他 handler 已经调用了 release 方法,该对象的内存仍然不会被回收。这种机制可以有效地避免了内存泄漏和意外访问已经释放的内存的情况。 一般来说,应该尽可能地保证 retain 和 release 方法成对出现,以确保计数正确。
-
解析Netty中关于ByteBuf内存泄露及释放