RocketMQ 在商机流转系统中的解耦设计

1. 对应简历段落

对应简历中可以这样描述的一段项目经历:

在 B2B 商机管理系统中,负责客户、销售、活动、出单等核心链路的系统解耦与稳定性治理。基于 RocketMQ 建设异步事件流转机制,承接日均 3 万+ 线索、商机、活动、出单状态变更消息,通过 Topic/Tag 分层、事务消息、消费重试、幂等控制、死信补偿和监控告警,实现商机流转链路削峰填谷和最终一致性,降低系统间同步调用耦合,提升营销活动高峰期系统稳定性。

这段简历不是在强调“我用过 RocketMQ”,而是在强调三件事:

第一,业务链路足够复杂。B2B 商机从线索进入,到客户建档、销售分配、跟进记录、活动邀约、报价方案、出单状态回写,中间会穿过多个系统。如果每一步都同步调用,任何一个下游慢或者不可用,都会反向影响主流程。

第二,数据量不是互联网秒杀级,但对企业系统已经有明显峰谷。日均 3 万+ 线索商机活动出单流转,平均下来不高,但活动日、渠道批量导入、销售集中跟进、出单回写时会形成短时间尖峰。稳定性设计重点不是追求极限 TPS,而是让峰值不要把核心系统打穿。

第三,系统目标是“业务可持续流转”。在保险、企业客户或 B2B 销售场景中,商机状态、客户标签、销售待办、活动触达、出单结果都不允许长期不一致,但也不一定要求所有系统在同一个数据库事务里立即完成。因此 MQ 的价值在于把强同步链路拆成事件驱动链路,在可控时间内达成最终一致。

2. 业务背景

商机流转系统通常处在多个系统的交汇处。上游可能有官网表单、广告投放线索、合作渠道导入、客服录入、营销活动报名、老客户转介绍;中间有客户中心、商机中心、销售工作台、活动运营系统;下游还有报价、核保、出单、佣金、报表、消息触达等系统。

一个典型流程可能是:

  1. 渠道系统推送一条企业客户线索。
  2. 商机系统做去重、清洗、黑名单校验、客户归并。
  3. 客户中心创建或更新企业客户档案。
  4. 商机中心生成商机,并根据区域、行业、客户等级分配销售。
  5. 销售工作台生成待办任务。
  6. 活动系统根据客户标签加入营销活动名单。
  7. 销售跟进后提交报价或投保意向。
  8. 出单系统完成出单后回写商机阶段。
  9. 报表系统统计渠道转化率、销售跟进率、活动出单率。

如果把这些动作全部放在一个同步请求里,接口会越来越长,故障半径会越来越大。客户中心慢一点,线索入口就超时;活动系统不可用,商机创建也失败;报表写入慢,销售工作台也被拖住。这种设计在日常低峰时看起来还能跑,一到活动集中投放或者月底出单冲刺,就会暴露大量问题:接口超时、线程池耗尽、数据库连接占满、重复提交、状态错乱、运营看不到实时数据。

所以这个项目的核心不是“引入一个中间件”,而是重新划分业务边界:哪些动作必须在主流程立即完成,哪些动作可以异步完成,哪些状态需要事务保证,哪些状态接受最终一致,哪些失败必须人工介入。

3. 为什么需要 MQ

在商机流转系统里,MQ 主要解决四类问题。

第一是解耦。客户、销售、活动、出单系统都有自己的业务模型和发布节奏。商机系统不应该硬编码调用所有下游接口,也不应该因为新增一个报表订阅方就修改主流程。通过 RocketMQ,商机系统只发布“商机已创建”“商机已分配”“活动已报名”“出单已完成”等领域事件,下游系统按需订阅。新增消费者不会改变生产者主逻辑。

