Skip to content

线程池 + future 怎么实现

约 769 字大约 3 分钟

多线程与并发小红书

2025-03-14

⭐ 题目日期:

小红书 - 2024/11/11

📝 题解:

在不使用 CountDownLatch 的情况下,可以通过 线程池(ExecutorService)Future 机制实现多线程任务的同步与聚合。以下是具体实现步骤和代码示例:


1. 核心思路

  • 提交任务:将多个任务提交到线程池,获取对应的 Future 对象。
  • 收集 Future:通过 Future 跟踪任务状态,等待所有任务完成并获取结果。
  • 结果聚合:遍历 Future 列表,通过阻塞式 get() 方法等待每个任务完成,并汇总结果。

2. 实现步骤

(1) 创建线程池

ExecutorService executor = Executors.newFixedThreadPool(4);  // 固定大小线程池

(2) 定义并提交任务

List<Future<Integer>> futures = new ArrayList<>();
// 提交多个任务到线程池
for (int i = 0; i < 10; i++) {
    int taskId = i;
    Future<Integer> future = executor.submit(() -> {
        // 模拟耗时操作
        Thread.sleep(1000);
        return taskId * 10;
    });
    futures.add(future);
}

(3) 等待所有任务完成并聚合结果

List<Integer> results = new ArrayList<>();
for (Future<Integer> future : futures) {
    try {
        results.add(future.get());  // 阻塞等待任务完成并获取结果
    } catch (InterruptedException | ExecutionException e) {
        // 处理异常(如取消任务、记录日志等)
        e.printStackTrace();
    }
}
executor.shutdown();  // 关闭线程池

(4) 输出结果

System.out.println("聚合结果: " + results);
// 输出:聚合结果: [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]

3. 关键点解析

(1) Future.get() 的阻塞特性

  • future.get() 会阻塞当前线程,直到任务完成并返回结果。
  • 遍历 Future 列表时,主线程会依次等待每个任务完成,最终实现类似 CountDownLatch.await() 的效果。

(2) 异常处理

  • InterruptedException:线程被中断(如调用 executor.shutdownNow())。
  • ExecutionException:任务执行过程中抛出异常(需通过 e.getCause() 获取原始异常)。

(3) 结果聚合

  • 通过遍历 Future 列表收集结果,适用于需要任务返回值的场景。
  • 若无需返回值,可使用 Runnable 替代 Callable,并通过 Future<?> 仅等待任务完成:
Future<?> future = executor.submit(() -> {
    // 无返回值的任务逻辑
});

4. 优化方案

(1) 超时控制

通过 future.get(timeout, TimeUnit) 设置超时时间,避免无限阻塞。

try {
    future.get(2, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    future.cancel(true);  // 超时后取消任务
}

(2) 批量等待(invokeAll)

使用 executor.invokeAll() 提交所有任务并一次性等待完成。

List<Callable<Integer>> tasks = new ArrayList<>();
// 填充任务列表...
List<Future<Integer>> futures = executor.invokeAll(tasks);  // 提交并等待所有任务完成

(3) 异步回调(CompletableFuture)

结合 Java 8 的 CompletableFuture 实现非阻塞式聚合。

List<CompletableFuture<Integer>> completableFutures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    int taskId = i;
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        // 任务逻辑
        return taskId * 10;
    }, executor);
    completableFutures.add(future);
}
// 等待所有任务完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
    completableFutures.toArray(new CompletableFuture[0])
);
allFutures.join();  // 阻塞直到所有任务完成
// 收集结果
List<Integer> results = completableFutures.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toList());

5. 对比 CountDownLatch 的优劣

img


6. 适用场景

  • 需要任务返回值:如并行计算后聚合结果。
  • 精细化任务控制:如单独取消某个任务、设置超时等。
  • 异步回调处理:结合 CompletableFuture 实现非阻塞流水线。