欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

Java 异步任务协调 - CompletableFuture (II)

最编程 2024-07-16 14:51:24
...
【直播预告】程序员逆袭 CEO 分几步?

Java异步任务编排—CompletableFuture(二)

CompletableFuture API
  • 默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰

  • 方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行),如果以Async结尾,却又没自定义线程池,则还是使用公共的ForkJoinPool线程池,

1 创建异步任务 API

CompletableFuture创建异步任务,一般有supplyAsync和runAsync两个方法:

  • supplyAsync执行CompletableFuture任务,支持返回值。
  • runAsync执行CompletableFuture任务,没有返回值。

举个栗子:

public static void main(String[] args) {
    //可以自定义线程池
    ExecutorService executor = Executors.newCachedThreadPool();
    //runAsync的使用
    CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("runAsync,为了部落"), executor);
    //supplyAsync的使用
    CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
                System.out.print("supplyAsync,为了联盟");
                return "哈哈哈哈哈"; }, executor);
    //runAsync的future没有返回值,输出null
    System.out.println(runFuture.join());
    //supplyAsync的future,有返回值
    System.out.println(supplyFuture.join());
    executor.shutdown(); // 线程池需要关闭
}



//输出
runAsync,为了部落
null
supplyAsync,为了联盟哈哈哈哈哈

2 依赖关系

  • thenRun():不关心上一个任务的执行结果,无传参,无返回值

    做完第一个任务后,再做第二个任务,但是前后两个任务没有参数传递,第二个任务也没有返回值

    CompletableFuture<String> completableFuture   = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<Void> future = completableFuture.thenRun(() -> System.out.println("Computation finished."));
     
    future.get();
    
  • thenApply():依赖上一个任务的结果,把前面任务的执行结果,交给后面的Function,有返回值

      第一个任务执行完成后,执行第二个回调方法任务,会将第一个任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。
    

    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future = completableFuture.thenApply(s -> s + " World");
    assertEquals("Hello World", future.get());

  • thenAccept(): 依赖上一个任务的结果,把前面任务的执行结果,交给后面的Function,无返回值

    第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的

    CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<Void> future = completableFuture.thenAccept(s -> System.out.println("Computation returned: " + s));
     
    future.get();
    
  • thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回

    在第一个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例

    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future   = completableFuture.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
     
    assertEquals("Hello World", future.get());
    

3 组合关系

  • and集合关系
    • thenCombine():执行两个独立的任务,并对其结果执行某些操作,有返回值
    • thenAccepetBoth():两个任务执行完成后,将结果交给thenAccepetBoth处理,无返回值
    • runAfterBoth():两个任务都执行完成后,执行下一步操作(Runnable类型任务),不会把执行结果当做方法入参,且没有返回值
CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");
//想要使用两个Future结果时,但不需要将任何结果值进行返回时,可以用 thenAcceptBoth,它表示后续的处理不需要返回值,而 thenCombine 表示需要返回值
cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");
cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});
  • 聚合关系
    • applyToEither():两个任务哪个执行的快,就使用哪一个结果,有返回值
    • acceptEither():两个任务哪个执行的快,就消费哪一个结果,无返回值
    • runAfterEither():不会把执行结果当做方法入参,且没有返回值

applyToEither / acceptEither / runAfterEither 都表示将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务

//第一个异步任务,休眠2秒,保证它执行晚点
        CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{
            try{
                Thread.sleep(2000L);
                System.out.println("执行完第一个异步任务");}
            catch (Exception e){
                return "第一个任务异常";
            }
            return "第一个异步任务";
        });

        //第二个异步任务
        CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> {
                            System.out.println("执行完第二个任务");
                            return "第一个任务还在睡觉,这是第二个任务";}
                        );
                
        CompletableFuture acceptEither =  second.acceptEitherAsync(first, result ->System.out.println(result+"==acceptEither"));
        CompletableFuture applyToEither = second.applyToEitherAsync(first,result->{
            System.out.println(result+"==applyToEither");
            return result;
        });
        CompletableFuture runAfterEither =  second.runAfterEitherAsync(first, () ->System.out.println("hello"));
  • 并行执行
    • allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2   = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3   = CompletableFuture.supplyAsync(() -> "World");
 
CompletableFuture<Void> combinedFuture  = CompletableFuture.allOf(future1, future2, future3);
 
combinedFuture.get();
 
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

allOf局限性在于它不会返回所有任务的综合结果。相反,你必须手动从Futures获取结果。幸运的是,CompletableFuture.join()方法和Java 8 Streams API可以解决:

String combined = Stream.of(future1, future2, future3).map(CompletableFuture::join).collect(Collectors.joining(" "));
 
