CompletableFuture 到底该怎么用:从方法、技巧、踩坑到源码

很多人第一次接触 CompletableFuture,感觉它很优雅:链式调用、异步编排、异常处理、多个任务聚合,看起来比手写线程池高级得多。

但真到项目里,问题往往不是“不会用”,而是“以为自己会用”。

比如:

  • 为什么有的回调在主线程执行,有的又跑到了线程池里?
  • 为什么 thenApply()thenApplyAsync() 只差一个 Async,行为却可能完全不同?
  • 为什么我明明用了异步,结果接口还是卡住了?
  • 为什么 allOf() 执行完了,结果还得自己一个个 join() 去拿?
  • 为什么异常被包成了 CompletionException,日志看起来特别恶心?
  • 为什么线上机器 CPU 很高,最后发现是把阻塞 IO 全扔进了 ForkJoinPool.commonPool()

这篇文章不打算只列 API,而是把 CompletableFuture 真正在工程里怎么用、怎么避坑、JDK 为什么这么设计,以及核心源码怎么运作,一次讲透。

一、先建立一个正确心智模型

很多人把 CompletableFuture 理解成“异步线程工具”,这个说法不算错,但太浅了。

更准确一点,它其实同时做了两件事:

  1. 它是一个 Future,代表“未来某个时刻会完成的结果”。
  2. 它是一个 CompletionStage,代表“这个结果完成之后,还能继续挂后续动作的可编排节点”。

也就是说,CompletableFuture 不只是“拿结果”,更重要的是“描述依赖关系”。

你可以把它想成一个节点:

  • 节点里最终会有一个结果,或者一个异常。
  • 节点完成后,会触发依赖它的后续节点。
  • 节点和节点之间可以串行、并行、汇聚、竞争。

这才是它和传统 Future 最大的区别。

传统 Future 的典型写法是:

Future<Order> future = executor.submit(this::queryOrder);
Order order = future.get();

问题在于:

  • get() 是阻塞的。
  • 不能优雅地声明“拿到订单后再查用户”。
  • 多个 Future 的组合很难写。

CompletableFuture 的核心价值,就是把“等待结果”和“结果完成后的动作”拆开。

二、最常用的几类 API,到底该怎么分

1. 创建任务

最常见的是这两个:

CompletableFuture<String> future1 =
        CompletableFuture.supplyAsync(() -> "hello");

CompletableFuture<Void> future2 =
        CompletableFuture.runAsync(() -> System.out.println("task"));

区别很简单:

  • supplyAsync() 有返回值。
  • runAsync() 没有返回值。

它们默认都用 ForkJoinPool.commonPool(),也可以显式传线程池:

ExecutorService ioPool = Executors.newFixedThreadPool(16);

CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> remoteCall(), ioPool);

这里先说结论:线上代码里,只要任务是重要业务逻辑,最好显式传线程池。

后面会解释为什么默认线程池经常踩坑。

2. 结果转换

最常见的是:

  • thenApply(): 把上一步结果映射成另一个值
  • thenAccept(): 消费上一步结果,但不返回值
  • thenRun(): 不关心上一步结果,只想接着执行一个动作
CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "hello")
        .thenApply(s -> s + " world")
        .thenApply(String::toUpperCase);

如果上一步返回的是 A,你想把它变成 B,优先想 thenApply()

3. 继续发起异步任务

这是最容易写错的地方。

假设你先查用户 ID,再根据用户 ID 异步查订单:

CompletableFuture<CompletableFuture<Order>> wrong = CompletableFuture
        .supplyAsync(() -> queryUserId())
        .thenApply(userId -> CompletableFuture.supplyAsync(() -> queryOrder(userId)));

你会得到一个 CompletableFuture<CompletableFuture<Order>>,也就是“套娃”。

正确写法是 thenCompose()

CompletableFuture<Order> right = CompletableFuture
        .supplyAsync(this::queryUserId)
        .thenCompose(userId ->
                CompletableFuture.supplyAsync(() -> queryOrder(userId)));

记忆方式很简单:

  • thenApply() 类似 map
  • thenCompose() 类似 flatMap

只要后续函数本身已经返回 CompletableFuture,就优先考虑 thenCompose()

4. 两个任务做组合

如果两个任务彼此独立,结果出来后再合并,常用:

  • thenCombine()
  • thenAcceptBoth()
  • runAfterBoth()
CompletableFuture<User> userFuture =
        CompletableFuture.supplyAsync(() -> queryUser(userId), ioPool);
