我们来深入解析 FutureTask。下面将从它们的用法开始,逐步深入到底层实现、方法和接口设计,并探讨它们对于我们自己设计实现以及其他编程实践的学习意义。
主要就是放入等待队列(CAS操作一个链表头),完成任务的线程唤醒等待线程继续逻辑。
FutureTask 深入解析
FutureTask 是一个可取消的异步计算任务。它实现了 RunnableFuture 接口,该接口同时继承了 Runnable 和 Future 接口。这意味着 FutureTask 既可以作为一个 Runnable 被执行器(如 ExecutorService)执行,也可以作为一个 Future 来获取异步计算的结果、查询计算状态或取消计算。
FutureTask 通常用于包装一个 Callable 或 Runnable 对象。
基本用法示例:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
public class FutureTaskExample {
public static void main(String[] args) throws Exception {
// 1. 创建 Callable 任务
Callable<String> callableTask = () -> {
System.out.println(Thread.currentThread().getName() + " is executing callable task...");
TimeUnit.SECONDS.sleep(2); // 模拟耗时操作
return "Callable Result";
};
// 2. 使用 Callable 创建 FutureTask
FutureTask<String> futureTask1 = new FutureTask<>(callableTask);
// 也可以使用 Runnable 创建 FutureTask (通常需要提供一个结果,若无则为 null)
Runnable runnableTask = () -> {
System.out.println(Thread.currentThread().getName() + " is executing runnable task...");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
FutureTask<String> futureTask2 = new FutureTask<>(runnableTask, "Runnable Result (if provided)");
// 3. 提交 FutureTask 到 ExecutorService 执行
// FutureTask 本身就是 Runnable,可以直接被线程执行或提交给 ExecutorService
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(futureTask1);
executor.submit(futureTask2);
// 或者直接 new Thread(futureTask1).start();
System.out.println("Tasks submitted.");
// 4. 获取结果 (get() 方法会阻塞直到任务完成)
try {
System.out.println("Waiting for futureTask1 result...");
String result1 = futureTask1.get(); // 阻塞等待
System.out.println("futureTask1 result: " + result1);
System.out.println("Waiting for futureTask2 result with timeout...");
// get(long timeout, TimeUnit unit) 可以设置超时
String result2 = futureTask2.get(2, TimeUnit.SECONDS);
System.out.println("futureTask2 result: " + result2);
} catch (InterruptedException e) {
System.err.println("Task interrupted: " + e.getMessage());
} catch (java.util.concurrent.ExecutionException e) {
System.err.println("Task execution failed: " + e.getCause());
} catch (java.util.concurrent.TimeoutException e) {
System.err.println("Task timed out: " + e.getMessage());
}
// 5. 检查任务状态和取消
if (!futureTask1.isDone()) {
System.out.println("futureTask1 is not done yet.");
}
if (futureTask1.isCancelled()) {
System.out.println("futureTask1 was cancelled.");
}
// 尝试取消一个未完成的任务
FutureTask<Integer> cancellableTask = new FutureTask<>(() -> {
TimeUnit.SECONDS.sleep(5);
return 100;
});
new Thread(cancellableTask).start();
Thread.sleep(100); // 给任务一点时间启动
boolean cancelled = cancellableTask.cancel(true); // true 表示如果任务正在运行,则中断它
System.out.println("CancellableTask cancelled: " + cancelled);
System.out.println("CancellableTask isCancelled: " + cancellableTask.isCancelled());
System.out.println("CancellableTask isDone: " + cancellableTask.isDone()); // cancel 后 isDone() 也为 true
executor.shutdown();
}
}
FutureTask
是 Java 并发包中一个非常核心的类,它代表一个可取消的异步计算。它巧妙地结合了 Future
接口(用于获取异步计算的结果)和 Runnable
接口(使得它可以被 Executor
执行)。
1. 状态管理 (State Management)
FutureTask
内部维护一个 volatile int state
字段来表示任务的当前状态。状态包括:
-
NEW
: 初始状态,任务尚未开始或正在运行。 -
COMPLETING
: 任务已完成,正在设置结果(一个短暂的中间状态)。 -
NORMAL
: 任务正常完成,结果已设置。 -
EXCEPTIONAL
: 任务因抛出异常而完成,异常已设置。 -
CANCELLED
: 任务被取消(在开始运行前)。 -
INTERRUPTING
: 任务被取消,并且正在尝试中断运行任务的线程(一个短暂的中间状态)。 -
INTERRUPTED
: 任务被取消,并且运行任务的线程已被中断。
状态之间的转换通过 CAS (Compare-And-Set) 操作(使用 VarHandle STATE
)来保证原子性。
2. 任务执行 (run()
方法)
当 FutureTask
的 run()
方法被调用时(通常由一个 Executor
的工作线程调用):
-
首先会通过 CAS 操作尝试将
runner
字段(volatile Thread runner
)从null
设置为当前线程。这确保了只有一个线程可以实际执行任务。 -
如果设置成功并且任务状态是
NEW
,则会调用内部的Callable
对象的call()
方法。 -
如果
call()
方法正常返回,则调用set(V result)
方法设置结果,并将状态转换为NORMAL
。 -
如果
call()
方法抛出异常,则调用setException(Throwable t)
方法设置异常,并将状态转换为EXCEPTIONAL
。 -
在
finally
块中,runner
字段会被重置为null
。还会检查任务是否在运行期间被取消并需要中断(状态为INTERRUPTING
或INTERRUPTED
),如果是,则会调用handlePossibleCancellationInterrupt()
处理。
3. 获取结果 (get()
和 get(long, TimeUnit)
方法)
-
get()
方法:-
首先检查当前状态
s = state
。 -
如果任务尚未完成 (
s <= COMPLETING
),则调用awaitDone(boolean timed, long nanos)
方法阻塞等待。 -
一旦任务完成(状态变为
NORMAL
,EXCEPTIONAL
,CANCELLED
, 或INTERRUPTED
),awaitDone
返回,然后get()
方法调用report(int s)
来返回结果或抛出相应的异常。 -
NORMAL
: 返回结果。 -
EXCEPTIONAL
: 抛出ExecutionException
(包装了原始异常)。 -
CANCELLED
或INTERRUPTED
: 抛出CancellationException
。
-
-
get(long, TimeUnit)
** 方法**:类似get()
,但带有超时机制。如果在超时时间内任务未完成,则抛出TimeoutException
。
4. 取消任务 (cancel(boolean mayInterruptIfRunning)
方法)
-
尝试通过 CAS 将状态从
NEW
转换为CANCELLED
(如果mayInterruptIfRunning
为false
) 或INTERRUPTING
(如果mayInterruptIfRunning
为true
)。 -
如果 CAS 成功:
-
如果
mayInterruptIfRunning
为true
:-
获取
runner
线程。 -
如果
runner
不为null
,则调用runner.interrupt()
来中断执行任务的线程。 -
在
finally
块中,将状态设置为INTERRUPTED
(使用STATE.setRelease
保证内存可见性)。
-
-
最后,调用
finishCompletion()
来唤醒所有等待的线程。
-
-
如果 CAS 失败(例如任务已经完成或已被取消),则返回
false
。
5. 等待队列 (WaitNode
和 waiters
字段)
-
private volatile WaitNode waiters;
:这是一个指向等待线程链表头部的指针。这个链表是一个简单的 Treiber 栈 (LIFO 栈)。 -
WaitNode
是一个静态内部类,代码如下:
// ...
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
// ...
每个 WaitNode
封装了一个等待结果的线程 (thread = Thread.currentThread()
) 和一个指向下一个节点的指针 (next
)。
6. 阻塞和唤醒机制 (awaitDone()
和 finishCompletion()
)
-
awaitDone(boolean timed, long nanos)
:
当一个线程调用 get()
并且任务未完成时,会进入此方法。它会创建一个新的 WaitNode
,然后在一个循环中:
-
通过 CAS 将新的
WaitNode
添加到waiters
链表的头部(实现入栈)。 -
再次检查任务状态,如果已完成,则移除刚添加的节点并返回状态。
-
如果任务仍未完成,则调用
LockSupport.park(this)
(或LockSupport.parkNanos(this, nanos)
) 使当前线程阻塞。 -
当线程被唤醒时,如果是因为中断,则从等待队列中移除节点并抛出
InterruptedException
。如果是因为超时,则从等待队列中移除节点并返回当前状态。
-
finishCompletion()
:
当任务完成(通过 set
, setException
, 或 cancel
)时,此方法被调用。
-
它会遍历
waiters
链表,并对每个WaitNode
中的线程调用LockSupport.unpark(q.thread)
来唤醒它们。 -
遍历完成后,调用
done()
方法(这是一个空方法,供子类覆盖以执行完成回调)。 -
最后将
callable
设为null
以帮助 GC。
哪个线程负责管理唤醒 get
等待的线程?
负责唤醒等待线程的是完成任务的那个线程。具体来说:
-
如果是任务正常执行完成或抛出异常,那么是执行
run()
方法的线程(即runner
线程)在调用set()
或setException()
后,最终会调用finishCompletion()
来唤醒所有等待者。 -
如果是任务被取消,那么是调用
cancel()
方法的线程在成功取消任务后,会调用finishCompletion()
来唤醒所有等待者。
等待 get()
方法结果的线程被封装在 WaitNode
对象中。每个 WaitNode
包含:
-
volatile Thread thread;
: 对等待线程本身的引用。 -
volatile WaitNode next;
: 指向链表中下一个WaitNode
的引用。
这些 WaitNode
对象形成一个后进先出 (LIFO) 的栈式链表,其头节点由 FutureTask
的 volatile WaitNode waiters;
字段指向。当一个线程需要等待时,它会创建一个新的 WaitNode
并将其 CAS 到 waiters
链表的头部。当任务完成时,finishCompletion()
方法会遍历这个链表并唤醒每个节点中的线程。
设计优势
这种设计避免了使用更重的锁(如 AbstractQueuedSynchronizer
,早期版本的 FutureTask
使用过它),转而使用轻量级的 CAS 操作和 LockSupport
进行线程的阻塞和唤醒,这在很多情况下能提供更好的性能。