assertEquals("Hello Beautiful World", combined);
  • anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture,如果执行的任务异常,anyOfCompletableFuture,执行get方法,会抛出异常
 CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "Hello";
            }
        );

        CompletableFuture<String> future3   = CompletableFuture.supplyAsync(() -> "World");
        CompletableFuture<Object> combinedFuture  = CompletableFuture.anyOf(future1, future3);

        System.out.println(combinedFuture.get());

        System.out.println(future1.get());
        System.out.println(future3.get());
        
        //结果
        World
        Hello
        World

4 结果处理 异常捕获

  • whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作,无返回值;并且whenComplete方法返回的CompletableFutureresult是上个任务的结果
     CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> "Hello");
     CompletableFuture<String> future3   = future1.whenComplete((a, throwable) -> {
            System.out.println("上个任务执行完啦,还把" + a + "传过来");
        });

    System.out.println(future3.get());
  • whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作,有返回值;并且whenComplete方法返回的CompletableFutureresult是回调方法的结果
    CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future3   = future1.handle((a, throwable) -> {
            System.out.println("上个任务执行完啦,还把" + a + "传过来");
            return "world";
        });

   System.out.println(future3.get());
  • exceptionally:某个任务执行异常时,执行的回调方法,并且会把抛出的异常作为参数,传递到回调方法
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                ()->{
                    System.out.println("当前线程名称:" + Thread.currentThread().getName());
                    throw new RuntimeException();
                }
        );

        CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> {
            e.printStackTrace();
            return "歪歪歪?你的程序异常啦";
        });

        System.out.println(exceptionFuture.get());

5 超时处理

JDK 8 版本的CompletableFuture 没有timeout机制,timeout机制是指,如果forkjoin-pool(或者自定义线程池)中一个线程在规定时间内没有返回,那么就结束掉,而不是继续执行直到获取结果,比如main线程200ms内返回,但forkjoin-pool(或者自定义线程池)中某个执行线程执行400ms才返回,而其返回值根本没有被使用到。

实现方案:启动一个 ScheduledThreadpoolExecutor 线程在 timeout 时间后直接调用 CompletableFuture.completeExceptionally(new TimeoutException()),然后用 acceptEither() 或者 applyToEither 看是先计算完成还是先超时:

public class FutureUtil {

    /**
     * cpu 核心数
     */
    private static final int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();

    // 最大超时时间
    private static final int TIMEOUT_VALUE = 1500;
    // 时间单位
    private static final TimeUnit TIMEOUT_UNIT = TimeUnit.MILLISECONDS;


    /**
     * Singleton delay scheduler, used only for starting and * cancelling tasks.
     */
    public static final class Delayer {

        static final ScheduledThreadPoolExecutor delayer;

        /**
         * 异常线程,不做请求处理,只抛出异常
         */
        static {
            delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
            delayer.setRemoveOnCancelPolicy(true);
        }

        static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
            return delayer.schedule(command, delay, unit);
        }

        static final class DaemonThreadFactory implements ThreadFactory {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("CompletableFutureScheduler");
                return t;
            }
        }
    }

    /**
     * 根据服务器cpu自定义线程池
     */
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            AVALIABLE_PROCESSORS,
            3 * AVALIABLE_PROCESSORS,
            3,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(20),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    /**
     * 有返回值的异步
     * @param supplier
     * @param <T>
     * @return
     */
    public static  <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier){
        return supplyAsync(TIMEOUT_VALUE,TIMEOUT_UNIT,supplier);
    }

    /**
     * 有返回值的异步 - 可设置超时时间
     * @param timeout
     * @param unit
     * @param supplier
     * @param <T>
     * @return
     */
    public static  <T> CompletableFuture<T> supplyAsync(long timeout, TimeUnit unit,Supplier<T> supplier){
        return CompletableFuture.supplyAsync(supplier, threadPoolExecutor)
                .applyToEither(timeoutAfter(timeout,unit), Function.identity())
                .exceptionally(throwable -> {
                    throwable.printStackTrace();
                    log.error(throwable.getMessage());
                    return null;
                });
    }

    /**
     * 无返回值的异步
     * @param runnable
     * @return
     */
    public static CompletableFuture runAsync(Runnable runnable){
        return runAsync(TIMEOUT_VALUE,TIMEOUT_UNIT,runnable);
    }

    /**
     * 无返回值的异步 - 可设置超时时间
     * @param runnable
     * @return
     */
    public static CompletableFuture runAsync(long timeout, TimeUnit unit,Runnable runnable){
        return CompletableFuture.runAsync(runnable,threadPoolExecutor)
                .applyToEither(timeoutAfter(timeout,unit), Function.identity())
                .exceptionally(throwable -> {
                    throwable.printStackTrace();
                    log.error(throwable.getMessage());
                    return null;
                });
    }

    /**
     * 统一处理异步结果
     * @param futures
     * @return
     */
    public static CompletableFuture allOf(CompletableFuture... futures){
        return allOf(TIMEOUT_VALUE,TIMEOUT_UNIT,futures);
    }

    /**
     * 统一处理异步结果 - 可设置超时时间
     * @param futures
     * @return
     */
    public static CompletableFuture allOf(long timeout, TimeUnit unit,CompletableFuture... futures){
        return CompletableFuture.allOf(futures)
                .applyToEither(timeoutAfter(timeout,unit), Function.identity())
                .exceptionally(throwable -> {
                    throwable.printStackTrace();
                    log.error(throwable.getMessage());
                    return null;
                });
    }

    /**
     * 异步超时处理
     * @param timeout
     * @param unit
     * @param <T>
     * @return
     */
    public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<T>();
        // timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
        Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
        return result;
    }

    public static <T> CompletableFuture<T> timeoutAfter() {
        CompletableFuture<T> result = new CompletableFuture<T>();
        // timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
        Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), TIMEOUT_VALUE, TIMEOUT_UNIT);
        return result;
    }

}