CompletableFuture<Account> accountFuture =
        CompletableFuture.supplyAsync(() -> queryAccount(userId), ioPool);

CompletableFuture<UserView> viewFuture = userFuture.thenCombine(
        accountFuture,
        (user, account) -> new UserView(user, account)
);

这个场景非常适合接口聚合、页面拼装、多个远程依赖并发查询。

5. 多任务汇聚或竞速

allOf()anyOf() 经常被问。

CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);

语义上:

  • allOf():全部完成才算完成
  • anyOf():任意一个完成就算完成

allOf() 有个很著名的问题:它不帮你收集泛型结果

所以通常要这样写:

CompletableFuture<List<Result>> allResults = CompletableFuture
        .allOf(f1, f2, f3)
        .thenApply(v -> List.of(f1.join(), f2.join(), f3.join()));

注意这里用 join() 而不是 get(),因为此时 allOf() 已经保证都完成了,不需要再做受检异常处理。

三、thenApply()thenApplyAsync() 到底差在哪

这是 CompletableFuture 最容易让人产生错觉的地方。

先看结论:

  • thenApply() 不保证异步执行,它可能由“触发前一个阶段完成的那个线程”直接执行。
  • thenApplyAsync() 才是把回调提交到线程池异步执行。

看个例子:

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> {
            log.info("stage1 thread={}", Thread.currentThread().getName());
            return "A";
        })
        .thenApply(value -> {
            log.info("stage2 thread={}", Thread.currentThread().getName());
            return value + "B";
        });

第二阶段 thenApply() 很可能直接由第一阶段完成时所在的线程继续执行。也就是说,它更像“同步衔接”,而不是“重新调度”。

而下面这个:

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "A", ioPool)
        .thenApplyAsync(value -> value + "B", ioPool);

第二阶段会明确提交到 ioPool

所以工程上可以这么理解:

  • thenApply():适合轻量、纯计算、无阻塞的后置处理
  • thenApplyAsync():适合明确需要线程切换,或者后续逻辑可能耗时、阻塞

这里别走极端。

不是所有地方都要加 Async。如果每一步都强制重新调度,会增加线程切换和队列开销,链路短、计算轻的时候反而更慢。

四、最实用的使用姿势

1. 接口聚合

这是 CompletableFuture 最经典的落地场景。

比如一个用户主页接口,需要并发查:

  • 用户基本信息
  • 账户信息
  • 最近订单
  • 风险标签

可以这样写:

public UserProfileView queryProfile(Long userId) {
    CompletableFuture<User> userFuture =
            CompletableFuture.supplyAsync(() -> userService.query(userId), ioPool);
    CompletableFuture<Account> accountFuture =
            CompletableFuture.supplyAsync(() -> accountService.query(userId), ioPool);
    CompletableFuture<List<Order>> orderFuture =
            CompletableFuture.supplyAsync(() -> orderService.queryRecent(userId), ioPool);
    CompletableFuture<List<String>> tagFuture =
            CompletableFuture.supplyAsync(() -> tagService.queryTags(userId), ioPool);

    CompletableFuture.allOf(userFuture, accountFuture, orderFuture, tagFuture).join();

    return new UserProfileView(
            userFuture.join(),
            accountFuture.join(),
            orderFuture.join(),
            tagFuture.join()
    );
}

这种写法的价值不在“代码更炫”,而在于:

  • 多个远程调用并发执行,降低总 RT
  • 聚合逻辑集中,依赖关系清晰

前提是这些调用彼此独立,并且线程池容量、超时、降级都要配套好。

2. 异步流水线

如果任务天然是多阶段依赖,就用链式编排。

CompletableFuture<Invoice> future = CompletableFuture
        .supplyAsync(() -> queryOrder(orderId), ioPool)
        .thenApply(this::checkOrder)
        .thenCompose(order -> CompletableFuture.supplyAsync(() -> createInvoice(order), ioPool))
        .thenApply(this::enrichInvoice);

这种场景下,thenCompose() 的价值非常明显:前一步结果出来后,继续异步发起下一步。

3. 异常兜底

经常用的异常处理方法有三个:

  • exceptionally()
  • handle()
  • whenComplete()

区别如下:

  • exceptionally():只在异常时触发,并返回降级值
  • handle():无论成功失败都会触发,可以统一转换结果
  • whenComplete():无论成功失败都会触发,但通常只做记录,不改结果

例如:

CompletableFuture<User> future = CompletableFuture
        .supplyAsync(() -> userService.query(userId), ioPool)
        .exceptionally(ex -> {
            log.error("query user failed, userId={}", userId, ex);
            return User.defaultUser();
        });