第二是削峰填谷。日均 3 万+ 流转量,按 24 小时平均只有每秒不到 1 条,但真实情况不是均匀分布。比如上午营销活动开始后 30 分钟内导入 8000 条线索,销售团队下午集中处理,晚上出单系统批量回写。如果同步调用,下游接口和数据库要承受瞬时峰值。MQ 可以把峰值先沉淀在 Broker,消费者按照自身能力平稳消费。

第三是失败隔离。同步链路里,下游失败会直接让上游失败;异步链路里,下游失败会转化为消息重试、延迟消费、死信补偿、告警处理。主流程可以先完成关键数据落库,避免用户或销售在入口处被阻塞。

第四是最终一致性。商机创建后,客户标签、销售待办、活动名单、报表明细可以在几秒到几十秒内陆续完成。只要有可追踪的消息状态、幂等消费和补偿机制,业务上是可接受的。面试时要特别说明,不是所有场景都适合 MQ。如果是支付扣款、保单核心状态强一致更新,不能简单丢给异步消息,而要结合本地事务、事务消息、对账和补偿。

4. Topic/Tag 设计

Topic/Tag 设计的关键是不要过粗,也不要过细。过粗会导致消费者过滤困难、权限边界模糊、监控指标不好看;过细会导致 Topic 数量膨胀、运维复杂度上升、业务演进困难。

在商机流转场景中,可以按照领域边界设计 Topic,按照事件类型设计 Tag:

TopicTag说明典型消费者
b2b_lead_eventLEAD_CREATED线索创建商机中心、客户中心、报表
b2b_lead_eventLEAD_CLEANED线索清洗完成商机中心、运营系统
b2b_opportunity_eventOPP_CREATED商机创建销售工作台、活动系统、报表
b2b_opportunity_eventOPP_ASSIGNED商机分配销售销售工作台、通知中心
b2b_opportunity_eventOPP_STAGE_CHANGED商机阶段变化报表、客户标签、风控
b2b_activity_eventACTIVITY_JOINED客户加入活动活动系统、触达系统
b2b_policy_eventPOLICY_ISSUED出单完成商机中心、销售绩效、报表
b2b_policy_eventPOLICY_CANCELLED撤单或退保商机中心、佣金、报表

消息体不建议直接塞数据库整行,也不建议只发一个 ID 后让消费者大量回查。更稳妥的做法是传递“事件最小闭环信息”:事件 ID、业务主键、事件类型、发生时间、状态版本、关键业务字段、traceId、来源系统。消费者如果只需要轻量信息,可以直接消费;如果需要完整详情,再回查主系统。

一个简化消息体示例:

{
  "eventId": "EVT202605010001",
  "eventType": "OPP_ASSIGNED",
  "opportunityId": "OPP202605010009",
  "customerId": "CUST9988",
  "salesId": "S10086",
  "stage": "ASSIGNED",
  "version": 3,
  "occurredAt": "2026-05-01T10:15:30+08:00",
  "source": "opportunity-service",
  "traceId": "9f2b1c..."
}

这里的 eventId 用于消息幂等,opportunityId 是业务聚合根,version 用于乱序保护,traceId 用于链路追踪。对于需要顺序消费的商机状态变更,可以用 opportunityId 作为 sharding key,把同一个商机的状态消息路由到同一个队列,降低乱序概率。

5. 生产者消费者设计

生产者侧的设计原则是:先保证本地业务状态正确,再可靠地把事件发布出去。以商机创建为例,主流程应该完成线索去重、客户归并、商机落库等核心动作,然后发送 OPP_CREATED 事件。对下游非核心动作,比如生成销售待办、加入活动名单、刷新报表,不再同步调用。

生产者要关注几个点:

  1. 发送时设置业务 key,便于 RocketMQ 控制台按 key 查询。
  2. 消息体中带上 traceId,串起入口请求、数据库记录、消息和消费者日志。
  3. 对关键事件使用事务消息或本地消息表,避免“数据库提交成功但消息没发出去”。
  4. 对非关键通知类事件可以使用普通消息,但也要记录发送失败日志和补偿任务。

