MQ削峰填谷在商机流转系统中的落地

对应简历段落

在保险销售活动系统中,针对活动高峰期投保意向集中创建、代理人批量跟进、保司状态回写和商机流转压力突增的问题,引入 RocketMQ 完成核心链路异步解耦与削峰填谷。负责消息模型设计、消费端线程池隔离、幂等处理、重试补偿、积压监控和故障恢复,保障商机在高并发下稳定流转。

这段简历的重点不是“用了 MQ”,而是你能否讲清楚:为什么这里需要 MQ?削峰削的是哪个峰?填谷填到哪里?消息失败、重复、乱序、积压时怎么保证业务正确?面试官通常会沿着“可靠性、一致性、可观测性、容量规划”连续追问。

业务背景

商机流转是保险销售链路中的核心后置流程。用户在活动页提交投保意向后,系统要创建商机、分配代理人、生成跟进待办、推送企微或短信、刷新客户标签、同步 CRM、触发后续营销策略。活动高峰时,入口请求具有明显脉冲:一分钟内可能涌入大量投保意向,但代理人通知、标签刷新、CRM 同步并不需要全部在用户请求内完成。

如果同步执行这些动作,用户提交接口会被下游耗时拖慢。短信通道慢、CRM 接口抖动、标签服务超时都会影响用户提交成功率。更严重的是,同步链路中每多一个外部依赖,就多一个高峰期雪崩点。入口线程被慢调用占满后,即使数据库和核心业务还健康,用户也会看到大量超时。

MQ 的价值在于把瞬时高峰变成可控消费。用户请求内只完成必要校验和主单落库,然后发送商机流转消息。后续动作由消费者按自身能力慢慢处理。这样入口系统承接的是写主单和投递消息的压力,商机处理系统承接的是可调速的消费压力。

核心原理

MQ 削峰填谷的核心是生产速度和消费速度解耦。没有 MQ 时,生产者和消费者在同一个调用栈里,消费者慢会直接拖慢生产者。有 MQ 后,生产者把事件写入队列即可返回,消费者按照配置的线程数、批量大小和下游承载能力消费。

削峰不是消灭流量,而是把峰值暂存在消息系统里。填谷是指在业务低峰时继续消费积压,把高峰期没处理完的任务逐步补齐。因此,设计时不能只看入口 QPS,还要看消息积压容量、可接受延迟、消费者吞吐和失败重试成本。

消息系统带来的新问题是可靠性和一致性。投保意向主单落库成功但消息发送失败,商机就不会流转;消息发送成功但消费者处理失败,需要重试;消费者处理成功但提交 offset 失败,消息可能重复;同一客户多条消息并发消费,可能导致状态乱序。因此,业务必须天然接受“至少一次投递”,通过幂等和状态机保证重复消息不会产生错误结果。

常见可靠性方案包括事务消息、本地消息表、消费幂等表、业务唯一索引、失败重试队列和人工补偿入口。对保险销售系统来说,消息可以延迟,但关键商机不能丢;通知可以重复风险低,但佣金、权益、分配结果不能重复。

项目落地

商机流转消息设计时,应避免把所有业务动作塞进一个大消息。更合理的是按领域事件拆分:OpportunityCreatedEvent 表示投保意向创建;OpportunityAssignedEvent 表示商机已分配;FollowTaskCreatedEvent 表示跟进任务已生成;InsurerStatusChangedEvent 表示保司状态变化。事件之间通过业务状态推进,而不是消费者互相调用形成隐式链路。

生产端要保证主单和消息的一致性。可以使用 RocketMQ 事务消息:先发送半消息,再执行本地事务创建投保意向,最后提交或回滚消息。也可以使用本地消息表:在同一个数据库事务中写主单和待发送消息,由后台任务可靠投递。两种方式都可以,选择取决于团队基础设施和运维经验。

消费端要按业务优先级拆分消费组和线程池。商机分配属于核心后置链路,应使用独立消费组和线程池;短信通知、运营埋点可以单独消费,允许更长延迟;CRM 同步依赖外部系统,必须设置并发上限和熔断策略;月底结算消息不能和在线商机消息共用线程池。

幂等设计是落地重点。消费者处理 OpportunityCreatedEvent 时,应以 eventIdopportunityId + actionType 作为幂等键。先尝试插入消费记录,插入成功才执行业务;如果唯一键冲突,说明已经处理过或正在处理,需要根据状态返回成功或稍后重试。业务表本身也要有条件更新,例如只有 WAIT_ASSIGN 状态才能分配代理人。

重试策略要区分临时失败和永久失败。数据库死锁、RPC 超时、线程池满属于临时失败,可以让 MQ 延迟重试;参数缺失、业务状态非法、客户不存在属于永久失败,应记录失败原因并进入人工补偿或死信处理。不能所有异常都无限重试,否则会制造重试风暴。

关键伪代码或流程

生产端本地消息表:

