外观
线程池 + future 怎么实现
⭐ 题目日期:
小红书 - 2024/11/11
📝 题解:
在不使用 CountDownLatch
的情况下,可以通过 线程池(ExecutorService) 和 Future 机制实现多线程任务的同步与聚合。以下是具体实现步骤和代码示例:
1. 核心思路
- 提交任务:将多个任务提交到线程池,获取对应的
Future
对象。 - 收集 Future:通过
Future
跟踪任务状态,等待所有任务完成并获取结果。 - 结果聚合:遍历
Future
列表,通过阻塞式get()
方法等待每个任务完成,并汇总结果。
2. 实现步骤
(1) 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(4); // 固定大小线程池
(2) 定义并提交任务
List<Future<Integer>> futures = new ArrayList<>();
// 提交多个任务到线程池
for (int i = 0; i < 10; i++) {
int taskId = i;
Future<Integer> future = executor.submit(() -> {
// 模拟耗时操作
Thread.sleep(1000);
return taskId * 10;
});
futures.add(future);
}
(3) 等待所有任务完成并聚合结果
List<Integer> results = new ArrayList<>();
for (Future<Integer> future : futures) {
try {
results.add(future.get()); // 阻塞等待任务完成并获取结果
} catch (InterruptedException | ExecutionException e) {
// 处理异常(如取消任务、记录日志等)
e.printStackTrace();
}
}
executor.shutdown(); // 关闭线程池
(4) 输出结果
System.out.println("聚合结果: " + results);
// 输出:聚合结果: [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
3. 关键点解析
(1) Future.get() 的阻塞特性
future.get()
会阻塞当前线程,直到任务完成并返回结果。- 遍历
Future
列表时,主线程会依次等待每个任务完成,最终实现类似CountDownLatch.await()
的效果。
(2) 异常处理
- InterruptedException:线程被中断(如调用
executor.shutdownNow()
)。 - ExecutionException:任务执行过程中抛出异常(需通过
e.getCause()
获取原始异常)。
(3) 结果聚合
- 通过遍历
Future
列表收集结果,适用于需要任务返回值的场景。 - 若无需返回值,可使用
Runnable
替代Callable
,并通过Future<?>
仅等待任务完成:
Future<?> future = executor.submit(() -> {
// 无返回值的任务逻辑
});
4. 优化方案
(1) 超时控制
通过 future.get(timeout, TimeUnit)
设置超时时间,避免无限阻塞。
try {
future.get(2, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true); // 超时后取消任务
}
(2) 批量等待(invokeAll)
使用 executor.invokeAll()
提交所有任务并一次性等待完成。
List<Callable<Integer>> tasks = new ArrayList<>();
// 填充任务列表...
List<Future<Integer>> futures = executor.invokeAll(tasks); // 提交并等待所有任务完成
(3) 异步回调(CompletableFuture)
结合 Java 8 的 CompletableFuture
实现非阻塞式聚合。
List<CompletableFuture<Integer>> completableFutures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
int taskId = i;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// 任务逻辑
return taskId * 10;
}, executor);
completableFutures.add(future);
}
// 等待所有任务完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
completableFutures.toArray(new CompletableFuture[0])
);
allFutures.join(); // 阻塞直到所有任务完成
// 收集结果
List<Integer> results = completableFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
5. 对比 CountDownLatch 的优劣
6. 适用场景
- 需要任务返回值:如并行计算后聚合结果。
- 精细化任务控制:如单独取消某个任务、设置超时等。
- 异步回调处理:结合
CompletableFuture
实现非阻塞流水线。