文章目录
- 简介
- 继承体系
- 源码分析
- Runnable接口
- Callable接口
- Future接口
- 成员属性
- 构造方法
- 成员方法
- run()
- get()
- cancel()
- 总结
简介
在Java中一般通过继承Thread类或者实现Runnable接口这两种方式来创建多线程,但是这两种方式都有个缺陷,就是不能在执行完成后获取执行的结果,因此Java 1.5之后提供了Callable和Future接口,通过它们就可以在任务执行完毕之后得到任务的执行结果。
继承体系
源码分析
Runnable接口
public interface Runnable {
public abstract void run();
}
可以看到Runnable接口既没有返回值也没有抛出异常
Callable接口
//Runnable是没有返回结果的任务,而Callable则是有返回结果的任务
public interface Callable<V> {
/**
* 有返回结果,并且可能抛出异常
*/
V call() throws Exception;
}
可以看到Callable是个泛型接口,泛型V就是要call()方法返回的类型。Callable接口和Runnable接口很像,都可以被另外一个线程执行,但是,Runnable不会返回数据也不能抛出异常。
Future接口
/**
* 表示异步执行的结果,有三个功能:
* 1.获取异步执行任务的结果
* 2.查看异步任务的执行状态(取消或终止)
* 3.取消异步任务
*/
public interface Future<V> {
/**
* 尝试取消任务,如果任务已经完成或已经取消,则取消失败。
* 1.如果任务没未被启动,则该任务不会被运行;
* 2.如果任务已经被启动,参数mayInterruptIfRunning决定是否执行当前任务的线程是否应该被中断,这只是作为一种终止任务的尝试
* 执行这个方法之后,以后的isDone方法调用都会返回true。
* 如果这个方法返回true,以后的isCancelled方法调用都会返回true。
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* cancel()调用返回true之后,这个方法会返回true。
*/
boolean isCancelled();
/**
* 如果当前任务执行成功,或者被取消,或者抛出异常,则返回true
*/
boolean isDone();
/**
* 阻塞直到任务完成,并返回任务执行结果。
* 当异步任务被取消,或抛出异常,get()方法会抛出相应的异常
*/
V get() throws InterruptedException, ExecutionException;
/**
* 阻塞一定时间等待任务完成,并返回任务执行结果,超过时间未返回结果会抛出异常
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
- Future只是一个接口,不能直接用来创建对象,FutureTask是Future的实现类
成员属性
/**
* 可能的状态转换:
* NEW -> COMPLETING -> NORMAL 正常完成
* NEW -> COMPLETING -> EXCEPTIONAL 异常完成
* NEW -> CANCELLED 取消
* NEW -> INTERRUPTING -> INTERRUPTED 打断
*/
// 表示当前task的状态
private volatile int state;
// 表示当前任务尚未执行
private static final int NEW = 0;
// 表示当前任务正在结束,尚未完全结束,一种临界状态
private static final int COMPLETING = 1;
// 表示当前任务正常结束
private static final int NORMAL = 2;
// 表示当前任务执行过程中发生了异常。 内部封装的 callable.run() 向上抛出异常了
private static final int EXCEPTIONAL = 3;
// 表示当前任务被取消
private static final int CANCELLED = 4;
// 表示当前任务中断中..
private static final int INTERRUPTING = 5;
// 表示当前任务已中断
private static final int INTERRUPTED = 6;
/** 如果构造FutureTask传入Runnable,则会使用装饰者设计模式伪装成 Callable了 */
private Callable<V> callable;
/** 从get()返回的结果或抛出的异常 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 表示当前任务被线程执行期间,保存当前执行任务的线程对象引用*/
private volatile Thread runner;
/** 因为会有很多线程去get当前任务的结果,所以会有阻塞,这里使用了一种数据结构 stack 头插、头取的一个队列。 */
private volatile WaitNode waiters;
构造方法
public FutureTask(Callable<V> callable) {
// 非空校验
if (callable == null)
throw new NullPointerException();
// callable就是我们自己的任务
this.callable = callable;
// 设置当前任务状态为NEW: 表示当前任务尚未执行
this.state = NEW; // ensure visibility of callable 确保可调用文件的可见性
}
public FutureTask(Runnable runnable, V result) {
// 使用装饰者模式将runnable转换为了callable接口,外部线程通过get获取
// 当前任务执行结束时,结果可能为 null 也可能为传进来的值
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
// callable
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
// RunnableAdapter
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
- 当参数为Callable时,就直接赋值给callable
- 当参数为Runnable时,就还要传入一个返回结果,并且用装饰者模式把Runnable装饰成Callable
成员方法
RunnableFuture 接口
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
- RunnableFuture 接口继承了Runnable和Future接口。而FutureTask重写run方法
run()
- 运行当前任务,其中涉及
setException、set、handlePossibleCancellationInterrupt
public void run() {
/**
* 1、状态如果不是NEW,说明任务或者已经执行过,或者已经被取消,直接返回
* 2、状态如果是NEW,则CAS尝试把当前执行线程保存在runner字段中,如果赋值失败(当前任务被其它线程抢占了)则直接返回,
* 保证callable任务只被运行一次
*/
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
// 如果执行到这里,说明当前task一定是 NEW 状态,而且当前线程也抢占TASK成功!
try {
// callable 就是我们自己封装逻辑的callable任务 或者装饰后的runnable
Callable<V> c = callable;
// 再次检验、防止空指针异常、防止外部线程 cancel掉当前任务。
if (c != null && state == NEW) {
// 结果的引用
V result;
// true 表示callable.run 代码块执行成功 未抛出异常
// false 表示callable.run 代码块执行失败 抛出异常
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 设置异常信息
setException(ex);
}
if (ran)
// 正常运行结束
// 设置正常结束的结果
set(result);
}
} finally {
// 将当前执行任务的线程置为null
runner = null;
// 当前任务的状态
int s = state;
if (s >= INTERRUPTING)
// 说明当前任务处于中断中或者已中断状态
// 让出cpu,不断的判断是否是中断中...
handlePossibleCancellationInterrupt(s);
}
}
- 判断当前任务的state是否等于NEW(任务未执行),如果不为NEW则说明任务或者已经执行过,或者已经被取消,直接返回。
- 如果状态为NEW则接着会通过unsafe类把任务执行线程引用采用CAS保存在runner字段中,如果保存失败,则直接返回。
- 执行任务,设置任务返回结果。
- 如果任务执行发生异常,则调用setException()方法保存异常信息
set
以CAS的方式设置结果v给outcome
protected void set(V v) {
// 使用CAS方式设置当前任务状态为完成中(一种临界状态)
// 也有可能会失败,就是其他线程在本线程CAS的时候,就把task取消了
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = v;
// 将结果v赋值给outcome之后,马上会将当前任务状态修改为NORMAL(正常结束状态)
STATE.setRelease(this, NORMAL); // final state 最终的状态
// 唤醒之前挂起的线程
finishCompletion();
}
}
finishCompletion
移除并唤醒所有等待线程,执行done,置空callable
private void finishCompletion() {
// 遍历阻塞队列 q指向waiters链表的头结点
for (WaitNode q; (q = waiters) != null;) {
// 使用cas设置 waiters为null
// 为了防止外部线程使用cancel取消当前任务,也会触发finishCompletion方法。(小概率事件)
if (WAITERS.weakCompareAndSet(this, q, null)) {
// 自旋
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 唤醒当前节点对应的线程(在awaitDone方法最后一个else判断中park,在此处唤醒)
LockSupport.unpark(t);
}
WaitNode next = q.next;
// next == null 说明是最后一个节点,则直接break即可
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 模板方法,可以被覆盖
done();
// 将callable 设置为null helpGC
callable = null; // to reduce footprint
}
setException
以CAS的方式设置异常信息t给outcome
protected void setException(Throwable t) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 引用的是 callable 向上层抛出来的异常。
outcome = t;
// 将当前任务的状态 修改为 EXCEPTIONAL(发生了异常)
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();
}
}
handlePossibleCancellationInterrupt
,状态为中断中…则会让出cpu。在cancel里面会有设置为中断中状态
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
// 如果是中断中,则让出cpu
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
get()
- 获取当前任务执行结束后得到的结果,其中涉及
awaitDone
、report
public V get() throws InterruptedException, ExecutionException {
int s = state;
// COMPLETING(尚未完全结束,一种临界状态)
if (s <= COMPLETING)
// 说明当前任务还没有结束,当前线程就会被阻塞
// awaitDone执行完后会返回task当前状态,如果该方法执行期间,task被中断了,则会直接抛出中断异常:
// awaitDone是futureTask实现阻塞的关键方法: 等待任务执行完毕,如果任务取消或者超时则停止!
s = awaitDone(false, 0L);
return report(s);
}
- 如果此时任务已经执行完毕则会直接返回任务结果,如果任务还没执行完毕,则调用方会阻塞直到任务执行结束返回结果为止
awaitDone
是futureTask实现阻塞的关键方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// deadline=0 不会超时
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 引用当前线程封装成 WaitNode对象(头插、头取的一个队列。)
WaitNode q = null;
// 表示当前线程 waitNode对象是否入队/压栈
boolean queued = false;
// 自旋
for (;;) {
// 判断阻塞线程是否被中断,如果被中断则在等待队列中删除该节点并抛出InterruptedException异常
if (Thread.interrupted()) {
// 当前线程节点出队
removeWaiter(q);
// 上抛,使get方法抛出中断异常。
throw new InterruptedException();
}
// 假设当前线程是被其它线程使用unpark(thread) 唤醒的话,会正常自旋,走下面逻辑:
// 获取当前任务最新状态
int s = state;
// 条件成立:说明当前任务已经有结果了.. (可能是正常完成、异常、中断、取消等等)
if (s > COMPLETING) {
// 条件成立:说明已经为当前线程创建过WaitNode了,此时需要将 node.thread = null helpGC
if (q != null)
q.thread = null;
// 直接返回当前状态.
return s;
}
// 条件成立:说明当前任务接近完成状态(表示任务已经结束但是任务执行线程还没来得及给outcome赋值)
// 这里让当前线程释放cpu让其他线程优先执行 ,进行下一次抢占cpu:
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 条件成立:第一次自旋,当前线程还未创建 WaitNode 对象,此时为当前线程创建 WaitNode对象,也就是创建一个结点
else if (q == null)
q = new WaitNode();
// 条件成立:第二次自旋,当前线程已经创建 WaitNode对象了,但是node对象还未入队
else if (!queued){
// 当前线程node节点 next 指向原队列的头节点 waiters 一直指向队列的头!
q.next = waiters;
// cas方式设置waiters引用指向当前线程node, 成功的话 queued == true 否则,可能其它线程先你一步入队了。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, waiters, q);
}
// 第三次自旋,会到这里:表示是否设置了超时时间
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
// 已经超时的话,移除等待节点
removeWaiter(q);
return state;
}
// 未超时,将当前线程挂起指定时间
LockSupport.parkNanos(this, nanos);
}
else
// 走到这里,当前get操作的线程就会被park了。线程状态会变为 WAITING状态,相当于休眠了..
// 除非有其它线程将你唤醒 或者 将当前线程中断。
// 如果当前线程被其他线程唤醒,醒来时,还是从这里向下继续执行(继续进入自旋for进行条件判断)
// (在上面的finishCompletion中会唤醒这个挂起的线程!)
LockSupport.park(this);
}
}
- 判断调用get()的线程是否被其他线程中断,如果是的话则在等待队列中删除对应节点然后抛出InterruptedException异常。
- 获取任务当前状态,如果当前任务状态大于COMPLETING则表示任务执行完成,则把thread字段置null(协助GC)并返回结果。
- 如果任务处于COMPLETING状态,则表示任务已经处理完成(正常执行完成或者执行出现异常),但是执行结果或者异常原因还没有保存到outcome字段中。这个时候调用线程让出执行权让其他线程优先执行。
- 如果等待节点为空,则构造一个等待节点WaitNode。
- 如果第四步中新建的节点还没如队列,则CAS的把该节点加入waiters队列的首节点。
- 阻塞等待。
report
去获取最终task执行结束得到的结果
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
// 正常情况下,outcome 保存的是callable运行结束的结果
// 非正常情况下,保存的是 callable 抛出的异常。
Object x = outcome;
// 条件成立(正常情况):当前任务状态正常结束
if (s == NORMAL)
// 直接返回callable运算结果
return (V)x;
// 条件成立(非正常情况):当前任务是被取消或中断状态
if (s >= CANCELLED)
// 抛异常!
throw new CancellationException();
// 执行到这,说明callable接口实现中,是有bug的...
throw new ExecutionException((Throwable)x);
}
cancel()
- 将当前线程的任务取消(中断)掉
public boolean cancel(boolean mayInterruptIfRunning) {
// tate == NEW 成立,表示当前任务处于运行中或者处于线程池任务队列中.
// mayInterruptIfRunning为true则修改为中断中..,为false则修改为任务被取消
// 条件成立:说明CAS修改状态成功,可以去执行下面逻辑了,否则返回false,表示cancel失败。
if (!(state == NEW && STATE.compareAndSet
(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
// 执行当前FutureTask 的线程,有可能现在是null,是null 的情况是: 当前任务在 队列中,还没有线程获取到它呢
Thread t = runner;
if (t != null)
// 给runner线程一个中断信号
t.interrupt();
} finally { // final state
// 设置任务状态为 中断完成。
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
// 唤醒所有get()阻塞的线程。
finishCompletion();
}
return true;
}
- 根据mayInterruptIfRunning是否为true,CAS设置状态为INTERRUPTING或CANCELLED,设置成功,继续第二步,否则返回false
- 如果mayInterruptIfRunning为true,调用runner.interupt(),设置状态为INTERRUPTED
- 唤醒所有在get()方法等待的线程
总结
- 通过FutureTask不仅能够获取任务执行的结果,还有感知到任务执行的异常,甚至还可以取消任务
- FutureTask其实就是典型的异常调用的实现方式
- 比如RPC框架常用的调用方式有同步调用、异步调用,其实它们本质上都是异步调用,它们就是用FutureTask的方式来实现的
参考文章
线程池源码分析_01 FutureTask源码分析
FutureTask源码解析(JDK1.8)
FutureTask源码解读