我们来深入解析 FutureTask。下面将从它们的用法开始,逐步深入到底层实现、方法和接口设计,并探讨它们对于我们自己设计实现以及其他编程实践的学习意义。
主要就是放入等待队列(CAS操作一个链表头),完成任务的线程唤醒等待线程继续逻辑。
FutureTask 深入解析
FutureTask 是一个可取消的异步计算任务。它实现了 RunnableFuture 接口,该接口同时继承了 Runnable 和 Future 接口。这意味着 FutureTask 既可以作为一个 Runnable 被执行器(如 ExecutorService)执行,也可以作为一个 Future 来获取异步计算的结果、查询计算状态或取消计算。
FutureTask 通常用于包装一个 Callable 或 Runnable 对象。
基本用法示例:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
public class FutureTaskExample {
public static void main(String[] args) throws Exception {
// 1. 创建 Callable 任务
Callable<String> callableTask = () -> {
System.out.println(Thread.currentThread().getName() + " is executing callable task...");
TimeUnit.SECONDS.sleep(2); // 模拟耗时操作
return "Callable Result";
};
// 2. 使用 Callable 创建 FutureTask
FutureTask<String> futureTask1 = new FutureTask<>(callableTask);
// 也可以使用 Runnable 创建 FutureTask (通常需要提供一个结果,若无则为 null)
Runnable runnableTask = () -> {
System.out.println(Thread.currentThread().getName() + " is executing runnable task...");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
FutureTask<String> futureTask2 = new FutureTask<>(runnableTask, "Runnable Result (if provided)");
// 3. 提交 FutureTask 到 ExecutorService 执行
// FutureTask 本身就是 Runnable,可以直接被线程执行或提交给 ExecutorService
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(futureTask1);
executor.submit(futureTask2);
// 或者直接 new Thread(futureTask1).start();
System.out.println("Tasks submitted.");
// 4. 获取结果 (get() 方法会阻塞直到任务完成)
try {
System.out.println("Waiting for futureTask1 result...");
String result1 = futureTask1.get(); // 阻塞等待
System.out.println("futureTask1 result: " + result1);
System.out.println("Waiting for futureTask2 result with timeout...");
// get(long timeout, TimeUnit unit) 可以设置超时
String result2 = futureTask2.get(2, TimeUnit.SECONDS);
System.out.println("futureTask2 result: " + result2);
} catch (InterruptedException e) {
System.err.println("Task interrupted: " + e.getMessage());
} catch (java.util.concurrent.ExecutionException e) {
System.err.println("Task execution failed: " + e.getCause());
} catch (java.util.concurrent.TimeoutException e) {
System.err.println("Task timed out: " + e.getMessage());
}
// 5. 检查任务状态和取消
if (!futureTask1.isDone()) {
System.out.println("futureTask1 is not done yet.");
}
if (futureTask1.isCancelled()) {
System.out.println("futureTask1 was cancelled.");
}
// 尝试取消一个未完成的任务
FutureTask<Integer> cancellableTask = new FutureTask<>(() -> {
TimeUnit.SECONDS.sleep(5);
return 100;
});
new Thread(cancellableTask).start();
Thread.sleep(100); // 给任务一点时间启动
boolean cancelled = cancellableTask.cancel(true); // true 表示如果任务正在运行,则中断它
System.out.println("CancellableTask cancelled: " + cancelled);
System.out.println("CancellableTask isCancelled: " + cancellableTask.isCancelled());
System.out.println("CancellableTask isDone: " + cancellableTask.isDone()); // cancel 后 isDone() 也为 true
executor.shutdown();
}
}
FutureTask 是 Java 并发包中一个非常核心的类,它代表一个可取消的异步计算。它巧妙地结合了 Future 接口(用于获取异步计算的结果)和 Runnable 接口(使得它可以被 Executor 执行)。
1. 状态管理 (State Management)
FutureTask 内部维护一个 volatile int state 字段来表示任务的当前状态。状态包括:
-
NEW: 初始状态,任务尚未开始或正在运行。 -
COMPLETING: 任务已完成,正在设置结果(一个短暂的中间状态)。 -
NORMAL: 任务正常完成,结果已设置。 -
EXCEPTIONAL: 任务因抛出异常而完成,异常已设置。 -
CANCELLED: 任务被取消(在开始运行前)。 -
INTERRUPTING: 任务被取消,并且正在尝试中断运行任务的线程(一个短暂的中间状态)。 -
INTERRUPTED: 任务被取消,并且运行任务的线程已被中断。
状态之间的转换通过 CAS (Compare-And-Set) 操作(使用 VarHandle STATE)来保证原子性。
2. 任务执行 (run() 方法)
当 FutureTask 的 run() 方法被调用时(通常由一个 Executor 的工作线程调用):
-
首先会通过 CAS 操作尝试将
runner字段(volatile Thread runner)从null设置为当前线程。这确保了只有一个线程可以实际执行任务。 -
如果设置成功并且任务状态是
NEW,则会调用内部的Callable对象的call()方法。 -
如果
call()方法正常返回,则调用set(V result)方法设置结果,并将状态转换为NORMAL。 -
如果
call()方法抛出异常,则调用setException(Throwable t)方法设置异常,并将状态转换为EXCEPTIONAL。 -
在
finally块中,runner字段会被重置为null。还会检查任务是否在运行期间被取消并需要中断(状态为INTERRUPTING或INTERRUPTED),如果是,则会调用handlePossibleCancellationInterrupt()处理。
3. 获取结果 (get() 和 get(long, TimeUnit) 方法)
-
get()方法:-
首先检查当前状态
s = state。 -
如果任务尚未完成 (
s <= COMPLETING),则调用awaitDone(boolean timed, long nanos)方法阻塞等待。 -
一旦任务完成(状态变为
NORMAL,EXCEPTIONAL,CANCELLED, 或INTERRUPTED),awaitDone返回,然后get()方法调用report(int s)来返回结果或抛出相应的异常。 -
NORMAL: 返回结果。 -
EXCEPTIONAL: 抛出ExecutionException(包装了原始异常)。 -
CANCELLED或INTERRUPTED: 抛出CancellationException。
-
-
get(long, TimeUnit)** 方法**:类似get(),但带有超时机制。如果在超时时间内任务未完成,则抛出TimeoutException。
4. 取消任务 (cancel(boolean mayInterruptIfRunning) 方法)
-
尝试通过 CAS 将状态从
NEW转换为CANCELLED(如果mayInterruptIfRunning为false) 或INTERRUPTING(如果mayInterruptIfRunning为true)。 -
如果 CAS 成功:
-
如果
mayInterruptIfRunning为true:-
获取
runner线程。 -
如果
runner不为null,则调用runner.interrupt()来中断执行任务的线程。 -
在
finally块中,将状态设置为INTERRUPTED(使用STATE.setRelease保证内存可见性)。
-
-
最后,调用
finishCompletion()来唤醒所有等待的线程。
-
-
如果 CAS 失败(例如任务已经完成或已被取消),则返回
false。
5. 等待队列 (WaitNode 和 waiters 字段)
-
private volatile WaitNode waiters;:这是一个指向等待线程链表头部的指针。这个链表是一个简单的 Treiber 栈 (LIFO 栈)。 -
WaitNode是一个静态内部类,代码如下:
// ...
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
// ...
每个 WaitNode 封装了一个等待结果的线程 (thread = Thread.currentThread()) 和一个指向下一个节点的指针 (next)。
6. 阻塞和唤醒机制 (awaitDone() 和 finishCompletion())
-
awaitDone(boolean timed, long nanos):
当一个线程调用 get() 并且任务未完成时,会进入此方法。它会创建一个新的 WaitNode,然后在一个循环中:
-
通过 CAS 将新的
WaitNode添加到waiters链表的头部(实现入栈)。 -
再次检查任务状态,如果已完成,则移除刚添加的节点并返回状态。
-
如果任务仍未完成,则调用
LockSupport.park(this)(或LockSupport.parkNanos(this, nanos)) 使当前线程阻塞。 -
当线程被唤醒时,如果是因为中断,则从等待队列中移除节点并抛出
InterruptedException。如果是因为超时,则从等待队列中移除节点并返回当前状态。
-
finishCompletion():
当任务完成(通过 set, setException, 或 cancel)时,此方法被调用。
-
它会遍历
waiters链表,并对每个WaitNode中的线程调用LockSupport.unpark(q.thread)来唤醒它们。 -
遍历完成后,调用
done()方法(这是一个空方法,供子类覆盖以执行完成回调)。 -
最后将
callable设为null以帮助 GC。
哪个线程负责管理唤醒 get 等待的线程?
负责唤醒等待线程的是完成任务的那个线程。具体来说:
-
如果是任务正常执行完成或抛出异常,那么是执行
run()方法的线程(即runner线程)在调用set()或setException()后,最终会调用finishCompletion()来唤醒所有等待者。 -
如果是任务被取消,那么是调用
cancel()方法的线程在成功取消任务后,会调用finishCompletion()来唤醒所有等待者。
等待 get() 方法结果的线程被封装在 WaitNode 对象中。每个 WaitNode 包含:
-
volatile Thread thread;: 对等待线程本身的引用。 -
volatile WaitNode next;: 指向链表中下一个WaitNode的引用。
这些 WaitNode 对象形成一个后进先出 (LIFO) 的栈式链表,其头节点由 FutureTask 的 volatile WaitNode waiters; 字段指向。当一个线程需要等待时,它会创建一个新的 WaitNode 并将其 CAS 到 waiters 链表的头部。当任务完成时,finishCompletion() 方法会遍历这个链表并唤醒每个节点中的线程。
设计优势
这种设计避免了使用更重的锁(如 AbstractQueuedSynchronizer,早期版本的 FutureTask 使用过它),转而使用轻量级的 CAS 操作和 LockSupport 进行线程的阻塞和唤醒,这在很多情况下能提供更好的性能。




![[ Qt ] | 与系统相关的操作(一):鼠标相关事件](https://i-blog.csdnimg.cn/direct/5a438c4ede9445b2bbe3a948bbe5577d.png)














