JUC核心工具详解:CountDownLatch、Semaphore、CompletableFuture、BlockingQueue

对应简历段落

在保险销售活动、商机流转、月底结算等高并发链路中,基于 JUC 工具完成批量任务编排、外部接口并发控制、异步结果聚合和任务队列治理。熟悉 CountDownLatchSemaphoreCompletableFutureBlockingQueue 等并发工具的底层语义和适用边界,能够结合 RocketMQ、线程池、限流熔断、幂等补偿设计稳定的异步处理方案。

这段简历表面是在说“会用 JUC”,但资深面试官通常会继续追问:为什么这里用 Semaphore 而不是调大线程池?CompletableFuture.allOf 异常怎么处理?BlockingQueue 容量怎么估?CountDownLatch 卡死怎么排查?这些问题的本质不是 API 记忆,而是你是否理解并发工具背后的资源约束、失败传播和业务一致性。

业务背景

保险销售系统里经常出现“一个请求触发多类子任务”的场景。比如代理人进入客户详情页,系统需要并行查询客户画像、历史保单、活动资格、产品推荐、商机跟进记录和权益信息。如果全部串行调用,接口延迟会被所有下游耗时累加;如果无脑并行,又可能把数据库连接池、保司接口或画像服务打满。

商机流转也类似。投保意向创建后,需要完成商机分配、代理人通知、客户标签刷新、运营埋点、待办生成、活动库存预占等动作。不同动作优先级不同,有的必须成功,有的可以延迟,有的失败后要重试,有的可以降级。并发工具的价值,是把这些动作从“代码顺序调用”升级成“有边界、有预算、有补偿的并发编排”。

月底结算的压力更典型。结算任务可能按机构、产品、代理人、月份拆成大量批次。每个批次要读取订单、计算佣金、写结算明细、生成对账记录。这里既需要并发提升吞吐,又要避免同一机构或同一代理人的数据被重复计算,还要在某个批次失败时支持重跑。JUC 工具用得好,可以把复杂批处理做得清晰;用得不好,会留下卡死、线程泄漏、任务丢失和结果不一致。

核心原理

CountDownLatch 是一次性倒计时门闩。它的核心语义是:一个或多个线程等待其他线程完成指定数量的事件。它适合“主线程等待多个子任务完成后再汇总”的场景,例如结算任务拆成 10 个分片,全部处理结束后更新总任务状态。它不能重置,计数归零后就永久打开。如果需要反复使用,应考虑 CyclicBarrier 或重新创建对象。

Semaphore 是信号量,本质是许可证池。线程执行某段逻辑前先获取许可证,执行完释放许可证。它经常用于限制外部依赖并发,例如保司接口最多允许本系统同时 20 个请求,或者某个导出任务最多同时跑 3 个。它和线程池不同:线程池限制的是任务执行线程数量,Semaphore 限制的是某个临界资源的并发访问量。一个线程池内可能访问多个外部资源,每个资源应该有自己的并发预算。

CompletableFuture 是异步编排工具。它既能提交异步任务,也能表达任务之间的依赖关系,比如并行查询后聚合、前一步成功后继续、失败后兜底、任一任务完成后返回。它比裸 Future 更适合业务编排,但也更容易埋坑:默认线程池是 ForkJoinPool.commonPool,异常会被包在 CompletionException 里,allOf 只告诉你整体完成,不直接返回每个子任务结果。

BlockingQueue 是阻塞队列,是生产者消费者模型的核心组件。线程池的任务队列、异步日志队列、批量写入缓冲都依赖它。常见实现包括 ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueueDelayQueueSynchronousQueue。面试时要能说清楚:有界还是无界、公平还是非公平、是否保持优先级、是否支持延迟、入队出队是否可能阻塞,以及阻塞后如何响应中断。

这四类工具的共同点是:它们都不直接解决业务问题,而是提供并发控制原语。真正的工程能力体现在你能把它们和业务状态机、超时控制、监控告警、失败补偿结合起来。

项目落地