使用demo

 CompletableFuture<String> future1    = FutureUtil.supplyAsync(10,TimeUnit.MILLISECONDS,() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "Hello";
        });

        CompletableFuture<String> future3   = future1.handle((a, throwable) -> {
            System.out.println("上个任务执行完啦,还把" + a + "传过来");
            return "world";
        });

        System.out.println(future3.get());
 
 
image.png

在 JDK 9,CompletableFuture 正式提供了 orTimeoutcompleteTimeout 方法,来准确实现异步超时控制。实现原理跟上面是一样的。

6 线程阻塞问题

要合理治理线程资源,最基本的前提条件就是要在写代码时,清楚地知道每一行代码都将执行在哪个线程上。下面我们看一下CompletableFuture的执行线程情况。

CompletableFuture实现了CompletionStage接口,通过丰富的回调方法,支持各种组合操作,每种组合场景都有同步和异步两种方法。

同步方法(即不带Async后缀的方法)有两种情况。

  • 如果注册时被依赖的操作已经执行完成,则直接由当前线程执行。
  • 如果注册时被依赖的操作还未执行完,则由回调线程执行。

异步方法(即带Async后缀的方法):

  • 可以选择是否传递线程池参数Executor运行在指定线程池中;
  • 当不传递Executor时,会使用ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的应用,线程数可能成为瓶颈)。

例如:

ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("supplyAsync 执行线程:" + Thread.currentThread().getName());
    //业务操作
    return "";
}, threadPool1);
//此时,如果future1中的业务操作已经执行完毕并返回,则该thenApply直接由当前main线程执行;否则,将会由执行以上业务操作的threadPool1中的线程执行。
future1.thenApply(value -> {
    System.out.println("thenApply 执行线程:" + Thread.currentThread().getName());
    return value + "1";
});
//使用ForkJoinPool中的共用线程池CommonPool
future1.thenApplyAsync(value -> {
//do something
  return value + "1";
});
//使用指定线程池
future1.thenApplyAsync(value -> {
//do something
  return value + "1";
}, threadPool1);

7 线程池死锁问题

前面提到,异步回调方法可以选择是否传递线程池参数Executor,这里我们建议强制传线程池,且根据实际情况做线程池隔离。

当不传递线程池时,会使用ForkJoinPool中的公共线程池CommonPool,这里所有调用将共用该线程池,核心线程数=处理器数量-1(单核核心线程数为1),所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。

线程池循环引用会导致死锁

public Object doGet() {
  ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
  CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  //do sth
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("child");
        return "child";
      }, threadPool1).join();//子任务
    }, threadPool1);
  return cf1.join();
}

如上代码块所示,doGet方法第三行通过supplyAsync向threadPool1请求线程,并且内部子任务又向threadPool1请求线程。threadPool1大小为10,当同一时刻有10个请求到达,则threadPool1被打满,子任务请求线程时进入阻塞队列排队,但是父任务的完成又依赖于子任务,这时由于子任务得不到线程,父任务无法完成。主线程执行cf1.join()进入阻塞状态,并且永远无法恢复。

为了修复该问题,需要将父任务与子任务做线程池隔离,两个任务请求不同的线程池,避免循环依赖导致的阻塞。

8 异步RPC调用注意不要阻塞IO线程池

服务异步化后很多步骤都会依赖于异步RPC调用的结果,这时需要特别注意一点,如果是使用基于NIO(比如Netty)的异步RPC,则返回结果是由IO线程负责设置的,即回调方法由IO线程触发,CompletableFuture同步回调(如thenApply、thenAccept等无Async后缀的方法)如果依赖的异步RPC调用的返回结果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时需要保证同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行完成前,IO线程将一直被占用,影响整个服务的响应。