Java 通过时间梯度实现异步回调接口
1. 背景
在业务处理完之后,需要调用其他系统的接口,将相应的处理结果通知给对方,若是同步请求,假如调用的系统出现异常或是宕机等事件,会导致自身业务受到影响,事务会一直阻塞,数据库连接不够用等异常现象,可以通过异步回调来防止阻塞,但异步的情况还存在一个问题,若调用一次不成功的话接下来怎么处理?这个地方就需要按时间梯度回调,比如前期按10s间隔回调,回调3次,若不成功按30s回调,回调2次,再不成功按分钟回调,依次类推……相当于给了对方系统恢复的时间,不可能一直处于异常或宕机等异常状态,若是再不成功可以再通过人工干预的手段去处理了,具体业务具体实现。
2. 技术实现
大体实现思路如下图,此过程用到两个队列,当前队列和Next队列,当前队列用来存放第一次需要回调的数据对象,如果调用不成功则放入Next队列,按照制定的时间策略再继续回调,直到成功或最终持久化后人工接入处理。
用到的技术如下:
•http请求库,retrofit2
•队列,LinkedBlockingQueue
•调度线程池,ScheduledExecutorService
3. 主要代码说明
3.1 回调时间梯度的策略设计
采用枚举来对策略规则进行处理,便于代码上的维护,该枚举设计三个参数,级别、回调间隔、回调次数;
/** * 回调策略 */ public enum CallbackType { //等级1,10s执行3次 SECONDS_10(1, 10, 3), //等级2,30s执行2次 SECONDS_30(2, 30, 2), //等级3,60s执行2次 MINUTE_1(3, 60, 2), //等级4,5min执行1次 MINUTE_5(4, 300, 1), //等级5,30min执行1次 MINUTE_30(5, 30*60, 1), //等级6,1h执行2次 HOUR_1(6, 60*60, 1), //等级7,3h执行2次 HOUR_3(7, 60*60*3, 1), //等级8,6h执行2次 HOUR_6(8, 60*60*6, 1); //级别 private int level; //回调间隔时间 秒 private int intervalTime; //回调次数 private int count; }
3.2 数据传输对象设计
声明抽象父类,便于其他对象调用传输继承。
/** * 消息对象父类 */ public abstract class MessageInfo { //开始时间 private long startTime; //更新时间 private long updateTime; //是否回调成功 private boolean isSuccess=false; //回调次数 private int count=0; //回调策略 private CallbackType callbackType; }
要传输的对象,继承消息父类;
/** * 工单回调信息 */ public class WorkOrderMessage extends MessageInfo { //车架号 private String vin; //工单号 private String workorderno; //工单状态 private Integer status; //工单原因 private String reason; //操作用户 private Integer userid; }
3.3 调度线程池的使用
//声明线程池,大小为16 private ScheduledExecutorService pool = Executors.newScheduledThreadPool(16); ...略 while (true){ //从队列获取数据,交给定时器执行 try { WorkOrderMessage message = MessageQueue.getMessageFromNext(); long excueTime = message.getUpdateTime()+message.getCallbackType().getIntervalTime()* 1000; long t = excueTime - System.currentTimeMillis(); if (t/1000 < 5) {//5s之内将要执行的数据提交给调度线程池 System.out.println("MessageHandleNext-满足定时器执行条件"+JSONObject.toJSONString(message)); pool.schedule(new Callable<Boolean>() { @Override public Boolean call() throws Exception { remoteCallback(message); return true; } }, t, TimeUnit.MILLISECONDS); }else { MessageQueue.putMessageToNext(message); } } catch (InterruptedException e) { System.out.println(e); } }
3.4 retrofit2的使用,方便好用。
具体可查看官网相关文档进行了解,用起来还是比较方便的。http://square.github.io/retrofit/
retrofit初始化:
import retrofit2.Retrofit; import retrofit2.converter.gson.GsonConverterFactory; public class RetrofitHelper { private static final String HTTP_URL = "http://baidu.com/"; private static Retrofit retrofit; public static Retrofit instance(){ if (retrofit == null){ retrofit = new Retrofit.Builder() .baseUrl(HTTP_URL) .addConverterFactory(GsonConverterFactory.create()) .build(); } return retrofit; } }
如果需要修改超时时间,连接时间等可以这样初始话,Retrofit采用OkHttpClient
import okhttp3.OkHttpClient; import retrofit2.Retrofit; import retrofit2.converter.gson.GsonConverterFactory; import java.util.concurrent.TimeUnit; public class RetrofitHelper { private static final String HTTP_URL = "http://baidu.com/"; private static Retrofit retrofit; public static Retrofit instance(){ if (retrofit == null){ retrofit = new Retrofit.Builder() .baseUrl(HTTP_URL) .client(new OkHttpClient.Builder() .connectTimeout(30, TimeUnit.SECONDS)//连接时间 .readTimeout(30, TimeUnit.SECONDS)//读时间 .writeTimeout(30, TimeUnit.SECONDS)//写时间 .build()) .addConverterFactory(GsonConverterFactory.create()) .build(); } return retrofit; } }
Retrofit使用通过接口调用,要先声明一个接口;
import com.alibaba.fastjson.JSONObject; import com.woasis.callbackdemo.bean.WorkOrderMessage; import retrofit2.Call; import retrofit2.http.Body; import retrofit2.http.POST; public interface WorkOrderMessageInterface { @POST("/api") Call<JSONObject> updateBatteryInfo(@Body WorkOrderMessage message); }
接口和实例对象准备好了,接下来就是调用;
private void remoteCallback(WorkOrderMessage message){ //实例接口对象 WorkOrderMessageInterface workOrderMessageInterface = RetrofitHelper.instance().create(WorkOrderMessageInterface.class); //调用接口方法 Call<JSONObject> objectCall = workOrderMessageInterface.updateBatteryInfo(message); System.out.println("远程调用执行:"+new Date()); //异步调用执行 objectCall.enqueue(new Callback<JSONObject>() { @Override public void onResponse(Call<JSONObject> call, Response<JSONObject> response) { System.out.println("MessageHandleNext****调用成功"+Thread.currentThread().getId()); message.setSuccess(true); System.out.println("MessageHandleNext-回调成功"+JSONObject.toJSONString(message)); } @Override public void onFailure(Call<JSONObject> call, Throwable throwable) { System.out.println("MessageHandleNext++++调用失败"+Thread.currentThread().getId()); //失败后再将数据放入队列 try { //对回调策略初始化 long currentTime = System.currentTimeMillis(); message.setUpdateTime(currentTime); message.setSuccess(false); CallbackType callbackType = message.getCallbackType(); //获取等级 int level = CallbackType.getLevel(callbackType); //获取次数 int count = CallbackType.getCount(callbackType); //如果等级已经最高,则不再回调 if (CallbackType.HOUR_6.getLevel() == callbackType.getLevel() && count == message.getCount()){ System.out.println("MessageHandleNext-等级最高,不再回调, 线下处理:"+JSONObject.toJSONString(message)); }else { //看count是否最大,count次数最大则增加level if (message.getCount()<callbackType.getCount()){ message.setCount(message.getCount()+1); }else {//如果不小,则增加level message.setCount(1); level += 1; message.setCallbackType(CallbackType.getTypeByLevel(level)); } MessageQueue.putMessageToNext(message); } } catch (InterruptedException e) { e.printStackTrace(); System.out.println("MessageHandleNext-放入队列数据失败"); } } }); }
3.5结果实现
4.总结
本次实现了按照时间梯度去相应其他系统的接口,不再导致本身业务因其他系统的异常而阻塞。
源码:https://github.com/liuzwei/callback-demo
以上所述是小编给大家介绍的Java按时间梯度实现异步回调接口,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对网站的支持!
推荐阅读
-
异步回调的 Java 实现
-
Java 通过时间梯度实现异步回调接口
-
异步回调的 Java 实现
-
异步编程RxJava-介绍-前言 前段时间写了一篇对协程的一些理解,里面提到了不管是协程还是callback,本质上其实提供的是一种异步无阻塞的编程模式;并且介绍了java中对异步无阻赛这种编程模式的支持,主要提到了Future和CompletableFuture;之后有同学在下面留言提到了RxJava,刚好最近在看微服务设计这本书,里面提到了响应式扩展(Reactive extensions,Rx),而RxJava是Rx在JVM上的实现,所有打算对RxJava进一步了解。 RxJava简介 RxJava的官网地址:https://github.com/ReactiveX/RxJava, 其中对RxJava进行了一句话描述:RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM. 大意就是:一个在Java VM上使用可观测的序列来组成异步的、基于事件的程序的库。 更详细的说明在Netflix技术博客的一篇文章中描述了RxJava的主要特点: 1.易于并发从而更好的利用服务器的能力。 2.易于有条件的异步执行。 3.一种更好的方式来避免回调地狱。 4.一种响应式方法。 与CompletableFuture对比 之前提到CompletableFuture真正的实现了异步的编程模式,一个比较常见的使用场景: CompletableFuture<Integer> future = CompletableFuture.supplyAsync(耗时函数); Future<Integer> f = future.whenComplete((v, e) -> { System.out.println(v); System.out.println(e); }); System.out.println("other..."); 下面用一个简单的例子来看一下RxJava是如何实现异步的编程模式: Observable<Long> observable = Observable.just(1, 2) .subscribeOn(Schedulers.io).map(new Func1<Integer, Long> { @Override public Long call(Integer t) { try { Thread.sleep(1000); //耗时的操作 } catch (InterruptedException e) { e.printStackTrace; } return (long) (t * 2); } }); observable.subscribe(new Subscriber<Long> { @Override public void onCompleted { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { System.out.println("error" + e); } @Override public void onNext(Long result) { System.out.println("result = " + result); } }); System.out.println("other..."); Func1中以异步的方式执行了一个耗时的操作,Subscriber(观察者)被订阅到Observable(被观察者)中,当耗时操作执行完会回调Subscriber中的onNext方法。 其中的异步方式是在subscribeOn(Schedulers.io)中指定的,Schedulers.io可以理解为每次执行耗时操作都启动一个新的线程。 结构上其实和CompletableFuture很像,都是异步的执行一个耗时的操作,然后在有结果的时候主动告诉我结果。那我们还需要RxJava干嘛,不知道你有没有注意,上面的例子中其实提供2条数据流[1,2],并且处理完任何一个都会主动告诉我,当然这只是它其中的一项功能,RxJava还有很多好用的功能,在下面的内容会进行介绍。 异步观察者模式 上面这段代码有没有发现特别像设计模式中的:观察者模式;首先提供一个被观察者Observable,然后把观察者Subscriber添加到了被观察者列表中; RxJava中一共提供了四种角色:Observable、Observer、Subscriber、Subjects Observables和Subjects是两个被观察者,Observers和Subscribers是观察者; 当然我们也可以查看一下源码,看一下jdk中的Observer和RxJava的Observer jdk中的Observer: public interface Observer { void update(Observable o, Object arg); } RxJava的Observer: public interface Observer<T> { void onCompleted; void onError(Throwable e); void onNext(T t); } 同时可以发现Subscriber是implements Observer的: public abstract class Subscriber<T> implements Observer<T>, Subscription 可以发现RxJava中在Observer中引入了2个新的方法:onCompleted和onError onCompleted:即通知观察者Observable没有更多的数据,事件队列完结 onError:在事件处理过程中出异常时,onError会被触发,同时队列自动终止,不允许再有事件发出。 正是因为RxJava提供了同步和异步两种方式进行事件的处理,个人觉得异步的方式更能体现RxJava的价值,所以这里给他命名为异步观察者模式。 好了,下面正式介绍RxJava的那些灵活的操作符,这里仅仅是简单的介绍和简单的实例,具体用在什么场景下,会在以后的文章中介绍 Maven引入
-
实现 Java 异步回调
-
测试事物 (I) - 同步和异步 - 同步:当函数调用启动时,调用不会返回或调用者不会继续执行后续操作,直到获得该调用的结果。异步:异步与同步相反,当发出异步过程调用时,调用者可以继续执行后续操作,直到调用者得到结果。调用完成后,通常会通过状态(轮询)、通知(消息)和回调通知调用者。对于异步调用,调用的返回不受调用者的控制。 从上述定义中我们可以看出,同步和异步可以由调用者或被调用者控制。我们暂且将调用者视为客户端,而被调用者视为服务器。 在客户端调用服务器接口后,如果客户端需要等待服务器返回结果才能进行下一步,那么它就是同步的。如果在进行下一步之前不需要等待服务器返回结果,那么它就是同步的。如果在进行下一步之前不需要等待服务器的结果,那么它就是异步的。 服务器本身既可以控制同步,也可以控制异步。对于需要长时间计算的函数,服务器会将其设置为异步,在客户端发出请求后立即向客户端返回结果,这没有任何实际意义,只是表示服务器收到了请求。对于很快就能得到结果的请求,使用同步就很好,返回的结果包含了得到这个请求所需的数据。 那么,同步测试和异步测试需要注意什么呢? 1. 用户体验 现在很多 APP 都有审核的功能,需要用户提供各种信息来完成相应的认证。这些审核可以是自动的,也可以是手动的等。自动审核一般比较快,因为它需要用户提供各种信息来完成相应的认证。自动审核一般比较快,因为需要比对的数据都存在服务器端,直接进行程序比对即可。人工审核则比较慢,快的几分钟,如果赶上周末,2-3 天都不一定。因此,对于自动审核和人工审核,需要合理利用同步和异步,达到最佳的用户体验。 自动审核速度快,用户提交信息后稍微等待一下就应该能得到审核结果,最多在客户端等待不超过 5 秒,看到加载等待一会儿就可以了。在这种情况下,使用同步就非常合适了,客户端没有得到结果,等到服务器端把审核结果传回来马上显示在客户端上,用户的体验会非常好,毕竟很快就得到了答案。 人工审核时,我们不能寄希望于总有一个人一直在那里审核,即使有,快的话也要几分钟,毕竟核对数据什么的都是人工的。用户在这种情况下不能等,但也不能总在这个页面上等。于是异步就派上用场了,客户端发出审核请求,服务器收到后马上告诉客户端我收到了,但不可能给出结果。客户端不管有没有收到服务器的请求,都会告诉用户等待 XX 时间才能查看结果,用户不需要在这个页面上一直等待。直到服务器端计算完成,然后通过推送消息通知客户端,或者用户再次进入身份验证页面进行身份验证结果请求时,才会得知自己的身份验证结果。 进行测试时,如果遇到长时间无法获取结果的界面,建议客户端使用异步请求,以减少用户体验不佳的情况!
-
对话NGC蔡岩:从机制创新到价值沉淀,解析DeFi产品开发逻辑 |链捕手 - 真正的DeFi产品首先要有足够的安全性和稳定性,如果能在此基础上有一些功能创新,那就非常好了。像 Uniswap 这样逐渐成为 DeFi 基础架构的产品,可遇而不可求。 链式捕手:固定利率协议之前关注度比较高,但观察下来发现,大部分协议还是类似于传统金融CDO(抵押债务凭证)的玩法,风险系数很高,您如何理解这块业务的价值和风险? 蔡岩:确实有些定息协议类似CDO玩法,背后绑定一个债券,但并不是所有的定息协议都是这样的玩法,像这种CDO玩法的主要代表项目是88mph,背后绑定的是Aave、Compoud这样的借贷协议,在此基础上做定息和浮息债券;像APWine,背后同样是Aave,它会发行期货收益代币来锁定你的收益;Notional本身是做借贷市场的,在此基础上做定息协议。 非 CDO 的玩法,比如 Horizon,更像是一个利率撮合器,背后需要用户通过拍卖产生更合适的目标收益率;像 Saffron、BarnBridge 等是通过风险分级来定义不同的收益率。总的来说,创新还是挺多的。 价值层面是创新和想象力,因为在传统金融领域,比如银行做固定收益证券,或者评级机构给风险分级,这些业务都非常大,利润也很丰厚。而 DeFi 的对口业务给了类似业务很大的想象空间。尤其是固定利率协议的成熟产品不多,尝试各种微创新是很有意义的。 风险程度还是要具体到不同的玩法,比如,在 Aave、Compoud 等借贷协议的固定利率协议背后,如果这些借贷协议受到攻击,与之绑定的固定利率协议也会受损。 同样,如果自己做借贷市场,可能更需要更强的开发能力。再有,如果该程序的机制或参数设计不当,同样会导致协议运行不稳定,并可能造成大量用户清盘。 总的来说,风险在于固定利率协议的设计,这是一个非常复杂的过程,需要不断地尝试和出错。 链式捕捉器:刚刚提到背后是Aave/Compound的固定费率协议风险较大,您认为Aave最大的不确定性和创新点分在哪里? 蔡岩:其实爱钱进一直被认为是走在行业前列的项目,他们的迭代速度非常快,比如率先尝试闪贷、推出新的经济激励模式、推出目前业内首个安全模块、尝试L2解决方案等等。 而在主要的借贷业务上,他们又十分谨慎,比如在抵押率、清算系数等风险参数的设计上相对于其他借贷协议较为保守,并不会存在为了吸引更多借贷资金而降低风险的要求。 与许多 DeFi 项目一样,即使 Aave 进行了多次审计,也无法保证不存在漏洞。前段时间,Aave 刚进入 V2 阶段时,白帽黑客就指出了某个漏洞。 之前的创新点可能是闪电借贷,这是当时业内独一无二的新产品功能,也为 Aave 带来了不少收益。当然,也有人批评闪电贷只能方便黑客实现资金效益的最大化,但工具本身并没有错,未来闪电贷肯定会有更多的应用场景。 其次是安全模块的设计,这有点像项目本身的储备金库,保障项目的安全性,这也是爱维开创的先河。说实话,目前大多数项目都没有做到代币模式的良性或正向运营,也做不到像Aave一样的安全模块,这是一个不小的门槛。 Chaincatcher从某种程度上来说,挖矿模式是DeFi财富效应的根本支撑,但Aave的CEO却说挖矿机制带来的动力是不可持续的,您怎么看这个观点? 蔡岩:"挖矿机制 "不可能失效,因为它是一种激励机制,或者说是项目冷启动的一种方式。但流动性开采亚博体育手机客户端不会一直高涨。比如去年11月的流行性挖矿高APY持续了一两个月就崩盘了,导致DeFi市场大幅回调。 Aave、Uniswap、Synthetix等项目真正爆发进入市值前15名也是在今年2月,我更倾向于这是头部DeFi长期价值的体现。虽然大家都喜欢抢高APY的矿机,但我个人很少参与挖矿,所以我并不觉得流动性挖矿是DeFi的基本面支撑。
-
玩转Java底层:JMX详解 - jconsole与自定义MBean监控工具的实际应用与区别" 在日常JVM调优中,我们熟知的jconsole工具通过JMX包装的bean以图形化形式展示管理数据,而像jstat和jmap这类内建监控工具则由JVM直接支持。本文将以jconsole为例,深入讲解其实质——基于JMX的MBean功能,包括可视化界面上的bean属性查看和操作调用。 MBeans在jconsole中的体现是那些可观察的组件属性和方法,如上图所示,通过名为"Verbose"的属性能看到其值为false,同时还能直接操作该bean的方法,例如"closeJerryMBean"。 尽管jconsole给我们提供了直观的可视化界面,但请注意,这里的MBean并非固定不变,开发者可根据JMX提供的接口将自己的自定义bean展示到jconsole。以下步骤展示了如何创建并注册一个名为"StudyJavaMBean"的自定义MBean: 1. 首先定义接口`StudyJavaMBean`,接口需遵循MBean规范,即后缀为"MBean"且包含getter方法代表属性,如`getApplicationName`,和无返回值的setter方法代表操作,如`closeJerryMBean`。 ```java public interface StudyJavaMBean { String getApplicationName(); void closeJerryMBean(); } ``` 2. 编写接口的实现类`StudyJavaMBeanImpl`,实现接口中的方法: ```java public class StudyJavaMBeanImpl implements StudyJavaMBean { @Override public String getApplicationName() { return "每天学Java"; } @Override public void closeJerryMBean() { System.out.println("关闭Jerry应用"); } } ``` 3. 在代码中注册自定义MBean,涉及的关键步骤包括: - 获取平台MBeanServer - 定义ObjectName,指定唯一的MBean标识符 - 注册MBean到服务器 - 启动RMI连接器服务,以便jconsole能够访问 ```java public void registerMBean() throws Exception { // ... 具体实现省略 ... } ``` 实际运行注册后的MBean,您将在jconsole中发现并查看自定义bean的属性和调用相关方法。然而,这种方式相较于传统的属性/日志查看和HTTP接口,实用性相对有限,可能存在潜在的安全风险。但不可否认的是,JMX及其MBean机制对于获取操作系统信息、内存状态等关键性能指标仍然具有重要价值。例如: 1. **获取操作系统信息**:通过JMX MBean,可以直接获取到诸如CPU使用率、操作系统版本等系统级信息,这对于资源管理和优化工作具有显著帮助。