SpringBoot 异步事件实现 异步(ApplicationEventPublisher、ApplicationEvent)
最编程
2024-07-16 15:27:41
...
SpringBoot 异步事件实现异步(ApplicationEventPublisher、ApplicationEvent)
当把一个事件发布到Spring提供的ApplicationContext
中,被监听器侦测到,就会执行对应的处理方法。
实现步骤:
- 自定义发布的事件类,需要继承 ApplicationEvent 或者PayloadApplicationEvent(该类也仅仅是对ApplicationEvent的一层封装)
- 监听事件 使用注解 @EventListener 或者 自定义监听器在主函数添加监听器
- s使用ApplicationEventPublisher 或者 实现其接口的子类 发布自定义事件 (@Autowired注入即可)
一、事件本身 ApplicationEvent
事件是一个自定义的类,需要继承Spring提供的ApplicationEvent
。
import com.alibaba.fastjson.JSONObject;
import org.springframework.context.ApplicationEvent;
/**
* 自定义 事件
* @author 王泽华
* 2019/10/22 14:59
*/
public class EsSaveEvent extends ApplicationEvent {
private JSONObject data;
public EsSaveEvent(JSONObject source) {
super(source);
this.data = source;
}
public JSONObject getData() {
return data;
}
}
二、事件监听
1、自定义监听器
基本方法是实现ApplicationListener
接口,自定义一个监听器,实现onApplicationEvent()
方法,然后添加到ApplicationContext
。
import org.springframework.context.ApplicationListener;
public class MyListener implements ApplicationListener<EsSaveEvent> {
@Override
public void onApplicationEvent(EsSaveEvent event) {
//事件处理
System.out.print("监听到MyEvent事件");
}
}
注意:需要在springboot 启动类添加监听器,如下所示:
public static void main(String[] args) {
SpringApplication application = new SpringApplication(MyApplication.class);
SpringBoot的启动类中添加监听器
application.addListeners(new MyListener());
application.run(args);
}
2、使用注解@EventListener
(推荐):
原理就是通过扫描这个注解,创建监听器并添加到ApplicationContext
。
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.Arrays;
import java.util.List;
/**
* @author :王泽华
* @Title: EventListener
* @date 2019/10/2214:18
*/
@Component
public class EventListener {
//监听事件
@org.springframework.context.event.EventListener
public void listenEvent(EsSaveEvent event) {
//可以添加事务处理
//divide(event);
JSONObject object = event.getData();
//对数据进行处理
System.out.println("data:"+object.toJSONString());
}
}
//在监听器中重新开一个事务(可选)
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void divide(EsSaveEvent event) {
System.out.println("事务处理");
}
}
三、事件发布
事件发布可以用springboot内部类 也可以自定义
1、直接注入 ApplicationEventPublisher
使用 ApplicationEventPublisher,注入ApplicationEventPublisher接口,调用publisher.publishEvent(new EsSaveEvent(data)) 发布事件。
@Autowired
private ApplicationEventPublisher publisher;
public test(){
JSONObject data=new JSONObject();
publisher.publishEvent(new EsSaveEvent(data));
}
2、自定义事件发布类(继承ApplicationEventPublisher)
A、实现 ApplicationEventPublisher
import com.alibaba.fastjson.JSONObject;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Component;
@Component
public class EventService implements ApplicationEventPublisherAware {
public ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
public String doEventWork(String msg) {
JSONObject data=new JSONObject();
data.put("message","------------publish event:" + msg);
EsSaveEvent event = new EsSaveEvent(data);
publisher.publishEvent(event);
return "OK";
}
}
B、测试
@SpringBootTest
@RunWith(SpringRunner.class)
public class EventServiceTest {
@Autowired
private EventService service;
@Test
public void eventTest() {
String msg="Java Code";
service.doEventWork(msg);
}
}
四、注意事项:
1、如果2个事件之间是继承关系,会先监听到子类事件,处理完再监听父类。
2、监听器方法一定要try-catchy异常,否则会造成发布事件(有事务的)的方法进行回滚
3、可以使用@Order注解控制多个监听器的执行顺序,@Order 传入的值越小,执行顺序越高
推荐阅读
-
SpringBoot 异步事件实现 异步(ApplicationEventPublisher、ApplicationEvent)
-
SpringBoot 与 Kafka 集成,实现百万级数据异步处理,实践介绍!
-
SpringBoot > 启动异步事件监听器机制
-
异步编程RxJava-介绍-前言 前段时间写了一篇对协程的一些理解,里面提到了不管是协程还是callback,本质上其实提供的是一种异步无阻塞的编程模式;并且介绍了java中对异步无阻赛这种编程模式的支持,主要提到了Future和CompletableFuture;之后有同学在下面留言提到了RxJava,刚好最近在看微服务设计这本书,里面提到了响应式扩展(Reactive extensions,Rx),而RxJava是Rx在JVM上的实现,所有打算对RxJava进一步了解。 RxJava简介 RxJava的官网地址:https://github.com/ReactiveX/RxJava, 其中对RxJava进行了一句话描述:RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM. 大意就是:一个在Java VM上使用可观测的序列来组成异步的、基于事件的程序的库。 更详细的说明在Netflix技术博客的一篇文章中描述了RxJava的主要特点: 1.易于并发从而更好的利用服务器的能力。 2.易于有条件的异步执行。 3.一种更好的方式来避免回调地狱。 4.一种响应式方法。 与CompletableFuture对比 之前提到CompletableFuture真正的实现了异步的编程模式,一个比较常见的使用场景: CompletableFuture<Integer> future = CompletableFuture.supplyAsync(耗时函数); Future<Integer> f = future.whenComplete((v, e) -> { System.out.println(v); System.out.println(e); }); System.out.println("other..."); 下面用一个简单的例子来看一下RxJava是如何实现异步的编程模式: Observable<Long> observable = Observable.just(1, 2) .subscribeOn(Schedulers.io).map(new Func1<Integer, Long> { @Override public Long call(Integer t) { try { Thread.sleep(1000); //耗时的操作 } catch (InterruptedException e) { e.printStackTrace; } return (long) (t * 2); } }); observable.subscribe(new Subscriber<Long> { @Override public void onCompleted { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { System.out.println("error" + e); } @Override public void onNext(Long result) { System.out.println("result = " + result); } }); System.out.println("other..."); Func1中以异步的方式执行了一个耗时的操作,Subscriber(观察者)被订阅到Observable(被观察者)中,当耗时操作执行完会回调Subscriber中的onNext方法。 其中的异步方式是在subscribeOn(Schedulers.io)中指定的,Schedulers.io可以理解为每次执行耗时操作都启动一个新的线程。 结构上其实和CompletableFuture很像,都是异步的执行一个耗时的操作,然后在有结果的时候主动告诉我结果。那我们还需要RxJava干嘛,不知道你有没有注意,上面的例子中其实提供2条数据流[1,2],并且处理完任何一个都会主动告诉我,当然这只是它其中的一项功能,RxJava还有很多好用的功能,在下面的内容会进行介绍。 异步观察者模式 上面这段代码有没有发现特别像设计模式中的:观察者模式;首先提供一个被观察者Observable,然后把观察者Subscriber添加到了被观察者列表中; RxJava中一共提供了四种角色:Observable、Observer、Subscriber、Subjects Observables和Subjects是两个被观察者,Observers和Subscribers是观察者; 当然我们也可以查看一下源码,看一下jdk中的Observer和RxJava的Observer jdk中的Observer: public interface Observer { void update(Observable o, Object arg); } RxJava的Observer: public interface Observer<T> { void onCompleted; void onError(Throwable e); void onNext(T t); } 同时可以发现Subscriber是implements Observer的: public abstract class Subscriber<T> implements Observer<T>, Subscription 可以发现RxJava中在Observer中引入了2个新的方法:onCompleted和onError onCompleted:即通知观察者Observable没有更多的数据,事件队列完结 onError:在事件处理过程中出异常时,onError会被触发,同时队列自动终止,不允许再有事件发出。 正是因为RxJava提供了同步和异步两种方式进行事件的处理,个人觉得异步的方式更能体现RxJava的价值,所以这里给他命名为异步观察者模式。 好了,下面正式介绍RxJava的那些灵活的操作符,这里仅仅是简单的介绍和简单的实例,具体用在什么场景下,会在以后的文章中介绍 Maven引入
-
Spring 事件同步和异步以及实现机制
-
SpringBoot 与 MQTT 集成并实现异步线程调用