消费者侧的设计原则是:消费逻辑尽量短、可重入、可重试。消费者不应该在一次消费里做过多跨系统调用,也不应该依赖内存状态。每个消费者只处理自己边界内的事情。例如销售工作台消费者收到 OPP_ASSIGNED 后,只负责生成或更新待办;活动系统消费者收到 OPP_CREATED 后,只负责判断是否进入活动池;报表消费者只写宽表或明细表。

简化伪代码如下:

public ConsumeResult consume(OpportunityEvent event) {
    String eventId = event.getEventId();

    if (idempotentRepository.exists(eventId, "sales-task")) {
        return ConsumeResult.SUCCESS;
    }

    OpportunityTask task = taskRepository.findByOpportunityId(event.getOpportunityId());
    if (task == null) {
        taskRepository.insert(buildTask(event));
    } else if (event.getVersion() > task.getOpportunityVersion()) {
        taskRepository.updateOwnerAndStage(task.getId(), event.getSalesId(), event.getStage(), event.getVersion());
    }

    idempotentRepository.markSuccess(eventId, "sales-task");
    return ConsumeResult.SUCCESS;
}

这里有两个细节。第一,幂等记录的维度不是单纯 eventId,而是 eventId + consumerName,因为同一条事件会被多个系统消费。第二,更新时比较版本号,避免旧消息覆盖新状态。

消费者线程数需要结合业务能力设置。不是线程越多越好。如果报表消费者每条消息都写数据库,线程开太大只会把数据库打满。更稳妥的做法是按消费者组拆分,分别设置线程池、批量消费参数、限流阈值和告警阈值。对于日均 3 万+ 的规模,通常瓶颈不在 RocketMQ 本身,而在消费者下游数据库、第三方接口和业务锁冲突。

6. 幂等与重试

MQ 的消费语义通常要按“至少一次”理解。也就是说,只要系统异常、网络抖动、消费者超时、Broker 认为没有成功确认,就可能重复投递。因此业务消费者必须做幂等。

常见幂等方案有四种:

  1. 唯一索引:例如销售待办表对 opportunity_id + task_type 建唯一索引,重复插入时转为更新或忽略。
  2. 幂等表:记录 event_id + consumer_group + status,消费前检查,消费成功后落表。
  3. 状态机校验:只有当前状态允许流转时才更新,例如商机不能从“已出单”退回“已分配”。
  4. 版本号控制:消息里带业务版本,数据库更新时加 where version < newVersion

在商机系统里,推荐组合使用。比如销售待办适合唯一索引,商机阶段适合状态机和版本号,报表明细适合幂等表或唯一业务键。单靠 Redis 去重不够稳,因为 Redis 过期、缓存丢失或主从切换后仍可能重复消费;可以用 Redis 做短期加速,但最终幂等最好落到数据库约束或持久化记录。

重试设计要区分“可恢复失败”和“不可恢复失败”。数据库短暂超时、下游服务临时不可用、连接池抖动,适合重试;消息字段缺失、业务状态非法、客户不存在且无法补偿,盲目重试只会反复失败。

消费者处理失败时,不建议直接吞异常返回成功。这样消息看起来消费成功,但业务实际丢了。更合理的方式是:

  1. 对可恢复异常抛出,让 RocketMQ 进入重试。
  2. 对明确不可恢复异常记录失败原因,写入异常表,必要时返回成功避免无意义重试。
  3. 重试次数耗尽后进入死信队列,由补偿任务或人工处理。
  4. 对重点链路设置失败率和堆积告警,不能等业务方发现数据不对才排查。

重试间隔要考虑业务承受能力。商机待办晚几分钟生成可以接受,但出单回写长期失败会影响销售绩效和客户服务。可以对不同 Topic 设置不同告警等级,对出单类消息给更短的响应时间。

7. 事务与一致性

商机流转最容易被追问的是:如果商机创建数据库提交成功,但 MQ 消息发送失败怎么办?如果 MQ 消息发送成功,但本地事务回滚怎么办?

