Java] CompletableFuture 异步任务协调CompletableFuture 异步任务协调
最编程
2024-07-16 14:25:52
...
参考视频资料:
https://www.bilibili.com/video/BV1nA411g7d2 https://www.bilibili.com/video/BV1S54y1u79K
一、启动一个异步任务
runAsync 简单开启一个独立的线程,异步完成一个任务:
runAsync不会返回结果
public class AsyncThreadTest { @SneakyThrows private static String getThreadName() { return Thread.currentThread().getName(); } @SneakyThrows private static void awaitSec() { Thread.sleep(1000L); } @SneakyThrows private static void awaitSec(int sec) { Thread.sleep(sec * 1000L); } /** * 表示有两件事情需要同时执行 * 例如 开发一个功能 前端和后端同时进行 */ @Test @SneakyThrows public void demo01() { System.out.println("开会讨论需求"); // 前端开发前台页面 CompletableFuture<Void> async = CompletableFuture.runAsync(() -> { int times = 6; // 假设前端开发耗时为6 while (times != 0) { System.out.println(getThreadName() + " 前端开发前台页面... 剩余时间 " + times); awaitSec(); --times; } }); int times = 3; // 假设后端开发耗时为3 while (times != 0) { System.out.println(getThreadName() +" 后端开发后台接口... 剩余时间 " + times); awaitSec(); -- times; } // join 等待该任务完成后才能继续下一步,如果不等待,主线程任务跑完了就会不等异步任务直接结束程序 async.join(); System.out.println("功能开发完成!"); } }
supplyAsync 和runAsync的区别就是能返回任务的结果:
阻塞主线程等待任务完成的方法有join和get,join和get都会返回任务的结果(阻塞主线程)
区别是join会封装异常处理,get要求手动处理异常
/** * 如果需要知晓异步任务的结果,设置返回值返回 */ @Test @SneakyThrows public void demo02() { System.out.println("开会讨论需求"); // 前端开发前台页面 CompletableFuture<String> async = CompletableFuture.supplyAsync(() -> { int times = 6; // 假设前端开发耗时为6 while (times != 0) { System.out.println(getThreadName() + " 前端开发前台页面... 剩余时间 " + times); awaitSec(); --times; } return "编写的操作手册"; }); int times = 3; // 假设后端开发耗时为3 while (times != 0) { System.out.println(getThreadName() + " 后端开发后台接口... 剩余时间 " + times); Thread.sleep(1000L); -- times; } // join 等待该任务完成后才能继续下一步 (和get一样 get要求处理异常, join不要求) String res = async.join(); // get方法本身自带join 阻塞执行 String res2 = async.get(); System.out.println("功能开发完成!" + res + " " + res2); }
二、布置多个异步任务
任务链上的结果类型必须一致,不可以一会thenRun一会thenApply这样调用
thenCompose方法,在上一个任务完成后再执行入参的异步任务
要求入参一个CompletionStage的子类实例
/** * thenCompose方法,当之前一个任务完成之后再开始执行方法内的异步任务 * 连接上一个任务 */ @Test @SneakyThrows public void demo03() { System.out.println("开会讨论需求"); // 前端开发前台页面 CompletableFuture<String> async = CompletableFuture.supplyAsync(() -> { int times = 6; // 假设前端开发耗时为6 while (times != 0) { System.out.println(getThreadName() + " 前端开发前台页面... 剩余时间 " + times); awaitSec(); --times; } return "页面组件编写完成!"; }).thenCompose(before -> CompletableFuture.supplyAsync(() -> { int times = 3; // 假设前端开发耗时为 while (times != 0) { System.out.println(getThreadName() + " 前端开发联调接口... 剩余时间 " + times); awaitSec(); --times; } return "接口联调完成!" + before; })); int times = 3; // 假设后端开发耗时为3 while (times != 0) { System.out.println(getThreadName() + " 后端开发后台接口... 剩余时间 " + times); awaitSec(); -- times; } String res = async.join(); // String res2 = async.get(); // System.out.println("功能开发完成!" + res + " " + res2); System.out.println("功能开发完成!" + res); }
thenApply方法只需要你提供任务逻辑即可
thenApply沿用上一个任务的线程,给当前任务使用
/** * thenApply 方法,当之前一个任务完成之后再开始执行方法内的异步任务 * 连接上一个任务 * 和thenCompose不一样,不需要返回CompleteStage实现,默认放置了,只需要逻辑 */ @Test @SneakyThrows public void demo06() { System.out.println("开会讨论需求"); // 前端开发前台页面 CompletableFuture<String> async = CompletableFuture.supplyAsync(() -> { int times = 6; // 假设前端开发耗时为6 while (times != 0) { System.out.println(getThreadName() + " 前端开发前台页面... 剩余时间 " + times); awaitSec(); --times; } return "页面组件编写完成!"; }).thenApply(before -> { int times = 3; // 假设前端开发耗时为 while (times != 0) { System.out.println(getThreadName() + " 前端开发联调接口... 剩余时间 " + times); awaitSec(); --times; } return "接口联调完成!" + before; }); int times = 3; // 假设后端开发耗时为3 while (times != 0) { System.out.println(getThreadName() + " 后端开发后台接口... 剩余时间 " + times); awaitSec(); -- times; } String res = async.join(); // String res2 = async.get(); // System.out.println("功能开发完成!" + res + " " + res2); System.out.println("功能开发完成!" + res); }
thenApplyAsync方法在自定义线程池入参时,可能开一个新线程来执行
/** * thenApplyAsync 方法, 使用自定义线程池入参时,会新开一个线程执行任务 * */ @Test @SneakyThrows public void demo07() { System.out.println("开会讨论需求"); // 前端开发前台页面 CompletableFuture<String> async = CompletableFuture.supplyAsync(() -> { int times = 6; // 假设前端开发耗时为6 while (times != 0) { System.out.println(getThreadName() + " 前端开发前台页面... 剩余时间 " + times); awaitSec(); --times; } return "页面组件编写完成!"; }).thenApplyAsync(before -> { int times = 3; // 假设前端开发耗时为 while (times != 0) { System.out.println(getThreadName() + " 前端开发联调接口... 剩余时间 " + times); awaitSec(); --times; } return "接口联调完成!" + before; }); int times = 3; // 假设后端开发耗时为3 while (times != 0) { System.out.println(getThreadName() + " 后端开发后台接口... 剩余时间 " + times); awaitSec(); -- times; } String res = async.join(); // String res2 = async.get(); // System.out.println("功能开发完成!" + res + " " + res2); System.out.println("功能开发完成!" + res); }
thenRun 不知晓任务结果,不返回处理结果
/** * thenRun 将不会返回结果,对任务完成之后执行一些你想做的事情 * (不会把返回结果传入) */ @Test public void demo12() { System.out.println("周一上班"); CompletableFuture<Void> empty = CompletableFuture.supplyAsync(() -> { int times = 6; // 假设前端开发耗时为6 while (times != 0) { System.out.println(getThreadName() + " 前端开发前台页面... 剩余时间 " + times); awaitSec(); --times; } return "页面组件编写完成!"; }).thenRun(() -> { // 上一个任务完成后调用 System.out.println("今天的任务完成"); System.out.println("提前下班回家"); }); empty.join(); }
thenAccept 对上一个任务完成的通知
传入上一个任务结果,不会返回这个处理结果
/** * thenAccept 将不会返回结果,对任务完成之后执行一些你想做的事情 * (会把返回结果传入) */ @Test public void demo11() { System.out.println("周一上班"); CompletableFuture<Void> empty = CompletableFuture.supplyAsync(() -> { int times = 6; // 假设前端开发耗时为6 while (times != 0) { System.out.println(getThreadName() + " 前端开发前台页面... 剩余时间 " + times); awaitSec(); --times; } return "页面组件编写完成!"; }).thenAccept(result -> { // 上一个任务完成后调用 System.out.println("今天的任务完成:" + result); System.out.println("提前下班回家"); }); empty.join(); }
thenCombine 方法整合两个异步任务,合并任务结果处理返回
/** * thenCombine方法,追加一个新的异步任务,和之前的任务同时启动,两个任务都执行完成后回调一个合并方法,返回结果 * 合并两个任务 异步的线程使用同一个 */ @Test @SneakyThrows public void demo05() { System.out.println("开会讨论需求"); // 前端开发前台页面 现在需要两个前端来开发页面 CompletableFuture<String> async = CompletableFuture.supplyAsync(() -> { int times = 3; while (times != 0) { System.out.println("前端开发1前台页面... 剩余时间 " + times); awaitSec(); --times; } return "页面组件1编写完成!"; }).thenCombine(CompletableFuture.supplyAsync(() -> { int times = 3; while (times != 0) { System.out.println("前端开发2前台页面... 剩余时间 " + times); awaitSec(); --times; } return "页面组件2编写完成"; }), (a, b) -> { System.out.println(a); System.out.println(b); System.out.println("先摸会儿鱼再说"); return "页面完成"; }); int times = 3; // 假设后端开发耗时为3 while (times != 0) { System.out.println("后端开发后台接口... 剩余时间 " + times); awaitSec(); -- times; } String res = async.join(); // String res2 = async.get(); // System.out.println("功能开发完成!" + res + " " + res2); System.out.println("功能开发完成!" + res); }
三、多任务结果处理
applyToEither方法,取最先完成任务的结果处理
/** * applyToEither 同时执行两个异步任务,取最先完成的任务的结果返回,附带一个处理方法 * */ @Test @SneakyThrows public void demo08() { System.out.println("开会讨论需求"); // 周一上班 两种情况 CompletableFuture<String> async = CompletableFuture.supplyAsync(() -> { awaitSec(); awaitSec(); System.out.println("老板没来,摸鱼!"); return "今天我tm摸爆"; }).applyToEither(CompletableFuture.supplyAsync(() -> { awaitSec(); System.out.println("老板来了,干活!"); return "今天我被老板爆叼"; }), toDay -> "今天的结果是:" + toDay); String res = async.join(); System.out.println(res); }
acceptEither,消费最先完成任务的结果(不返回结果)
/** * acceptEither 同时执行两个异步任务,取最先完成的任务的结果返回,附带一个处理方法 * (applyToEither 会返回处理结果,acceptEither不处理结果返回,纯消费) * */ @Test @SneakyThrows public void demo17() { System.out.println("开会讨论需求"); // 周一上班 两种情况 CompletableFuture<Void> async = CompletableFuture.supplyAsync(() -> { awaitSec(); awaitSec(); System.out.println("老板没来,摸鱼!"); return "今天我tm摸爆"; }).acceptEither(CompletableFuture.supplyAsync(() -> { awaitSec(); System.out.println("老板来了,干活!"); return "今天我被老板爆叼"; }), System.out::println); async.join(); }
runAfterEither 通知有个任务最先完成了
/** * runAfterEither 同时执行两个异步任务,不关心哪个任务最快,只是最快的任务执行完后通知你要做点什么 * 没有返回值也没有入参 */ @Test @SneakyThrows public void demo18() { System.out.println("开会讨论需求"); // 周一上班 两种情况 CompletableFuture<Void> async = CompletableFuture.supplyAsync(() -> { awaitSec(); awaitSec(); System.out.println("老板没来,摸鱼!"); return "今天我tm摸爆"; }).runAfterEither(CompletableFuture.supplyAsync(() -> { awaitSec(); System.out.println("老板来了,干活!"); return "今天我被老板爆叼"; }), () -> System.out.println("ssss") ); async.join(); }
allOf方法,异步组任务完成后通知需要做什么
/** * allOf 所有置入的异步任务全部完成后调用此任务 * (allOf的任务不接受所有的任务的结果),必须要提取任务引用,一个个手动join获取 * 或者封装到集合容器遍历join */ @Test public void demo15() { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { awaitSec(3); return "Future1的结果"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { awaitSec(4); return "Future2的结果"; }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { awaitSec(5); return "Future3的结果"; }); CompletableFuture<String>[] futures = new CompletableFuture[]{future1, future2, future3}; // step 4: 使用allOf方法合并多个异步任务 CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(futures); // step 5: 当多个异步任务都完成后,使用回调操作文件结果,统计符合条件的文件个数 CompletableFuture<String> countFuture = allOfFuture.thenApply(v -> Arrays.stream(futures).map(f -> f.join()).collect(Collectors.joining())); // step 6: 主线程打印输出文件个数 System.out.println("count = " + countFuture.join()); }
anyOf 方法, 任务组中最先完成的任务后通知需要做什么
/** * anyOf 所有置入的异步任务中,获取最快完成任务的结果 * */ @Test public void demo14() { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { awaitSec(3); return "Future1的结果"; }); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { awaitSec(4); return "Future2的结果"; }); CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> { awaitSec(5); return "Future3的结果"; }); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3); // 输出Future2的结果 System.out.println(anyOfFuture.join()); }
四、任务异常捕获
exceptionally方法捕获任务链上的异常
(不捕获任务链异常,任务直接中断执行,无法查看在哪一层任务的异常)
/** * exceptionally 在上面的任务中出现异常进入该方法,并将这个异常传入 * (异步任务链上的所有异常都会被 exceptionally 捕获) 一般放在在最后声明异常的处理 * */ @Test @SneakyThrows public void demo09() { System.out.println("开会讨论需求"); // 周一上班 两种情况 CompletableFuture<String> async = CompletableFuture.supplyAsync(() -> { awaitSec(); awaitSec(); System.out.println("老板没来,摸鱼!"); return "今天我tm摸爆"; }).exceptionally(e -> { // 这里记录错误日志 return "老板高兴直接宣布今天休息!直接下班"; }).applyToEither(CompletableFuture.supplyAsync(() -> { awaitSec(); System.out.println("老板来了,干活!"); if (true) throw new RuntimeException(); return "今天我被老板爆叼"; }), toDay -> "今天的结果是:" + toDay).exceptionally(e -> { // 这里记录错误日志 return "老板高兴直接宣布今天休息!直接下班"; }); String res = async.join(); System.out.println(res); }
handle方法 捕获上个任务中可能出现的异常,保证这个任务的执行不会中断
/** * handle方法,在任务链中处理前一次的异常,不会报错,继续传递结果给下一个任务 */ @Test public void demo16() { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { int r = 1 / 0; return "result1"; }).handle((ret, ex) -> { if (ex != null) { System.out.println("我们得到异常:" + ex.getMessage()); return "Unknown1"; } return ret; }).thenApply(result -> { String str = null; int len = str.length(); return result + " result2"; }).handle((ret, ex) -> { if (ex != null) { System.out.println("我们得到异常:" + ex.getMessage()); return "Unknown2"; } return ret; }).thenApply(result -> { return result + " result3"; }); String ret = future.join(); }
原文地址:https://www.cnblogs.com/mindzone/p/17236680.html
上一篇: Java 如何实现异步(通过子线程)
下一篇: java 工具-高并发概念
推荐阅读
-
JAVA 项目中的异步任务
-
深入分析 CompletableFuture:Java 异步世界的奇迹
-
Java 异步编程 Future, CompletableFuture
-
如何使用 Java 异步编程(多线程 CompletableFuture)
-
java 异步任务实现思路简述
-
用 Java 实现异步任务 可终止、可中断和可重续函数
-
并发编程|从 Future 到 CompletableFuture - 简化 Java 中的异步编程
-
Java 异步编程 2 CompletableFuture
-
Java 异步任务调度--CompletableFuture (II)
-
Java 异步实现四种方法--CompletableFuture