如果想统一把异常包装成业务对象,可以用:

CompletableFuture<Result<User>> future = CompletableFuture
        .supplyAsync(() -> userService.query(userId), ioPool)
        .handle((user, ex) -> {
            if (ex != null) {
                return Result.fail("QUERY_USER_FAIL");
            }
            return Result.ok(user);
        });

五、工程里真正高频的坑

1. 误把默认线程池当业务线程池

supplyAsync()runAsync() 不传线程池时,默认走 ForkJoinPool.commonPool()

这个线程池的问题不是“不能用”,而是它的设计目标不是帮你兜底所有业务异步任务

ForkJoinPool 更擅长:

  • CPU 密集型任务
  • 可拆分计算
  • 非阻塞任务

如果你把下面这些任务塞进去:

  • HTTP 调用
  • 数据库查询
  • Redis 阻塞等待
  • 大量 sleep / IO

就很容易把公共线程池拖垮。

而且 commonPool 是进程级共享的。你以为只是这个方法在用,实际上项目里别的地方、第三方库也可能在用。

实践建议:

  • IO 密集型任务单独线程池
  • CPU 密集型任务单独线程池
  • 不要把阻塞 IO 扔给 commonPool

2. 只写异步提交,最后还是同步阻塞

很多代码看起来很“异步”:

CompletableFuture<User> future =
        CompletableFuture.supplyAsync(() -> userService.query(userId), ioPool);

User user = future.join();

如果你提交完马上 join(),那本质上只是把同步阻塞从“当前线程执行任务”变成了“当前线程等另一个线程执行完”。

这不一定没意义,但你要清楚:

  • 如果没有并发别的任务,收益很小
  • 还多了一次线程切换开销

CompletableFuture 的价值在于“并发组合”和“非阻塞编排”,不是把同步代码硬改成异步皮肤。

3. 在线程池线程里调用 join(),把自己卡死

这是另一个经典问题。

比如线程池只有 8 个线程,你提交了 8 个任务,每个任务内部又 join() 等另一个也要用这个线程池的任务完成,就可能形成线程饥饿甚至死锁式阻塞。

示意代码:

ExecutorService pool = Executors.newFixedThreadPool(2);

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    CompletableFuture<String> nested =
            CompletableFuture.supplyAsync(() -> slowCall(), pool);
    return nested.join();
}, pool);

如果池子很小、依赖关系再复杂一点,就容易卡住。

实践建议:

  • 避免在线程池工作线程中无脑 join() 其他依赖同池任务
  • 能链式编排就别在中间阻塞等待
  • 上下游阻塞任务尽量拆到不同线程池

4. 忘了处理异常,结果整条链静默失败

如果某个阶段抛异常,后续依赖它的普通阶段默认不会继续正常执行。

很多人只在最外层写:

future.join();

然后线上看到:

  • 日志里只有 CompletionException
  • 业务结果丢了
  • 某些收尾逻辑根本没跑

建议:

  • 链路末端至少有统一异常处理
  • 日志里记 root cause
  • 区分“业务降级”和“硬失败”

5. 把线程上下文丢了

这是和 ThreadLocal、日志 MDC、链路追踪强相关的问题。

异步线程切换之后,这些上下文默认不会自动透传:

  • ThreadLocal
  • InheritableThreadLocal 在线程池场景也不可靠
  • MDC
  • TraceId / TenantId / UserContext

所以你经常会看到:

  • 异步日志丢 traceId
  • 子任务拿不到租户信息
  • 安全上下文突然变空

解决思路通常有三类:

  1. 显式传参,最清晰
  2. 包装 Executor,提交任务时捕获上下文再恢复
  3. 使用像 TransmittableThreadLocal 这样的方案

这个坑和 CompletableFuture 本身没冲突,但它非常常见,因为 CompletableFuture 会放大线程切换。

6. allOf() 完成了,不代表你拿到了可用结果

allOf() 返回的是 CompletableFuture<Void>,它只表示“这些 future 都结束了”,并不直接给你结果集合。

而且还有两个细节:

  • 其中一个失败,allOf() 也会异常完成
  • 你如果后面 join() 每个子任务,还是要面对各自异常包装

所以实际工程里,往往要自己做一层结果模型,比如 Result<T>,把成功和失败都收进来,而不是让一个失败直接打断整组汇聚。

7. 超时不是自动存在的

很多人以为用了异步就天然“不会卡”。这当然不对。