RocketMQ 事务消息解决的是本地事务和消息发送之间的一致性问题。基本流程是:

  1. 生产者先发送半消息到 Broker。
  2. Broker 保存半消息,但不投递给消费者。
  3. 生产者执行本地事务,例如创建商机、写客户归并关系、写事件表。
  4. 本地事务成功后提交消息,Broker 才投递。
  5. 本地事务失败后回滚消息,Broker 删除半消息。
  6. 如果生产者在提交或回滚前宕机,Broker 会回查生产者本地事务状态。

在项目里可以把本地事务状态落到事件表:

字段说明
event_id全局事件 ID
biz_id商机 ID 或出单 ID
event_type事件类型
tx_statusINIT / COMMIT / ROLLBACK
payload消息体
created_at创建时间
updated_at更新时间

事务回查时,根据 event_id 查询本地事件表和业务表。如果商机记录存在且事件状态为 COMMIT,就提交消息;如果明确失败或不存在,就回滚;如果数据库暂时不可用,可以返回未知,让 RocketMQ 后续继续回查。

但要注意,事务消息不是分布式事务万能药。它只能保证“本地事务成功后消息最终能被投递”,不能保证消费者处理一定成功。消费者失败仍然要靠重试、幂等、死信和补偿。对于“客户中心创建客户”和“商机中心创建商机”这种跨系统强依赖,也要谨慎拆分。如果商机必须绑定客户 ID,可以把客户建档作为主流程内的强依赖;而客户标签、销售待办、报表更新则异步化。

最终一致性的关键是定义清楚一致性的时间窗口和补偿机制。比如:

  1. 商机创建后 5 秒内销售待办可见,超过 1 分钟告警。
  2. 出单完成后 30 秒内商机阶段更新为“已出单”,超过 5 分钟进入补偿任务。
  3. 报表数据允许 T+0 延迟,但核心经营看板必须在 10 分钟内收敛。
  4. 每晚对商机、出单、销售待办、报表明细做对账,发现缺失事件自动补发。

面试时讲最终一致性,最好不要停留在“异步消息最终会成功”。真正可靠的最终一致性,是事务消息保证源头事件不丢,幂等消费保证重复不乱,重试和死信保证失败可见,对账补偿保证长期不漏。

8. 监控告警

RocketMQ 上线后,如果没有监控告警,只是把同步问题换成了异步黑盒。商机流转系统至少要监控四层指标。

第一层是 Broker 和 Topic 指标。包括消息写入 TPS、消费 TPS、消息堆积量、消费延迟、队列分布、Broker 磁盘水位、Broker CPU 和网络。对于日均 3 万+ 的规模,平时指标可能很平稳,真正要关注的是活动导入和出单批处理时是否出现堆积陡增。

第二层是消费者组指标。每个消费者组要看消费成功率、失败率、平均耗时、最大耗时、重试次数、死信数量。比如 sales-task-consumer 堆积会影响销售工作台,policy-sync-consumer 堆积会影响出单回写,告警优先级不能一样。

第三层是业务指标。包括当天线索量、商机创建量、分配成功量、待办生成量、活动入池量、出单回写量、各阶段转化量。如果 MQ 指标正常但业务指标断崖式下降,说明可能是生产者没有发消息,或者过滤规则、Tag、消费者逻辑出了问题。

第四层是链路追踪和日志。每条关键消息需要能通过 traceIdeventId 查到入口请求日志、生产者发送日志、Broker 查询结果、消费者处理日志、业务表落库记录。排查时可以按顺序问:业务事件是否生成?消息是否发送成功?Broker 是否存在?消费者是否收到?幂等是否误判?业务更新是否成功?

告警阈值可以按业务分层设置:

链路告警示例
商机创建事件5 分钟无消息写入但入口有线索量,P1
销售待办消费堆积超过 1000 或延迟超过 1 分钟,P2
出单回写消费任意死信或延迟超过 5 分钟,P1
报表消费堆积超过 5000 或延迟超过 10 分钟,P3
消息发送失败连续失败超过 10 次或失败率超过 1%,P1

监控的目标不是追求图表好看,而是让问题在业务投诉前暴露出来。

9. 常见坑

第一个坑是把 MQ 当成异步线程池。很多系统只是把原来的同步 HTTP 调用搬到消费者里,但消费者内部仍然串行调用多个下游接口,失败也没有幂等和补偿。这样只是把慢从入口挪到了后台,稳定性并没有真正提升。

第二个坑是消息粒度设计不清。发送“商机发生变化”这种过于笼统的事件,会导致消费者不知道到底该做什么,只能回查大量数据并猜测状态变化。更好的方式是明确事件语义,如 OPP_ASSIGNEDOPP_STAGE_CHANGEDPOLICY_ISSUED

第三个坑是忽视重复消费。只在测试环境跑一次成功就上线,生产环境遇到消费者重启、网络抖动、超时重投后,就会出现重复待办、重复短信、重复报表明细。所有消费者都要默认消息会重复。

第四个坑是忽视乱序。RocketMQ 只能在特定队列内保证顺序,普通消息跨队列不保证全局顺序。商机状态如果先消费“已出单”,后消费“已分配”,就可能状态回退。解决方式是按商机 ID 选择队列、数据库版本号控制、状态机拒绝非法回退。

第五个坑是死信没人看。死信队列不是垃圾桶,而是故障收敛点。必须有死信告警、查询页面、重放工具和人工处理流程。否则业务数据会长期卡在死信里,直到月末对账才发现。

第六个坑是事务消息理解过度。事务消息解决生产者本地事务和消息发送的一致性,不解决消费者业务一定成功,也不替代对账。它是链路可靠性的起点,不是终点。

第七个坑是消费者无节制扩容。堆积一出现就加线程、加实例,可能会把数据库、缓存或第三方接口压垮。扩容前要先看消费耗时分布、慢 SQL、连接池、锁等待和下游限流。

第八个坑是消息体缺少版本和来源。没有版本号,乱序时无法判断新旧;没有来源系统,排查时不知道是谁发的;没有 traceId,日志串不起来;没有事件时间,无法判断延迟到底发生在哪里。

10. 面试追问

面试官围绕这段经历,常见追问会集中在这些方向:

  1. 你们为什么选择 RocketMQ,而不是直接同步调用或使用定时任务?
  2. 日均 3 万+ 流转量其实不算特别大,为什么还需要 MQ?
  3. Topic 和 Tag 是怎么拆的?有没有遇到 Topic 过多或过少的问题?
  4. 商机状态变更如何保证顺序?
  5. 消费者重复消费怎么处理?幂等表怎么设计?
  6. 如果消费者处理一半失败了,怎么保证不会产生脏数据?
  7. 事务消息具体解决什么问题?本地事务回查怎么实现?
  8. 如果消息进入死信队列,你们怎么处理?
  9. MQ 堆积时你怎么排查?是先扩容消费者吗?
  10. 出单系统回写商机状态,如果消息延迟 10 分钟,业务上怎么兜底?
  11. 如何证明最终一致性真的达成了?
  12. 线上有没有出现过重复待办、状态乱序、消息丢失之类的问题?怎么复盘?

这些追问的核心不在 RocketMQ API,而在工程判断。面试官想确认你是否真的理解异步化后的风险,而不是只会说“削峰填谷、系统解耦”八个字。

11. 推荐回答

可以按下面的思路回答:

我们当时的 B2B 商机流转链路比较长,上游有线索导入和活动报名,中间有客户中心、商机中心、销售工作台,下游还有出单、报表和通知。最早如果用同步调用,商机创建接口会串多个系统,下游慢或者不可用会直接影响入口。后来我们把主流程收敛为客户归并、商机落库、核心状态更新,把销售待办、活动入池、报表统计、部分通知触达拆成 RocketMQ 事件。