@Transactional
public SubmitResult submitOpportunity(SubmitCommand command) {
    Opportunity opportunity = opportunityRepository.create(command);

    DomainEvent event = DomainEvent.builder()
            .eventId(IdGenerator.next())
            .bizKey(opportunity.id())
            .eventType("OPPORTUNITY_CREATED")
            .payload(toJson(opportunity))
            .status("INIT")
            .build();
    localMessageRepository.insert(event);

    return SubmitResult.success(opportunity.id());
}

可靠投递任务:

public void publishLocalMessages() {
    List<DomainEvent> events = localMessageRepository.scanInit(200);
    for (DomainEvent event : events) {
        try {
            rocketTemplate.send(event.topic(), event.key(), event.payload());
            localMessageRepository.markSent(event.eventId());
        } catch (Exception ex) {
            localMessageRepository.increaseRetry(event.eventId(), ex.getMessage());
        }
    }
}

消费端幂等处理:

public ConsumeConcurrentlyStatus consume(MessageExt message) {
    OpportunityCreatedEvent event = parse(message);
    try {
        boolean first = consumeLogRepository.tryStart(event.eventId(), "ASSIGN_OPPORTUNITY");
        if (!first) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        assignService.assignIfWaiting(event.opportunityId());
        consumeLogRepository.markSuccess(event.eventId());
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (TemporaryException ex) {
        consumeLogRepository.markRetry(event.eventId(), ex.getMessage());
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    } catch (Exception ex) {
        consumeLogRepository.markFailed(event.eventId(), ex.getMessage());
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

状态条件更新:

update opportunity
   set status = 'ASSIGNED',
       agent_id = ?,
       assign_time = now()
 where id = ?
   and status = 'WAIT_ASSIGN';

整体流程可以概括为:入口限流;创建投保意向和本地消息;后台可靠投递;MQ 暂存高峰;消费者按优先级和容量消费;业务幂等推进状态;临时失败延迟重试;永久失败进入补偿;监控积压和消费延迟。

常见坑与排查

第一个坑是消息发送和数据库事务不一致。主单落库后发送消息失败,或者消息发出后事务回滚,都会导致业务缺口。排查时要对比主单表、本地消息表、MQ 投递日志和消费者日志。

第二个坑是消费端没有幂等。MQ 至少一次投递意味着重复消息是正常现象,不是异常现象。没有幂等时,重复消息会导致重复分配、重复通知、重复生成待办。

第三个坑是把 MQ 当数据库。消息体过大、保存太多快照、依赖消息长期保存做查询,都会让系统难以维护。消息应该承载事件事实和必要上下文,详细数据可按业务 ID 回查。

第四个坑是消费线程数盲目调大。消费快不代表系统稳,可能把数据库连接池、CRM 接口或保司接口打爆。消费者吞吐要受下游最弱资源约束。

第五个坑是重试风暴。下游服务大面积故障时,消费者持续失败又快速重试,会放大压力。应配置延迟级别、最大重试次数、熔断降级和死信处理。

第六个坑是积压不可见。只看接口成功率,不看 topic 积压、消费延迟、重试次数、死信数量,就无法判断削峰是否正在变成风险。

面试追问

追问一:MQ 如何保证消息不丢?

追问二:如何处理重复消费?

追问三:如何保证消息顺序?

追问四:消费积压了怎么办?

追问五:事务消息和本地消息表怎么选?

追问六:哪些业务适合同步,哪些适合异步?

推荐回答

我会先说明 MQ 在商机流转里解决的是入口峰值和下游处理能力不匹配的问题。用户请求内只完成核心单据落库和消息可靠投递,商机分配、通知、CRM 同步等动作异步消费。这样即使活动瞬时流量很高,也不会让用户请求等待所有后置动作完成。

可靠性上,我会用事务消息或本地消息表保证主单和事件一致;消费端按事件 ID 和业务动作做幂等;业务状态推进使用条件更新;临时异常交给 MQ 延迟重试,永久异常进入失败表和补偿流程。顺序问题上,如果同一商机强依赖顺序,可以按 opportunityId 做消息 key,保证同一队列内顺序消费;但更推荐通过状态机容忍乱序,例如只有当前状态满足条件才推进。

如果出现积压,我不会第一反应就加消费者线程。先看积压来自生产突增、消费者异常、下游变慢还是线程池拒绝。再决定是临时扩容消费者、降低非核心消费、增加延迟重试、隔离慢下游,还是暂停低优先级 topic。削峰填谷的本质是稳定处理,而不是把队列尽快清空。

延伸学习路线

第一阶段,掌握 MQ 基础概念:topic、tag、consumer group、offset、重试、死信、顺序消息、事务消息。

第二阶段,学习可靠消息方案:本地消息表、事务消息、Outbox Pattern、幂等消费、消息去重表。

第三阶段,学习消费端治理:线程池隔离、批量消费、延迟重试、死信补偿、积压告警、消费限速。

第四阶段,结合业务建模学习领域事件。不要把消息当 RPC 的替代品,而要表达业务已经发生的事实。

第五阶段,做一次容量推演:入口峰值 QPS、消息大小、broker 存储、消费者 TPS、最大可接受延迟、重试比例和补偿能力。能讲出这些数字,简历里的 MQ 才真正可信。