如果远程调用一直不返回,CompletableFuture 也会一直等。

JDK 9 以后可以用:

  • orTimeout()
  • completeOnTimeout()

例如:

CompletableFuture<User> future = CompletableFuture
        .supplyAsync(() -> userService.query(userId), ioPool)
        .orTimeout(300, TimeUnit.MILLISECONDS)
        .exceptionally(ex -> User.defaultUser());

但要注意:orTimeout() 只是让 future 超时完成,不等于底层任务真的被取消了。

如果底层 HTTP 客户端、数据库驱动、RPC 框架没配自己的超时,那实际资源占用可能还在继续。

六、几个非常实用的小技巧

1. 聚合结果时,优先统一成领域对象

不要在 controller 或 service 里到处散落:

f1.join();
f2.join();
f3.join();

更推荐集中封装:

private UserProfileView buildView(
        CompletableFuture<User> userFuture,
        CompletableFuture<Account> accountFuture,
        CompletableFuture<List<Order>> orderFuture) {
    return new UserProfileView(
            userFuture.join(),
            accountFuture.join(),
            orderFuture.join()
    );
}

这样异常处理、聚合逻辑、空值语义更容易统一。

2. 区分 CPU 池和 IO 池

这是非常值钱的工程习惯。

比如:

  • 查远程服务、查数据库:ioPool
  • 本地复杂计算、规则引擎:cpuPool

不要图省事全用一个线程池。混用之后,阻塞任务会拖慢计算任务,计算任务也会挤压 IO 回调。

3. 给线程起名字

如果你排查过线上问题,就知道这个有多重要。

比如自定义线程工厂,把线程名命成:

  • user-io-pool-1
  • order-io-pool-3
  • profile-cpu-pool-2

这样日志、jstack、监控里一眼就能看出卡在哪类线程。

4. 末端统一 join(),中间尽量不要阻塞

比较推荐的姿势是:

  • 中间全靠 thenApply/thenCompose/thenCombine
  • 最末端统一 join() 或交给框架适配层处理

这样依赖链最清楚,也最不容易把异步编排写成一团阻塞式意大利面代码。

七、它的原理到底是什么

CompletableFuture 看起来像“回调链”,但内部实现并不是简单的 listener list。

你可以抓住三个核心点:

  1. 一个字段存最终结果
  2. 一个 Treiber stack 存等待触发的后继动作
  3. 完成时触发 postComplete(),把整条依赖链往下推

1. 结果字段 result

CompletableFuture 最核心的状态基本都围绕 result 展开。

在 JDK 源码里,它大致是这样的:

volatile Object result;

为什么不是泛型 T

因为它要同时表示很多状态:

  • 尚未完成:result == null
  • 正常完成:result 里放真实值,或者一个表示 null 结果的特殊对象
  • 异常完成:result 里放异常包装对象

也就是说,CompletableFuture 不是用一堆布尔值来描述状态,而是用“result 是否为空,以及里面装的是什么”来统一表达。

2. 后继动作栈 stack

每次你调用:

  • thenApply
  • thenCompose
  • whenComplete
  • thenCombine

本质上都是创建一个新的依赖节点,并把这个依赖动作挂到当前 CompletableFuturestack 上。

源码里这部分抽象叫 Completion

可以粗略理解成:

volatile Completion stack;

每个 Completion 节点都描述了一件事:

  • 依赖哪个前置 future
  • 完成后该执行什么函数
  • 执行完后如何把结果写入下一个 future

3. 完成传播 postComplete()

当一个 CompletableFuture 被正常完成或异常完成后,会调用 postComplete()

这个方法干的事情非常关键:

  • 从当前节点的 stack 里不断弹出等待的 Completion
  • 尝试执行这些 completion
  • 如果执行成功,就把结果写进依赖的下游 future
  • 下游 future 完成后,再继续触发它自己的 postComplete()

所以整个链式调用,本质上就是“前一个节点完成后,驱动后一个节点尝试完成”。

八、源码里几个值得盯住的关键点

1. UniApplyUniComposeBiApply

JDK 里不同 API 对应不同 completion 实现。

比如单输入转换常见是 UniApply,双输入组合常见是 BiApply,继续展开异步任务有 UniCompose

名字虽然多,但套路很统一:

  • 先检查依赖 future 是否已经完成
  • 从依赖 future 提取结果或异常
  • 执行用户传入函数
  • 把结果写到下游 future

也就是说,JDK 并没有为每个 API 发明完全不同的机制,只是把“单输入、双输入、消费型、转换型、异常型”做成不同 completion 模板。