在活动资格聚合接口中,可以用 CompletableFuture 并行查询多个下游。同步链路只保留用户最关心的信息:活动资格、产品可售状态、保费试算结果。客户画像、推荐标签、营销权益可以设置更短超时,失败后返回默认值,避免弱依赖拖慢主链路。

在保司接口调用中,可以用 Semaphore 控制并发。即使业务线程池有 64 个线程,也不代表保司接口能承载 64 个并发。对外部系统要按 SLA 设置独立许可证数,并配合 tryAcquire(timeout)。拿不到许可证时快速降级或进入稍后重试,而不是无限等待。

在月底结算中,可以用 CountDownLatch 等待分片任务结束,但要注意不能让主调度线程永久等待。每个分片任务必须在 finallycountDown,并且总等待要带超时。更稳妥的方式是任务状态落库,调度线程只负责派发,完成状态由分片任务异步更新;CountDownLatch 适合单进程内短生命周期编排,不适合跨机器、长时间批处理的最终一致性。

在商机流转消费者中,可以用 BlockingQueue 做本地缓冲,但必须有容量和拒绝策略。高峰期 MQ 消费速度大于本地处理速度时,如果本地队列无界,堆内存会被任务对象慢慢撑满。更合理的做法是有界队列配合 MQ 重试:队列满时返回稍后消费,让压力留在 MQ,而不是压在 JVM 堆里。

关键伪代码或流程

CompletableFuture 聚合活动页信息:

public ActivityView loadActivityView(String customerId, String activityId) {
    CompletableFuture<Qualification> qualificationFuture =
            CompletableFuture.supplyAsync(() -> qualificationService.check(customerId, activityId), bizExecutor);

    CompletableFuture<ProductStatus> productFuture =
            CompletableFuture.supplyAsync(() -> productService.queryStatus(activityId), bizExecutor);

    CompletableFuture<CustomerTags> tagsFuture =
            CompletableFuture.supplyAsync(() -> tagService.query(customerId), bizExecutor)
                    .completeOnTimeout(CustomerTags.empty(), 120, TimeUnit.MILLISECONDS)
                    .exceptionally(ex -> CustomerTags.empty());

    CompletableFuture.allOf(qualificationFuture, productFuture, tagsFuture)
            .orTimeout(800, TimeUnit.MILLISECONDS)
            .join();

    return ActivityView.builder()
            .qualification(qualificationFuture.join())
            .productStatus(productFuture.join())
            .tags(tagsFuture.join())
            .build();
}

Semaphore 控制保司接口并发:

public InsurerResult callInsurer(InsurerRequest request) {
    boolean acquired = insurerSemaphore.tryAcquire(200, TimeUnit.MILLISECONDS);
    if (!acquired) {
        throw new BizBusyException("insurer channel busy");
    }
    try {
        return insurerClient.submit(request);
    } finally {
        insurerSemaphore.release();
    }
}

CountDownLatch 处理分片任务:

CountDownLatch latch = new CountDownLatch(shards.size());
for (SettlementShard shard : shards) {
    settlementExecutor.execute(() -> {
        try {
            settlementService.calculate(shard);
        } catch (Exception ex) {
            shardRepository.markFailed(shard.id(), ex.getMessage());
        } finally {
            latch.countDown();
        }
    });
}

boolean completed = latch.await(30, TimeUnit.MINUTES);
if (!completed) {
    taskRepository.markTimeout(taskId);
}

BlockingQueue 与 MQ 消费反压:

public ConsumeConcurrentlyStatus consume(List<MessageExt> messages) {
    for (MessageExt message : messages) {
        boolean offered = localQueue.offer(parse(message));
        if (!offered) {
            metrics.recordQueueFull("opportunity-flow");
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

推荐流程是:入口限流先挡住超过承载能力的请求;同步接口只做核心状态落库;异步消息进入 MQ;消费者按队列容量和外部依赖许可证稳定处理;失败任务落库并支持补偿;监控持续观察队列长度、等待时间、拒绝次数、下游耗时和重试次数。

常见坑与排查

第一个坑是 CountDownLatch 没有在 finally 里倒计数。只要某个子任务抛异常且没有 countDown,主线程就可能一直等。排查时看线程栈,如果大量线程卡在 CountDownLatch.await,再结合业务日志找未完成分片。

第二个坑是 Semaphore 获取后没有释放,或者异常分支提前返回。许可证泄漏后系统表现为并发越来越低,最终所有请求都拿不到许可证。排查重点是许可证剩余数量、接口异常日志和 finally 释放逻辑。

第三个坑是 CompletableFuture 使用默认线程池。默认公共线程池被其他业务占满后,异步任务会互相影响。生产项目应显式传入业务线程池,并按链路拆分。

第四个坑是 allOf 后直接 join,但没处理某个子任务异常。一个子任务失败会导致整体异常,弱依赖本来可以降级,却把整个接口打失败。关键依赖和弱依赖要分开设计异常策略。

第五个坑是 BlockingQueue 使用无界队列。短期看没有拒绝,长期看延迟越来越高,最终 Full GC 或 OOM。排查时要看队列长度趋势、对象大小、生产消费速率差。

第六个坑是吞掉中断。takeputawaittryAcquire 都可能响应中断。捕获 InterruptedException 后至少要恢复中断标记:Thread.currentThread().interrupt(),否则线程池关闭、任务取消会变得不可靠。

面试追问

追问一:Semaphore 和线程池限流有什么区别?

追问二:CompletableFuture 异常怎么处理?exceptionallyhandlewhenComplete 有什么区别?

追问三:为什么不建议使用无界 LinkedBlockingQueue

追问四:CountDownLatch 卡住了你怎么排查?

追问五:CompletableFuture.allOf 如何拿到每个任务结果?

追问六:本地阻塞队列满了,为什么让 MQ 重试比阻塞消费线程更合理?

推荐回答

可以这样回答:我在项目中不会把 JUC 工具当成孤立 API 使用,而是先判断业务目标是并发聚合、资源限速、生产消费解耦还是任务完成等待。比如活动页多下游查询用 CompletableFuture,每个弱依赖有独立超时和降级;保司接口用 Semaphore 控制并发,因为限制的是外部通道资源,不只是本地线程;商机流转用有界 BlockingQueue 和 MQ 重试做反压,避免把峰值压到 JVM 堆里;结算分片短周期等待可以用 CountDownLatch,但长周期任务最终还是靠任务表状态来保证可恢复。

如果面试官继续问异常处理,我会强调三点。第一,所有异步任务必须显式线程池,避免公共线程池污染。第二,强依赖失败要让主流程失败,弱依赖失败要返回默认值或降级结果。第三,所有等待都要有超时,所有失败都要能被指标、日志和补偿任务追踪。

如果问排查,我会从线程池指标、队列长度、许可证数量、线程栈、MQ 重试次数、下游耗时这几个维度定位。并发问题很少只看一段代码就能判断,必须结合运行时指标看生产和消费是否失衡、资源是否泄漏、失败是否被吞掉。

延伸学习路线

第一阶段,掌握 JUC 基础工具的语义:CountDownLatchCyclicBarrierSemaphoreExchangerBlockingQueueFutureCompletableFuture。重点不是背 API,而是画出线程等待、唤醒、异常传播和超时路径。

第二阶段,阅读 ThreadPoolExecutor 和常见阻塞队列源码,理解 AQS、条件队列、锁竞争和内存可见性。能说清楚为什么队列容量会影响最大线程数,为什么阻塞方法要处理中断。

第三阶段,把并发工具放回业务场景。设计一个活动提交链路、一个商机 MQ 消费链路、一个月底结算批处理链路,分别说明用哪些并发工具、怎么限速、怎么补偿、怎么监控。

第四阶段,结合线上稳定性治理学习。把 JUC 与 Sentinel、RocketMQ、数据库连接池、RPC 超时、熔断降级、监控告警结合起来,形成完整的高并发治理方法论。