然后讲设计:

Topic 按领域拆,比如线索事件、商机事件、活动事件、出单事件;Tag 按具体事件类型拆,比如商机创建、商机分配、阶段变化、出单完成。消息里会带 eventId、业务 ID、版本号、事件时间、source 和 traceId。消费者按系统拆消费者组,每个消费者只处理自己边界内的事情。

接着讲可靠性:

对关键事件,我们用事务消息或本地事件表保证本地事务和消息发送一致。消费者侧默认至少一次投递,所以必须做幂等。比如销售待办用唯一索引防重复,商机状态更新用状态机和版本号防乱序,报表明细用 eventId 加 consumerGroup 做幂等。失败时可恢复异常进入重试,不可恢复异常记录异常表;重试耗尽进入死信队列,并有告警和重放工具。

再讲削峰:

日均 3 万+ 不代表压力均匀,活动导入、销售集中跟进、出单批量回写会有明显峰值。MQ 的作用是把瞬时峰值沉淀下来,让消费者按数据库和下游系统能承受的速度消费。堆积时不会第一时间盲目加线程,而是先看消费耗时、失败率、慢 SQL、数据库连接池和下游接口响应。

最后讲一致性边界:

我们没有把所有事情都做成强一致。客户建档和商机落库是主流程强依赖;销售待办、报表、活动入池是最终一致。最终一致不是放任不管,而是有时间窗口、告警、死信和对账补偿。比如出单回写超过几分钟未完成会告警,每晚也会按商机和出单数据做对账,发现缺失再补发事件。

这个回答的重点是把“为什么拆”“怎么拆”“失败怎么办”“如何证明可靠”讲完整。

12. 延伸学习路线

第一阶段,先把 RocketMQ 基础概念吃透。重点理解 Producer、Consumer、Broker、NameServer、Topic、Queue、ConsumerGroup、Tag、普通消息、顺序消息、延迟消息、事务消息。不要只背定义,要结合商机流转思考每个概念解决什么问题。

第二阶段,深入消息可靠性。重点学习至少一次投递、重复消费、消费确认、重试队列、死信队列、消息堆积、消息过滤、消息轨迹。可以用一个“商机分配生成销售待办”的小 demo,故意让消费者抛异常、超时、重复启动,观察消息如何重试。

第三阶段,学习事务消息和本地消息表模式。对比两种方案的优缺点:事务消息链路更直接,但依赖 RocketMQ 事务回查机制;本地消息表更通用,可控性强,但需要定时扫描和状态维护。面试时能说出取舍,比只会说一个方案更有说服力。

第四阶段,补齐分布式一致性知识。理解强一致、最终一致、TCC、Saga、本地事务、补偿事务、对账机制。商机流转不是资金扣款场景,很多环节适合最终一致;但出单、佣金、保单核心状态要更谨慎。

第五阶段,做稳定性演练。模拟 Broker 短暂不可用、消费者宕机、数据库慢 SQL、消息重复投递、消息乱序、死信堆积。每一种故障都要能回答:业务影响是什么,监控怎么看,临时怎么止血,长期怎么修复。

第六阶段,把 MQ 和可观测性结合起来。学习日志 traceId、MDC、链路追踪、Prometheus 指标、Grafana 看板、告警分级。真正的生产经验不是“我接入了 MQ”,而是“我能在消息链路出问题时快速定位是生产端、Broker、消费端还是业务数据问题”。

对于这篇文章对应的简历经历,最终要形成一句能讲清楚的总结:

RocketMQ 在这个项目里不是为了追求高并发炫技,而是把 B2B 商机从线索、客户、销售、活动到出单的长链路拆成可观测、可重试、可补偿的领域事件流,在日均 3 万+ 流转和活动峰值下,通过削峰填谷、事务消息、幂等消费、死信处理和对账补偿,实现系统解耦与最终一致。