2. CAS 是主角,不是锁

CompletableFuture 整体高度依赖 CAS。

典型场景包括:

  • 抢占写入 result
  • 把 completion 压入 stack
  • stack 弹出 completion

谁先 CAS 成功,谁就完成状态推进。

这样做的好处是:

  • 无锁或少锁
  • 多线程完成竞争开销低
  • 更适合高并发状态转换

这也是它性能和扩展性比较好的重要原因。

3. “同步执行”并不是偷懒,而是刻意设计

为什么 thenApply() 默认可能直接由当前线程执行,而不是强制扔线程池?

因为如果每个阶段都无脑线程切换:

  • 开销大
  • 延迟高
  • 小任务会被调度成本淹没

所以 JDK 采用了一个很务实的策略:

  • Async 版本,尽量由当前完成线程顺手执行
  • Async 版本,再提交给 executor

这就是为什么我们前面说:thenApply() 适合轻量逻辑,thenApplyAsync() 适合明确需要异步边界的逻辑。

4. 异常为什么总被包装

很多人讨厌这个:

try {
    future.join();
} catch (CompletionException ex) {
    ...
}

原因很简单:

  • join() 不想抛受检异常
  • 所以它把底层异常包装成 CompletionException
  • get() 则保留 ExecutionException 这套传统 Future 风格

工程上要记住一点:真正重要的是 ex.getCause()

九、为什么它经常和 ForkJoinPool 一起出现

因为 JDK 默认异步执行器就是它。

ForkJoinPool 的核心理念是工作窃取,适合大量小任务拆分执行。CompletableFuture 默认选它,主要是为了提供一个通用、轻量、共享的异步执行环境。

但这不代表:

  • 你的所有业务异步都该交给它
  • 它适合阻塞 IO
  • 它适合所有链路场景

很多人学习 CompletableFuture 时踩坑,本质不是 CompletableFuture 难,而是把“编排模型”和“执行器模型”混在了一起。

一定要分开看:

  • CompletableFuture 解决的是任务依赖编排
  • Executor 解决的是任务在哪跑

这两个维度都要单独设计。

十、什么时候适合用,什么时候别硬上

适合用 CompletableFuture 的场景:

  • 多个独立调用并发聚合
  • 明确的异步依赖链
  • 需要结果组合、竞速、超时、降级
  • 不想引入更重的响应式框架

不太适合硬上 CompletableFuture 的场景:

  • 只有一步同步调用,没组合需求
  • 任务天然是长时间阻塞型,并且缺少独立线程池治理
  • 链路特别复杂,已经接近流式反压、持续事件处理场景

如果你的问题已经从“几个异步任务编排”升级到:

  • 持续数据流
  • 背压控制
  • 大规模事件处理

那可能该考虑 ReactorRxJava 或者更明确的消息驱动模型,而不是继续把所有东西都塞进 CompletableFuture 链里。

十一、最后给一套实战建议

如果你在项目里要落 CompletableFuture,我建议直接记下面这些原则:

  1. 先画依赖关系,再写代码。不要一上来就开始链式调用。
  2. 业务代码里尽量显式传线程池,不要默认依赖 commonPool
  3. thenApply 做轻量转换,thenCompose 处理异步嵌套,thenCombine 处理并行汇聚。
  4. 中间尽量不阻塞,末端再统一 join()
  5. 一开始就设计异常、超时、降级,不要等线上报错再补。
  6. 注意 ThreadLocal、MDC、TraceId 这类上下文透传。
  7. IO 池、CPU 池分离,不要混用。
  8. 如果链路已经复杂得难以理解,先停下来重构,而不是继续堆 API。

十二、总结

CompletableFuture 真正难的地方,从来不是记住二三十个方法名,而是理解它背后的执行模型:

  • Async 方法很多时候是“谁完成谁执行”
  • Async 方法才意味着重新调度
  • 它的核心价值是“依赖编排”,不是“把同步代码包一层异步外衣”
  • 它的核心风险通常在线程池、阻塞、异常传播、上下文透传

如果只把它当成一个“高级 Future”,你会经常踩坑。

如果把它理解成“基于结果完成事件驱动的依赖编排框架”,很多设计就都顺了。

最后留一句最实在的话:

CompletableFuture 很强,但别迷信。能直接传参数解决的,不要上 ThreadLocal;能简单串行完成的,不要硬改异步;该拆线程池的时候,别偷懒用默认池。

这才是它在工程里真正好用的方式。