SpringBoot > 启动异步事件监听器机制
Spring 监听/观察者 模式 (异步启用)
eg业务场景: 用户下单,订单创建成功,需要发送邮件通知用户,为用户的订单创建行为增加积分,短信通知等等(订单主体,订单商品附属表信息,订单发货信息,订单分期支付信息,订单优惠信息,支付优惠信息)一系列的动作的处理。
- 事件 Event
(发送邮件,创建订单商品附属表信息,订单发货信息,订单分期支付信息,订单优惠信息,支付优惠信息 等等)可以理解为事件;在关注的业务 如 【订单的创建(事件源)】,创建订单后需要【发送邮件通知(事件发布)】(事件:需要发送邮件或是其他业务) , 这些后续不影响订单主流程的业务,可以拆解到监听业务处理。
- 监听 Listener
事件发送后,需要通知的对象,告诉需要进行的下一步的操作。如 发邮件 。监听到事件发送后,做具体的业务处理,调用发送邮件逻辑进行发送。
- 开启异步配置 Config
配置方式有两种: 注解 和 xml配置
- xml配置
<!--异步线程池可以定义多个,若在使用注解 @Async 时没有指定使用哪个线程池,则使用默认的线程
1. ‘id' : 线程的名称的前缀
2. ‘pool-size':线程池的大小。支持范围”min-max”和固定值(此时线程池core和max sizes相同)
3. ‘queue-capacity' :排队队列长度
-->
<!-- 缺省的异步任务线程池 -->
<task:annotation-driven executor="asyncExecutor" />
<task:executor id="asyncExecutor" pool-size="100-10000" queue-capacity="10" />
<!-- 处理email发送的线程池 -->
<task:executor id="emailExecutor" pool-size="15-1000" queue-capacity="5" keep-alive="5"/>
- 注解 @EnableAsync 配置(配置类)
@Configuration
@EnableAsync
public class SpringConfig {
/** Set the ThreadPoolExecutor's core pool size. */
private int corePoolSize = 10;
/** Set the ThreadPoolExecutor's maximum pool size. */
private int maxPoolSize = 200;
/** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */
private int queueCapacity = 10;
private String ThreadNamePrefix = "asyncExecutor-";
@Bean
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix(ThreadNamePrefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
- 使用 @Async 注解启用异步线程处理
这个注解用于标注某个方法或某个类里面的所有方法都是需要异步处理的。被注解的方法被调用的时候,会在新线程中执行,而调用它的方法会在原来的线程中执行。这样可以避免阻塞、以及保证任务的实时性。适用于处理log、发送邮件、短信……等。
/**
* 异步创建兑换码
* @Title: TicektCodeListener
* @Description:
*/
@Component
public class TicektCodeListener {
@Autowired
private MkDiscountCardService mkDiscountCardService;
//异步监听器
@Async
@EventListener
public void dualEven(TicketCodeEvent event){
mkDiscountCardService.createCardCode(event.getDto());
}
}
项目使用实例:
下面使用 一个优惠券的券码创建业务,来举例使用。Spring的监听/观察者 模式。
步骤:
- 定义事件
事件定义,用于监听者获取 事件的属性,信息传递
/**
* 兑换码
* @Title: TODO
*/
public class TicketCodeEvent {
private static final long serialVersionUID = 1L;
private MkDiscountCardDto dto;
public TicketCodeEvent(String code,MkDiscountCardDto dto) {
this.code = code;
this.dto = dto;
}
private String code;
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
public MkDiscountCardDto getDto() {
return dto;
}
public void setDto(MkDiscountCardDto dto) {
this.dto = dto;
}
}
- 定义监听器
注意:
- @Componet ,加入bean管理 , 同时要注意spring的包是否有扫描到。
- @Async , 启用异步线程处理
- @EventListener ,声明这个事件监听器,监听事件为: TicketCodeEvent
/**
* 异步创建兑换码
* @Title: TicektCodeListener
* @Description:
*/
@Component
public class TicektCodeListener {
@Autowired
private MkDiscountCardService mkDiscountCardService;
//异步监听器
@Async
@EventListener
public void dualEven(TicketCodeEvent event){
mkDiscountCardService.createCardCode(event.getDto());
}
}
- 事件发布
-
使用注解注入获取上下文,此次没有采用 接口实现的方式
-
注意点:调用上下文 context.publishEvent(Event e) 发布事件
@Service
public class MkDiscountCardServiceImpl implements MkDiscountCardService/*,ApplicationContextAware*/{
private static final Logger logger = LoggerFactory.getLogger(MkDiscountCardServiceImpl.class);
@Autowired
private ApplicationContext context;//上下文,用于发布事件
@Override
@Transactional
public Long createDiscountCardAct(CreateMkDiscountCardDto dto) {
/**.....
省略一堆逻辑
....*/
//使用 spring 观察者模式 异步处理
MkDiscountCardDto cardDto = BeanUtils.copyProps(entity, MkDiscountCardDto.class);
TicketCodeEvent event = new TicketCodeEvent("",cardDto);//构建事件
this.context.publishEvent(event); //发布事件
return id;
}
上一篇: 如何以最佳方式使用 java8 异步调用
下一篇: Java 异步调用同步方法示例详情
推荐阅读
-
SpringBoot 异步事件实现 异步(ApplicationEventPublisher、ApplicationEvent)
-
SpringBoot > 启动异步事件监听器机制
-
异步编程RxJava-介绍-前言 前段时间写了一篇对协程的一些理解,里面提到了不管是协程还是callback,本质上其实提供的是一种异步无阻塞的编程模式;并且介绍了java中对异步无阻赛这种编程模式的支持,主要提到了Future和CompletableFuture;之后有同学在下面留言提到了RxJava,刚好最近在看微服务设计这本书,里面提到了响应式扩展(Reactive extensions,Rx),而RxJava是Rx在JVM上的实现,所有打算对RxJava进一步了解。 RxJava简介 RxJava的官网地址:https://github.com/ReactiveX/RxJava, 其中对RxJava进行了一句话描述:RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM. 大意就是:一个在Java VM上使用可观测的序列来组成异步的、基于事件的程序的库。 更详细的说明在Netflix技术博客的一篇文章中描述了RxJava的主要特点: 1.易于并发从而更好的利用服务器的能力。 2.易于有条件的异步执行。 3.一种更好的方式来避免回调地狱。 4.一种响应式方法。 与CompletableFuture对比 之前提到CompletableFuture真正的实现了异步的编程模式,一个比较常见的使用场景: CompletableFuture<Integer> future = CompletableFuture.supplyAsync(耗时函数); Future<Integer> f = future.whenComplete((v, e) -> { System.out.println(v); System.out.println(e); }); System.out.println("other..."); 下面用一个简单的例子来看一下RxJava是如何实现异步的编程模式: Observable<Long> observable = Observable.just(1, 2) .subscribeOn(Schedulers.io).map(new Func1<Integer, Long> { @Override public Long call(Integer t) { try { Thread.sleep(1000); //耗时的操作 } catch (InterruptedException e) { e.printStackTrace; } return (long) (t * 2); } }); observable.subscribe(new Subscriber<Long> { @Override public void onCompleted { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { System.out.println("error" + e); } @Override public void onNext(Long result) { System.out.println("result = " + result); } }); System.out.println("other..."); Func1中以异步的方式执行了一个耗时的操作,Subscriber(观察者)被订阅到Observable(被观察者)中,当耗时操作执行完会回调Subscriber中的onNext方法。 其中的异步方式是在subscribeOn(Schedulers.io)中指定的,Schedulers.io可以理解为每次执行耗时操作都启动一个新的线程。 结构上其实和CompletableFuture很像,都是异步的执行一个耗时的操作,然后在有结果的时候主动告诉我结果。那我们还需要RxJava干嘛,不知道你有没有注意,上面的例子中其实提供2条数据流[1,2],并且处理完任何一个都会主动告诉我,当然这只是它其中的一项功能,RxJava还有很多好用的功能,在下面的内容会进行介绍。 异步观察者模式 上面这段代码有没有发现特别像设计模式中的:观察者模式;首先提供一个被观察者Observable,然后把观察者Subscriber添加到了被观察者列表中; RxJava中一共提供了四种角色:Observable、Observer、Subscriber、Subjects Observables和Subjects是两个被观察者,Observers和Subscribers是观察者; 当然我们也可以查看一下源码,看一下jdk中的Observer和RxJava的Observer jdk中的Observer: public interface Observer { void update(Observable o, Object arg); } RxJava的Observer: public interface Observer<T> { void onCompleted; void onError(Throwable e); void onNext(T t); } 同时可以发现Subscriber是implements Observer的: public abstract class Subscriber<T> implements Observer<T>, Subscription 可以发现RxJava中在Observer中引入了2个新的方法:onCompleted和onError onCompleted:即通知观察者Observable没有更多的数据,事件队列完结 onError:在事件处理过程中出异常时,onError会被触发,同时队列自动终止,不允许再有事件发出。 正是因为RxJava提供了同步和异步两种方式进行事件的处理,个人觉得异步的方式更能体现RxJava的价值,所以这里给他命名为异步观察者模式。 好了,下面正式介绍RxJava的那些灵活的操作符,这里仅仅是简单的介绍和简单的实例,具体用在什么场景下,会在以后的文章中介绍 Maven引入
-
SpringBoot 事件监听机制源代码分析(上) SpringBoot 源代码(九)
-
基于 springboot 实现定时任务,并添加事件事件处理机制
-
Spring 事件同步和异步以及实现机制
-
2.Zookeeper集成springboot操作节点、事件监听器,实现分布式锁定
-
弹簧事件监听器机制-尾部
-
标题:一文搞定Redis面试,附Redis面试大纲+常见Redis面试题-一、基础篇 快速上手 ①. 什么是redis ②. 为什么使用redis ③. 安装 ④. 基本使用(常见数据结构的命令) Java操作redis ①. Jedis ②. SpringBoot 启动redis的方式 ①. 配置文件 ②. 生产环境启动方案 二、进阶篇 redis实现session共享 redis缓存的使用 ①. 注解式 ②. Spring Cache 数据库和缓存双写一致性问题——穿透 redis实现附近的人 redis实现计数器 redis事务 redis分布式锁的使用 redis集群 redis实现延时队列 redis实现限流 redis实现布隆过滤器 发布订阅 redis优化 三、原理篇 redis单线程为什么性能好 数据类型的底层实现 持久化机制 过期策略 内存淘汰 redis优化 哨兵模
-
UVM设计模式详解(七):命令行为、三种序列启动策略、起止事件元素、中介机制以及虚拟序列