生产环境java.util.concurrent.RejectedExecutionException:事件执行器终止 错误分析
缘由
生产上发生了事故,用户服务其中一台服务很多情况下都不可用,于是查询日志,发现了在从redis获取用户信息缓存的时候出现了错误,栈如下:
org.springframework.data.redis.RedisSystemException: Unknown redis exception; nested exception is java.util.concurrent.RejectedExecutionException: event executor terminated
at org.springframework.data.redis.FallbackExceptionTranslationStrategy.getFallback(FallbackExceptionTranslationStrategy.java:53)
at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:43)
at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:270)
at org.springframework.data.redis.connection.lettuce.LettuceStringCommands.convertLettuceAccessException(LettuceStringCommands.java:799)
at org.springframework.data.redis.connection.lettuce.LettuceStringCommands.set(LettuceStringCommands.java:180)
at org.springframework.data.redis.connection.DefaultedRedisConnection.set(DefaultedRedisConnection.java:288)
at com.hyzh.common.redis.util.RedisDistributedLock.lambda$setRedis$0(RedisDistributedLock.java:152)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:228)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:188)
at com.hyzh.common.redis.util.RedisDistributedLock.setRedis(RedisDistributedLock.java:152)
at com.hyzh.common.redis.util.RedisDistributedLock.lock(RedisDistributedLock.java:71)
at com.hyzh.common.redis.aspect.DistributedLockAspect.around(DistributedLockAspect.java:70)
at sun.reflect.GeneratedMethodAccessor748.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:644)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633)
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at com.hyzh.unistars.user.service.impl.UserActionServiceImpl$$EnhancerBySpringCGLIB$$59bb712b.findActionConfigByCode(<generated>)
at com.hyzh.unistars.user.service.impl.UserActionServiceImpl.findActionConfigByCode(UserActionServiceImpl.java:160)
at com.hyzh.unistars.user.service.impl.UserActionServiceImpl$$FastClassBySpringCGLIB$$5acd3627.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at com.hyzh.unistars.user.service.impl.UserActionServiceImpl$$EnhancerBySpringCGLIB$$59bb712b.findActionConfigByCode(<generated>)
at com.hyzh.unistars.user.service.impl.UserTaskServiceImpl.addUserAccountLogsByUserIdsAndCodes(UserTaskServiceImpl.java:505)
at org.apache.dubbo.common.bytecode.Wrapper53.invokeMethod(Wrapper53.java)
at org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory$1.doInvoke(JavassistProxyFactory.java:47)
at org.apache.dubbo.rpc.proxy.AbstractProxyInvoker.invoke(AbstractProxyInvoker.java:84)
at org.apache.dubbo.config.invoker.DelegateProviderMetaDataInvoker.invoke(DelegateProviderMetaDataInvoker.java:56)
at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56)
at org.apache.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:92)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
at org.apache.dubbo.rpc.protocol.dubbo.filter.TraceFilter.invoke(TraceFilter.java:81)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
at org.apache.dubbo.rpc.filter.TimeoutFilter.invoke(TimeoutFilter.java:48)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
at com.hyzh.common.dubbo.ExceptionFilter.invoke(ExceptionFilter.java:33)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
at com.alibaba.csp.sentinel.adapter.dubbo.SentinelDubboProviderFilter.invoke(SentinelDubboProviderFilter.java:73)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
at com.hyzh.common.dubbo.DistributedLogTracerInterceptor.invoke(DistributedLogTracerInterceptor.java:41)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
at org.apache.dubbo.rpc.filter.ContextFilter.invoke(ContextFilter.java:96)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
at org.apache.dubbo.rpc.filter.GenericFilter.invoke(GenericFilter.java:148)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
at org.apache.dubbo.rpc.filter.ClassLoaderFilter.invoke(ClassLoaderFilter.java:38)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
at org.apache.dubbo.rpc.filter.EchoFilter.invoke(EchoFilter.java:41)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:82)
at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$CallbackRegistrationInvoker.invoke(ProtocolFilterWrapper.java:157)
at org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol$1.reply(DubboProtocol.java:152)
at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.handleRequest(HeaderExchangeHandler.java:102)
at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.received(HeaderExchangeHandler.java:193)
at org.apache.dubbo.remoting.transport.DecodeHandler.received(DecodeHandler.java:51)
at org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.run(ChannelEventRunnable.java:57)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.RejectedExecutionException: event executor terminated
at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:926)
at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:353)
at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:346)
at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:828)
at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:818)
at io.netty.util.concurrent.AbstractScheduledEventExecutor.schedule(AbstractScheduledEventExecutor.java:261)
at io.netty.util.concurrent.AbstractScheduledEventExecutor.schedule(AbstractScheduledEventExecutor.java:177)
at io.netty.util.concurrent.AbstractEventExecutorGroup.schedule(AbstractEventExecutorGroup.java:50)
at io.netty.util.concurrent.AbstractEventExecutorGroup.schedule(AbstractEventExecutorGroup.java:32)
at io.lettuce.core.protocol.CommandExpiryWriter.potentiallyExpire(CommandExpiryWriter.java:164)
at io.lettuce.core.protocol.CommandExpiryWriter.write(CommandExpiryWriter.java:111)
at io.lettuce.core.RedisChannelHandler.dispatch(RedisChannelHandler.java:187)
at io.lettuce.core.StatefulRedisConnectionImpl.dispatch(StatefulRedisConnectionImpl.java:171)
at io.lettuce.core.AbstractRedisAsyncCommands.dispatch(AbstractRedisAsyncCommands.java:467)
at io.lettuce.core.AbstractRedisAsyncCommands.set(AbstractRedisAsyncCommands.java:1213)
at sun.reflect.GeneratedMethodAccessor749.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:57)
at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80)
at com.sun.proxy.$Proxy219.set(Unknown Source)
at org.springframework.data.redis.connection.lettuce.LettuceStringCommands.set(LettuceStringCommands.java:178)
... 66 common frames omitted
同时,也发现了在用户服务也报了内存溢出日志如下:
December 9th 2020, 12:35:16.041 2020-12-09 12:25:52,956 [WARN ] [lettuce-eventExecutorLoop-3-2] [] [i.n.u.c.SingleThreadEventExecutor] Unexpected exception from an event executor:
java.lang.OutOfMemoryError: GC overhead limit exceeded
于是开始重点分析SingleThreadEventExecutor这个类,这个类是netty中eventloop的父类,所以可以理解为他的作用就是轮询读写事件的。
分析总结
从栈信息可以看出,错误出现的时机通过netty的channel往redis写数据的时候抛的这个异常,写数据的时候代码会执行到SingleThreadEventExecutor类的excute方法:
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
分析这个方法,大意是将当前写事件提交到SingleThreadEventExecutor任务队列中。报错是在addTask这个方法:
/**
* Add a task to the task queue, or throws a {@link RejectedExecutionException} if this instance was shutdown
* before.
*/
protected void addTask(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
if (!offerTask(task)) {
reject(task);
}
}
这个方法判断如果没有插入任务队列成功就调用reject(task)拒绝任务,reject(task)里面抛出的异常就是我们看到的最外面的异常。于是看一看offerTask方法:
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
@Override
public boolean isShutdown() {
return state >= ST_SHUTDOWN;
}
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
这里可以看出offerTask的时候校验了当前的SingleThreadEventExecutor的状态是否是结束和终止。那么问题来了,是什么导致的SingleThreadEventExecutor的状态为终止的勒,通过前面缘由中日志可以看到SingleThreadEventExecutor也打印了堆溢出的错误日志,搜索这个日志是哪里打印的,发现:
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks. At this point the event loop
// is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
// graceful shutdown with quietPeriod.
for (;;) {
if (confirmShutdown()) {
break;
}
}
// Now we want to make sure no more tasks can be added from this point. This is
// achieved by switching the state. Any new tasks beyond this point will be rejected.
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
break;
}
}
// We have the final set of tasks in the queue now, no more can be added, run all remaining.
// No need to loop here, this is the final pass.
confirmShutdown();
} finally {
try {
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown();
int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + numUserTasks + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
这个方法,这个方法里面同时在finally设置了状态为ST_TERMINATED,这个方法大意是启动eventloop真正的线程来轮询读写事件,这个方法将在首次执行excute方法的时候被调用,注意这里的executor变量是一个线程池,于是查阅资料才认识到,eventloopgroup下的每个eventloop实际上都会引用到一个线程池,这个线程池线程数目与eventloop总大小相同,是fork/join框架创建的,真正执行轮询逻辑实际上是这个线程池去提交的。eventloop的意义实际上只是为了保存任务队列。真正的去轮询任务队列的逻辑是由其代理的线程池去执行的。
这样也就清楚了,整个事故实际很简单,就是先由于内存溢出的错误导致eventloop的状态为终止,这个终止状态据我看到的代码貌似是不可逆的,然后当在调用chanel.write的时候提交任务到eventloop校验状态为终止所以报了拒绝错误。
解决这个bug的重要途径是找到内存溢出的原因,这里就不赘述了,因为在网上没有搜到这个错的原因,所以好奇跟了一下代码,同时对netty的eventloop加深了一丢丢理解,所以这里做个记录。
上一篇: Tomcat 架构设计精髓解析--连接器高内聚低耦合设计
下一篇: 表格(连锁操作 2)
推荐阅读
-
生产环境java.util.concurrent.RejectedExecutionException:事件执行器终止 错误分析
-
纯干货分享 | 研发效能提升——敏捷需求篇-而敏捷需求是提升效能的方式中不可或缺的模块之一。 云智慧的敏捷教练——Iris Xu近期在公司做了一场分享,主题为「敏捷需求挖掘和组织方法,交付更高业务价值的产品」。Iris具有丰富的团队敏捷转型实施经验,完成了企业多个团队从传统模式到敏捷转型的落地和实施,积淀了很多的经验。 这次分享主要包含以下2个部分: 第一部分是用户影响地图 第二部分是事件驱动的业务分析Event driven business analysis(以下简称EDBA) 用户影响地图,是一种从业务目标到产品需求映射的需求挖掘和组织的方法。 在软件开发过程中可能会遇到一些问题,比如大家使用不同的业务语言、技术语言,造成角色间的沟通阻碍,还会导致一些问题,比如需求误解、需求传递错误等;这会直接导致产品的功能需求和要实现的业务目标不是映射关系。 但在交付期间,研发人员必须要将这些需求实现交付,他们实则并不清楚这些功能需求产生的原因是什么、要解决客户的哪些痛点。研发人员往往只是拿到了解决方案,需要把它实现,但没有和业务侧一起去思考解决方案是否正确,能否真正的帮助客户解决问题。而用户影响地图通常是能够连接业务目标和产品功能的一种手段。 我们在每次迭代里加入的假设,也就是功能需求。首先把它先实现,再逐步去验证我们每一个小目标是否已经实现,再看下一个目标要是什么。那影响地图就是在这个过程中帮我们不断地去梳理目标和功能之间的关系。 我们在软件开发中可能存在的一些问题 针对这些问题,我们如何避免?先简单介绍做敏捷转型的常规思路: 先做团队级的敏捷,首先把产品、开发、测试人员,还有一些更后端的人员比如交互运维的同学放在一起,组成一个特训团队做交付。这个团队要包含交付过程中所涉及的所有角色。 接着业务敏捷要打通整个业务环节和研发侧的一个交付。上图中可以看到在敏捷中需求是分层管理的,第一层是业务需求,在这个层级是以用户目标和业务目标作为输入进行规划,同时需要去考虑客户的诉求。业务人员通过获取到的业务需求,进一步的和团队一起将其分解为产品需求。所以业务需求其实是我们真正去发布和运营的单元,它可以被独立发布到我们的生产环境上。我们的产品需求其实就是产品的具体功能,它是我们集成和测试的对象,也就是我们最终去部署到系统上的一个基本单元。产品需求再到了我们的开发团队,映射到迭代计划会上要把它分解为相应的技术任务,包括我们平时所说的比如一些前端的开发、后端的开发、测试都是相应的技术任务。所以业务敏捷要达到的目标是需要去持续顺畅高质量的交付业务价值。 将这几个点串起来,形成金字塔结构。最上层我们会把业务目标放在整个金字塔的塔尖。这个业务目标是通过用户的目标以及北极星指标确立的。确认业务目标后再去梳理相应的业务流程,最后生产。另外产品需求包含了操作流程和业务规则,具需求交付时间、工程时间以及我们的一些质量标准的要求。 谈到用户影响的地图,在敏捷江湖上其实有一个传说,大家都有一个说法叫做敏捷需求的“任督二脉”。用户影响地图其实就是任脉,在黑客马拉松上用过的用户故事地图其实叫督脉。所以说用户影响地图是在用户故事地图之前,先帮我们去梳理出我们要做哪些东西。当我们真正识别出我们要实现的业务活动之后,用户故事地图才去梳理我们整个的业务工作流,以及每个工作流节点下所要包含的具体功能和用户故事。所以说用户影响地图需要解决的问题,我们包括以下这些: 首先是范围蔓延,我们在整张地图上,功能和对应的业务目标是要去有一个映射的。这就避免了一些在我们比如有很多干系人参与的会议上,那大家都有不同想法些立场,会提出很多需求(正确以及错误的需求)。这个时候我们会依据目标去看这些需求是否真的是会影响我们的目标。 这里提到的错误需求,比如是利益相关的人提出的、客户认为产品应该有的、某个产品经理需求分析师认为可以有的....但是这些功能在用户影响地图中匹配不到对应目标的话,就需要降低优先级或弃掉。另外,通常我们去制定解决方案的时候,会考虑较完美的实现,导致解决方案括很多的功能。这个时候关键目标至关重要,会帮助我们梳理筛选、确定优先级。 看一下用户影响到地图概貌 总共分为一个三层的结构: 第一层why,你的业务目标哪个是最重要的,为什么?涉及到的角色有哪些? 第二层how ,怎样产生影响?影响用户角色什么样的行为? (不需要去列出所有的影响,基于业务目标) 第三层what,最关键的是在梳理需求时不需一次把所有细节想全,这通常团队中经常遇到的问题。 我们用这个例子来看一下 这是一个客服中心的影响地图,业务目标是 3个月内不增加客服人数的前提下能支持1.5倍的用户数。此业务目标设定是符合 smart 原则的,specific非常的具体,miserable 是可以衡量的,action reoriented是面向活动的, real list 也是很实际的。 量化的目标会指引我们接下来的行动,梳理一个业务目标,尽量去量化,比如 :我们通过打造一条什么样的流水线,能够提高整个部署的效率,时间是原来的 1/2 。这样才是一个能量化的有意义的目标。 回到这幅图, how 层级识别出来的内容,客服角色:想要对它施加的影响,把客户引导到论坛上,帮助客户更容易的跟踪问题,更快速的去定位问题。初级用户:方论坛上找到问题。高级用户:在论坛上回答问题。通过我们这些用户角色,进行活动,完成在不增加客户客服人数的前提下支持更多的用户数量。 最后一个层级,才是我们日常接触比较多的真正的功能的特性和需求,比如引导到客户到论坛上,其实这个产品就需要有一个常见问题的论坛的链接。这个层次需要我们团队进一步地在交付,在每个迭代之前做进一步的梳理,细化成相应的用户故事。 这个是云智慧团队中,自己做的影响地图的范例,可以看下整个的层级结构。序号表示优先级。 那我们用户影响地图可以总结为: