JAVA 并发编程系列 (13) Future、FutureTask 异步小王子
美团本地生活面试:模拟外卖订单处理,客户支付提交订单后,查询订单详情,后台需要查询店铺备餐进度、以及外卖员目前位置信息后再返回。
时间好快,一转眼不到一个月时间,已经完成分享synchronized、volatile、CAS、AQS、ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier、并发锁、Condition、线程池、ThreadLocal等多个核心基础原理和案例剖析。
其实我编写文章速度真的很慢,基本每篇文章需要写2~3个小时去梳理。确保基础理论、源码分析、面试案例、优缺点等均分享到位,力求每篇都是干货实用,让每位看到我文章的同学,不管是面试、还是应用到工作实践,都能有所收益。
今天我们围绕Future是什么、怎么用,实践demo来展开分享,实现原理架构来展开。
一、Future是什么
首先我们回到一个问题,就是为什么需要Future、FutureTask?
之前我们用过的线程池ThreadPoolExecutor、线程Thread都可以执行异步任务,但是无法执行带有返回值的异步任务。而Future是可以执行这种带有返回值的异步任务。线程池ThreadPoolExecutor、Thread线程可以通过提交执行Future类型的任务,就可以获取任务返回值。
和Callable、Runnable一样,Future是一个接口。我们看一下它的接口源码。
//Runnable接口,只有一个run方法
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
//Callable接口,只有一个call方法
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
//本文主角Future有5个方法
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
代码非常少,有5个方法,但是核心方法是get(),还是isDone();
get()方法:用来读取任务的返回结果。此外,如果任务未执行,调用该方法的当前线程会进入阻塞等待。
isDone()方法:检查计算是否已完成。这个方法不会阻塞线程。日常使用Future,一般是先调用isDone()方法判断结果是否返回,然后再调用get()方法获取执行结果。
一句话总结:Future是异步计算任务,提交Future任务后可以继续干别的;等干完别的事,再回来通过get()方法去读取Future任务结果。
再简单总结:Future是支持在未来读取结果的异步计算任务。
cancel()方法:用来尝试取消任务,仅仅是尝试,不一定成功。如果任务已经开始执行,那么它就不能被取消。
isCancelled()方法:就是用来检查这个Future任务是否被取消。
Future和之前分享的信号量Semaphore、CountDownLatch倒数门闩、CyclicBarrier循环屏障都不一样,唯一和Condition条件队列有点像,支持多个线程协调进行。支持异步读取任务结果这个特性,Future可以很方便支持多个任务并发执行,以及在最后汇总获取并发结果,最后返回给终端。
二、应用实践:模拟同时查外卖信息
我们用FutureTask实现一个并发面试题:模拟外卖订单处理,客户支付提交订单后,查询订单详情,后台需要查询店铺备餐进度、以及外卖员目前位置信息后再返回。
package lading.java.mutithread;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* 模拟外卖订单处理,客户支付提交订单后,查询订单详情,后台
* 通过查询店铺备餐进度、以及外卖员目前位置信息。
*/
public class Demo015Future {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1、异步查询商家系统任务
FutureTask<Boolean> checkFoodIsReadyTask = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName()+"转发请求到商家系统,查询餐厅当前订单备餐进度...");
Thread.sleep(2000);
boolean foodIsOk = true;
System.out.println(Thread.currentThread().getName()+"商家接口返回是否备餐完成,结果是:" + foodIsOk);
return foodIsOk;
});
//2、异步查询外卖员系统任务
FutureTask<Boolean> checkCourierIsReadyTask = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName()+"转发请求到外卖员系统,查询外卖员是否已到店...");
Thread.sleep(800);
boolean courierIsOk = true;
System.out.println(Thread.currentThread().getName()+"查询外卖员是否已到店,结果是:" + courierIsOk);
return courierIsOk;
});
//并发查询商家、外卖员情况,大约2s
new Thread(checkFoodIsReadyTask).start();
new Thread(checkCourierIsReadyTask).start();
Thread.sleep(100);
//只是判断是否完成,不会阻塞
if (!checkFoodIsReadyTask.isDone()) {
System.out.println(Thread.currentThread().getName() + "查询店铺备餐进度未完成,继续等等...");
}
if (!checkCourierIsReadyTask.isDone()) {
System.out.println(Thread.currentThread().getName() + "查询外卖员情况未完成,继续等等...");
}
//如果结果没返回会阻塞等待
System.out.println(Thread.currentThread().getName() + "线程 成功查到商家备餐结果:" + checkFoodIsReadyTask.get());
System.out.println(Thread.currentThread().getName() + "线程 成功查到外卖员结果:" + checkCourierIsReadyTask.get());
}
}
运行结果:
三、硬核干活:FutureTask的实现原理源码分析
我们从它的属性开始,然后讲实现的方法原理。
3.1 FutureTask的属性
FutureTask 的源码也不多,属性就这5个。
线程状态state、执行任务的callable,任务执行的返回结果outCome,正在运行的线程runner,等待队列里的waiters节点。
3.1.1 当前任务线程状态state,以及状态枚举
这七种任务状态之间相互转换关系:
1、正常结束
NEW -> COMPLETING -> NORMAL
2、异常结束
NEW -> COMPLETING -> EXCEPTIONAL
3、任务被取消
NEW -> CANCELLED
4、任务出现中断
NEW -> INTERRUPTING -> INTERRUPTED
//当前任务线程的状态,以下几个枚举值都是state不同状态
private volatile int state;
//当前任务线程刚创建状态
private static final int NEW = 0;
//当前任务线程即将完成,ing是一个即将完成状态
private static final int COMPLETING = 1;
//表示当前任务线程正常结束的状态
private static final int NORMAL = 2;
//表示当前任务线程有异常
private static final int EXCEPTIONAL = 3;
//表示当前任务线程被取消
private static final int CANCELLED = 4;
//表示当前线程已经被打了中断标识
private static final int INTERRUPTING = 5;
//表示当前线程已经被中断
private static final int INTERRUPTED = 6;
3.2 其他属性意义
//Callable 来执行任务,Callable 有返回值的线程
private Callable<V> callable;
//Callable任务执行的返回结果
private Object outcome;
//当前正在运行的线程
private volatile Thread runner;
//等待队列节点,WaitNode是FutureTask的内部类
private volatile WaitNode waiters;
waiters等待队列:这个是单向队列,里面是等待本任务执行结果的线程。比如ABCD四个线程,其中A线程执行了任务,B、C、D线程,都等A线程的返回结果,就需要在waiters等待队列里等着。
//等待队列源码
static final class WaitNode {
//被阻塞的线程,也就
volatile Thread thread;
//这里看出它就是个单向队列
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
3.3 run()方法原理
run方法比较简单,不放源码,直接说,就是执行定义的callable任务,任务执行完成后,通过CAS去更新outCome返回值。
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
3.4 get()方法原理
这个方法和run()一样重要,其他线程想要获取本任务结果,都是通过get方法读取。逻辑也是很简单。其中waitDone()方法,就是进入等待等列等结果。
public V get() throws InterruptedException, ExecutionException {
int s = state;
//1、如果任务没执行完成,当前想读取任务结果的线程就进入阻塞等待
if (s <= COMPLETING)
s = awaitDone(false, 0L);
//2、如果任务执行完成,就返回结果
return report(s);
}
3.5 其他方法
//当前任务状态是否被取消,直接读状态值
public boolean isCancelled() {
return state >= CANCELLED;
}
//当前任务状态是否以及执行结束,直接读状态值
public boolean isDone() {
return state != NEW;
}
本系列文章推荐:
1、JAVA并发编程系列(12)ThreadLocal就是这么简单|建议收藏
2、JAVA并发编程系列(11)线程池底层原理架构剖析
上一篇: C++ 中的类型转换
推荐阅读
-
JAVA 并发编程系列 (13) Future、FutureTask 异步小王子
-
并发编程|从 Future 到 CompletableFuture - 简化 Java 中的异步编程
-
异步编程RxJava-介绍-前言 前段时间写了一篇对协程的一些理解,里面提到了不管是协程还是callback,本质上其实提供的是一种异步无阻塞的编程模式;并且介绍了java中对异步无阻赛这种编程模式的支持,主要提到了Future和CompletableFuture;之后有同学在下面留言提到了RxJava,刚好最近在看微服务设计这本书,里面提到了响应式扩展(Reactive extensions,Rx),而RxJava是Rx在JVM上的实现,所有打算对RxJava进一步了解。 RxJava简介 RxJava的官网地址:https://github.com/ReactiveX/RxJava, 其中对RxJava进行了一句话描述:RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM. 大意就是:一个在Java VM上使用可观测的序列来组成异步的、基于事件的程序的库。 更详细的说明在Netflix技术博客的一篇文章中描述了RxJava的主要特点: 1.易于并发从而更好的利用服务器的能力。 2.易于有条件的异步执行。 3.一种更好的方式来避免回调地狱。 4.一种响应式方法。 与CompletableFuture对比 之前提到CompletableFuture真正的实现了异步的编程模式,一个比较常见的使用场景: CompletableFuture<Integer> future = CompletableFuture.supplyAsync(耗时函数); Future<Integer> f = future.whenComplete((v, e) -> { System.out.println(v); System.out.println(e); }); System.out.println("other..."); 下面用一个简单的例子来看一下RxJava是如何实现异步的编程模式: Observable<Long> observable = Observable.just(1, 2) .subscribeOn(Schedulers.io).map(new Func1<Integer, Long> { @Override public Long call(Integer t) { try { Thread.sleep(1000); //耗时的操作 } catch (InterruptedException e) { e.printStackTrace; } return (long) (t * 2); } }); observable.subscribe(new Subscriber<Long> { @Override public void onCompleted { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { System.out.println("error" + e); } @Override public void onNext(Long result) { System.out.println("result = " + result); } }); System.out.println("other..."); Func1中以异步的方式执行了一个耗时的操作,Subscriber(观察者)被订阅到Observable(被观察者)中,当耗时操作执行完会回调Subscriber中的onNext方法。 其中的异步方式是在subscribeOn(Schedulers.io)中指定的,Schedulers.io可以理解为每次执行耗时操作都启动一个新的线程。 结构上其实和CompletableFuture很像,都是异步的执行一个耗时的操作,然后在有结果的时候主动告诉我结果。那我们还需要RxJava干嘛,不知道你有没有注意,上面的例子中其实提供2条数据流[1,2],并且处理完任何一个都会主动告诉我,当然这只是它其中的一项功能,RxJava还有很多好用的功能,在下面的内容会进行介绍。 异步观察者模式 上面这段代码有没有发现特别像设计模式中的:观察者模式;首先提供一个被观察者Observable,然后把观察者Subscriber添加到了被观察者列表中; RxJava中一共提供了四种角色:Observable、Observer、Subscriber、Subjects Observables和Subjects是两个被观察者,Observers和Subscribers是观察者; 当然我们也可以查看一下源码,看一下jdk中的Observer和RxJava的Observer jdk中的Observer: public interface Observer { void update(Observable o, Object arg); } RxJava的Observer: public interface Observer<T> { void onCompleted; void onError(Throwable e); void onNext(T t); } 同时可以发现Subscriber是implements Observer的: public abstract class Subscriber<T> implements Observer<T>, Subscription 可以发现RxJava中在Observer中引入了2个新的方法:onCompleted和onError onCompleted:即通知观察者Observable没有更多的数据,事件队列完结 onError:在事件处理过程中出异常时,onError会被触发,同时队列自动终止,不允许再有事件发出。 正是因为RxJava提供了同步和异步两种方式进行事件的处理,个人觉得异步的方式更能体现RxJava的价值,所以这里给他命名为异步观察者模式。 好了,下面正式介绍RxJava的那些灵活的操作符,这里仅仅是简单的介绍和简单的实例,具体用在什么场景下,会在以后的文章中介绍 Maven引入