RocketMQ 在商机流转系统中的解耦设计
1. 对应简历段落
对应简历中可以这样描述的一段项目经历:
在 B2B 商机管理系统中,负责客户、销售、活动、出单等核心链路的系统解耦与稳定性治理。基于 RocketMQ 建设异步事件流转机制,承接日均 3 万+ 线索、商机、活动、出单状态变更消息,通过 Topic/Tag 分层、事务消息、消费重试、幂等控制、死信补偿和监控告警,实现商机流转链路削峰填谷和最终一致性,降低系统间同步调用耦合,提升营销活动高峰期系统稳定性。
这段简历不是在强调“我用过 RocketMQ”,而是在强调三件事:
第一,业务链路足够复杂。B2B 商机从线索进入,到客户建档、销售分配、跟进记录、活动邀约、报价方案、出单状态回写,中间会穿过多个系统。如果每一步都同步调用,任何一个下游慢或者不可用,都会反向影响主流程。
第二,数据量不是互联网秒杀级,但对企业系统已经有明显峰谷。日均 3 万+ 线索商机活动出单流转,平均下来不高,但活动日、渠道批量导入、销售集中跟进、出单回写时会形成短时间尖峰。稳定性设计重点不是追求极限 TPS,而是让峰值不要把核心系统打穿。
第三,系统目标是“业务可持续流转”。在保险、企业客户或 B2B 销售场景中,商机状态、客户标签、销售待办、活动触达、出单结果都不允许长期不一致,但也不一定要求所有系统在同一个数据库事务里立即完成。因此 MQ 的价值在于把强同步链路拆成事件驱动链路,在可控时间内达成最终一致。
2. 业务背景
商机流转系统通常处在多个系统的交汇处。上游可能有官网表单、广告投放线索、合作渠道导入、客服录入、营销活动报名、老客户转介绍;中间有客户中心、商机中心、销售工作台、活动运营系统;下游还有报价、核保、出单、佣金、报表、消息触达等系统。
一个典型流程可能是:
- 渠道系统推送一条企业客户线索。
- 商机系统做去重、清洗、黑名单校验、客户归并。
- 客户中心创建或更新企业客户档案。
- 商机中心生成商机,并根据区域、行业、客户等级分配销售。
- 销售工作台生成待办任务。
- 活动系统根据客户标签加入营销活动名单。
- 销售跟进后提交报价或投保意向。
- 出单系统完成出单后回写商机阶段。
- 报表系统统计渠道转化率、销售跟进率、活动出单率。
如果把这些动作全部放在一个同步请求里,接口会越来越长,故障半径会越来越大。客户中心慢一点,线索入口就超时;活动系统不可用,商机创建也失败;报表写入慢,销售工作台也被拖住。这种设计在日常低峰时看起来还能跑,一到活动集中投放或者月底出单冲刺,就会暴露大量问题:接口超时、线程池耗尽、数据库连接占满、重复提交、状态错乱、运营看不到实时数据。
所以这个项目的核心不是“引入一个中间件”,而是重新划分业务边界:哪些动作必须在主流程立即完成,哪些动作可以异步完成,哪些状态需要事务保证,哪些状态接受最终一致,哪些失败必须人工介入。
3. 为什么需要 MQ
在商机流转系统里,MQ 主要解决四类问题。
第一是解耦。客户、销售、活动、出单系统都有自己的业务模型和发布节奏。商机系统不应该硬编码调用所有下游接口,也不应该因为新增一个报表订阅方就修改主流程。通过 RocketMQ,商机系统只发布“商机已创建”“商机已分配”“活动已报名”“出单已完成”等领域事件,下游系统按需订阅。新增消费者不会改变生产者主逻辑。
第二是削峰填谷。日均 3 万+ 流转量,按 24 小时平均只有每秒不到 1 条,但真实情况不是均匀分布。比如上午营销活动开始后 30 分钟内导入 8000 条线索,销售团队下午集中处理,晚上出单系统批量回写。如果同步调用,下游接口和数据库要承受瞬时峰值。MQ 可以把峰值先沉淀在 Broker,消费者按照自身能力平稳消费。
第三是失败隔离。同步链路里,下游失败会直接让上游失败;异步链路里,下游失败会转化为消息重试、延迟消费、死信补偿、告警处理。主流程可以先完成关键数据落库,避免用户或销售在入口处被阻塞。
第四是最终一致性。商机创建后,客户标签、销售待办、活动名单、报表明细可以在几秒到几十秒内陆续完成。只要有可追踪的消息状态、幂等消费和补偿机制,业务上是可接受的。面试时要特别说明,不是所有场景都适合 MQ。如果是支付扣款、保单核心状态强一致更新,不能简单丢给异步消息,而要结合本地事务、事务消息、对账和补偿。
4. Topic/Tag 设计
Topic/Tag 设计的关键是不要过粗,也不要过细。过粗会导致消费者过滤困难、权限边界模糊、监控指标不好看;过细会导致 Topic 数量膨胀、运维复杂度上升、业务演进困难。
在商机流转场景中,可以按照领域边界设计 Topic,按照事件类型设计 Tag:
| Topic | Tag | 说明 | 典型消费者 |
|---|---|---|---|
b2b_lead_event | LEAD_CREATED | 线索创建 | 商机中心、客户中心、报表 |
b2b_lead_event | LEAD_CLEANED | 线索清洗完成 | 商机中心、运营系统 |
b2b_opportunity_event | OPP_CREATED | 商机创建 | 销售工作台、活动系统、报表 |
b2b_opportunity_event | OPP_ASSIGNED | 商机分配销售 | 销售工作台、通知中心 |
b2b_opportunity_event | OPP_STAGE_CHANGED | 商机阶段变化 | 报表、客户标签、风控 |
b2b_activity_event | ACTIVITY_JOINED | 客户加入活动 | 活动系统、触达系统 |
b2b_policy_event | POLICY_ISSUED | 出单完成 | 商机中心、销售绩效、报表 |
b2b_policy_event | POLICY_CANCELLED | 撤单或退保 | 商机中心、佣金、报表 |
消息体不建议直接塞数据库整行,也不建议只发一个 ID 后让消费者大量回查。更稳妥的做法是传递“事件最小闭环信息”:事件 ID、业务主键、事件类型、发生时间、状态版本、关键业务字段、traceId、来源系统。消费者如果只需要轻量信息,可以直接消费;如果需要完整详情,再回查主系统。
一个简化消息体示例:
{
"eventId": "EVT202605010001",
"eventType": "OPP_ASSIGNED",
"opportunityId": "OPP202605010009",
"customerId": "CUST9988",
"salesId": "S10086",
"stage": "ASSIGNED",
"version": 3,
"occurredAt": "2026-05-01T10:15:30+08:00",
"source": "opportunity-service",
"traceId": "9f2b1c..."
}
这里的 eventId 用于消息幂等,opportunityId 是业务聚合根,version 用于乱序保护,traceId 用于链路追踪。对于需要顺序消费的商机状态变更,可以用 opportunityId 作为 sharding key,把同一个商机的状态消息路由到同一个队列,降低乱序概率。
5. 生产者消费者设计
生产者侧的设计原则是:先保证本地业务状态正确,再可靠地把事件发布出去。以商机创建为例,主流程应该完成线索去重、客户归并、商机落库等核心动作,然后发送 OPP_CREATED 事件。对下游非核心动作,比如生成销售待办、加入活动名单、刷新报表,不再同步调用。
生产者要关注几个点:
- 发送时设置业务 key,便于 RocketMQ 控制台按 key 查询。
- 消息体中带上 traceId,串起入口请求、数据库记录、消息和消费者日志。
- 对关键事件使用事务消息或本地消息表,避免“数据库提交成功但消息没发出去”。
- 对非关键通知类事件可以使用普通消息,但也要记录发送失败日志和补偿任务。
消费者侧的设计原则是:消费逻辑尽量短、可重入、可重试。消费者不应该在一次消费里做过多跨系统调用,也不应该依赖内存状态。每个消费者只处理自己边界内的事情。例如销售工作台消费者收到 OPP_ASSIGNED 后,只负责生成或更新待办;活动系统消费者收到 OPP_CREATED 后,只负责判断是否进入活动池;报表消费者只写宽表或明细表。
简化伪代码如下:
public ConsumeResult consume(OpportunityEvent event) {
String eventId = event.getEventId();
if (idempotentRepository.exists(eventId, "sales-task")) {
return ConsumeResult.SUCCESS;
}
OpportunityTask task = taskRepository.findByOpportunityId(event.getOpportunityId());
if (task == null) {
taskRepository.insert(buildTask(event));
} else if (event.getVersion() > task.getOpportunityVersion()) {
taskRepository.updateOwnerAndStage(task.getId(), event.getSalesId(), event.getStage(), event.getVersion());
}
idempotentRepository.markSuccess(eventId, "sales-task");
return ConsumeResult.SUCCESS;
}
这里有两个细节。第一,幂等记录的维度不是单纯 eventId,而是 eventId + consumerName,因为同一条事件会被多个系统消费。第二,更新时比较版本号,避免旧消息覆盖新状态。
消费者线程数需要结合业务能力设置。不是线程越多越好。如果报表消费者每条消息都写数据库,线程开太大只会把数据库打满。更稳妥的做法是按消费者组拆分,分别设置线程池、批量消费参数、限流阈值和告警阈值。对于日均 3 万+ 的规模,通常瓶颈不在 RocketMQ 本身,而在消费者下游数据库、第三方接口和业务锁冲突。
6. 幂等与重试
MQ 的消费语义通常要按“至少一次”理解。也就是说,只要系统异常、网络抖动、消费者超时、Broker 认为没有成功确认,就可能重复投递。因此业务消费者必须做幂等。
常见幂等方案有四种:
- 唯一索引:例如销售待办表对
opportunity_id + task_type建唯一索引,重复插入时转为更新或忽略。 - 幂等表:记录
event_id + consumer_group + status,消费前检查,消费成功后落表。 - 状态机校验:只有当前状态允许流转时才更新,例如商机不能从“已出单”退回“已分配”。
- 版本号控制:消息里带业务版本,数据库更新时加
where version < newVersion。
在商机系统里,推荐组合使用。比如销售待办适合唯一索引,商机阶段适合状态机和版本号,报表明细适合幂等表或唯一业务键。单靠 Redis 去重不够稳,因为 Redis 过期、缓存丢失或主从切换后仍可能重复消费;可以用 Redis 做短期加速,但最终幂等最好落到数据库约束或持久化记录。
重试设计要区分“可恢复失败”和“不可恢复失败”。数据库短暂超时、下游服务临时不可用、连接池抖动,适合重试;消息字段缺失、业务状态非法、客户不存在且无法补偿,盲目重试只会反复失败。
消费者处理失败时,不建议直接吞异常返回成功。这样消息看起来消费成功,但业务实际丢了。更合理的方式是:
- 对可恢复异常抛出,让 RocketMQ 进入重试。
- 对明确不可恢复异常记录失败原因,写入异常表,必要时返回成功避免无意义重试。
- 重试次数耗尽后进入死信队列,由补偿任务或人工处理。
- 对重点链路设置失败率和堆积告警,不能等业务方发现数据不对才排查。
重试间隔要考虑业务承受能力。商机待办晚几分钟生成可以接受,但出单回写长期失败会影响销售绩效和客户服务。可以对不同 Topic 设置不同告警等级,对出单类消息给更短的响应时间。
7. 事务与一致性
商机流转最容易被追问的是:如果商机创建数据库提交成功,但 MQ 消息发送失败怎么办?如果 MQ 消息发送成功,但本地事务回滚怎么办?
RocketMQ 事务消息解决的是本地事务和消息发送之间的一致性问题。基本流程是:
- 生产者先发送半消息到 Broker。
- Broker 保存半消息,但不投递给消费者。
- 生产者执行本地事务,例如创建商机、写客户归并关系、写事件表。
- 本地事务成功后提交消息,Broker 才投递。
- 本地事务失败后回滚消息,Broker 删除半消息。
- 如果生产者在提交或回滚前宕机,Broker 会回查生产者本地事务状态。
在项目里可以把本地事务状态落到事件表:
| 字段 | 说明 |
|---|---|
event_id | 全局事件 ID |
biz_id | 商机 ID 或出单 ID |
event_type | 事件类型 |
tx_status | INIT / COMMIT / ROLLBACK |
payload | 消息体 |
created_at | 创建时间 |
updated_at | 更新时间 |
事务回查时,根据 event_id 查询本地事件表和业务表。如果商机记录存在且事件状态为 COMMIT,就提交消息;如果明确失败或不存在,就回滚;如果数据库暂时不可用,可以返回未知,让 RocketMQ 后续继续回查。
但要注意,事务消息不是分布式事务万能药。它只能保证“本地事务成功后消息最终能被投递”,不能保证消费者处理一定成功。消费者失败仍然要靠重试、幂等、死信和补偿。对于“客户中心创建客户”和“商机中心创建商机”这种跨系统强依赖,也要谨慎拆分。如果商机必须绑定客户 ID,可以把客户建档作为主流程内的强依赖;而客户标签、销售待办、报表更新则异步化。
最终一致性的关键是定义清楚一致性的时间窗口和补偿机制。比如:
- 商机创建后 5 秒内销售待办可见,超过 1 分钟告警。
- 出单完成后 30 秒内商机阶段更新为“已出单”,超过 5 分钟进入补偿任务。
- 报表数据允许 T+0 延迟,但核心经营看板必须在 10 分钟内收敛。
- 每晚对商机、出单、销售待办、报表明细做对账,发现缺失事件自动补发。
面试时讲最终一致性,最好不要停留在“异步消息最终会成功”。真正可靠的最终一致性,是事务消息保证源头事件不丢,幂等消费保证重复不乱,重试和死信保证失败可见,对账补偿保证长期不漏。
8. 监控告警
RocketMQ 上线后,如果没有监控告警,只是把同步问题换成了异步黑盒。商机流转系统至少要监控四层指标。
第一层是 Broker 和 Topic 指标。包括消息写入 TPS、消费 TPS、消息堆积量、消费延迟、队列分布、Broker 磁盘水位、Broker CPU 和网络。对于日均 3 万+ 的规模,平时指标可能很平稳,真正要关注的是活动导入和出单批处理时是否出现堆积陡增。
第二层是消费者组指标。每个消费者组要看消费成功率、失败率、平均耗时、最大耗时、重试次数、死信数量。比如 sales-task-consumer 堆积会影响销售工作台,policy-sync-consumer 堆积会影响出单回写,告警优先级不能一样。
第三层是业务指标。包括当天线索量、商机创建量、分配成功量、待办生成量、活动入池量、出单回写量、各阶段转化量。如果 MQ 指标正常但业务指标断崖式下降,说明可能是生产者没有发消息,或者过滤规则、Tag、消费者逻辑出了问题。
第四层是链路追踪和日志。每条关键消息需要能通过 traceId 或 eventId 查到入口请求日志、生产者发送日志、Broker 查询结果、消费者处理日志、业务表落库记录。排查时可以按顺序问:业务事件是否生成?消息是否发送成功?Broker 是否存在?消费者是否收到?幂等是否误判?业务更新是否成功?
告警阈值可以按业务分层设置:
| 链路 | 告警示例 |
|---|---|
| 商机创建事件 | 5 分钟无消息写入但入口有线索量,P1 |
| 销售待办消费 | 堆积超过 1000 或延迟超过 1 分钟,P2 |
| 出单回写消费 | 任意死信或延迟超过 5 分钟,P1 |
| 报表消费 | 堆积超过 5000 或延迟超过 10 分钟,P3 |
| 消息发送失败 | 连续失败超过 10 次或失败率超过 1%,P1 |
监控的目标不是追求图表好看,而是让问题在业务投诉前暴露出来。
9. 常见坑
第一个坑是把 MQ 当成异步线程池。很多系统只是把原来的同步 HTTP 调用搬到消费者里,但消费者内部仍然串行调用多个下游接口,失败也没有幂等和补偿。这样只是把慢从入口挪到了后台,稳定性并没有真正提升。
第二个坑是消息粒度设计不清。发送“商机发生变化”这种过于笼统的事件,会导致消费者不知道到底该做什么,只能回查大量数据并猜测状态变化。更好的方式是明确事件语义,如 OPP_ASSIGNED、OPP_STAGE_CHANGED、POLICY_ISSUED。
第三个坑是忽视重复消费。只在测试环境跑一次成功就上线,生产环境遇到消费者重启、网络抖动、超时重投后,就会出现重复待办、重复短信、重复报表明细。所有消费者都要默认消息会重复。
第四个坑是忽视乱序。RocketMQ 只能在特定队列内保证顺序,普通消息跨队列不保证全局顺序。商机状态如果先消费“已出单”,后消费“已分配”,就可能状态回退。解决方式是按商机 ID 选择队列、数据库版本号控制、状态机拒绝非法回退。
第五个坑是死信没人看。死信队列不是垃圾桶,而是故障收敛点。必须有死信告警、查询页面、重放工具和人工处理流程。否则业务数据会长期卡在死信里,直到月末对账才发现。
第六个坑是事务消息理解过度。事务消息解决生产者本地事务和消息发送的一致性,不解决消费者业务一定成功,也不替代对账。它是链路可靠性的起点,不是终点。
第七个坑是消费者无节制扩容。堆积一出现就加线程、加实例,可能会把数据库、缓存或第三方接口压垮。扩容前要先看消费耗时分布、慢 SQL、连接池、锁等待和下游限流。
第八个坑是消息体缺少版本和来源。没有版本号,乱序时无法判断新旧;没有来源系统,排查时不知道是谁发的;没有 traceId,日志串不起来;没有事件时间,无法判断延迟到底发生在哪里。
10. 面试追问
面试官围绕这段经历,常见追问会集中在这些方向:
- 你们为什么选择 RocketMQ,而不是直接同步调用或使用定时任务?
- 日均 3 万+ 流转量其实不算特别大,为什么还需要 MQ?
- Topic 和 Tag 是怎么拆的?有没有遇到 Topic 过多或过少的问题?
- 商机状态变更如何保证顺序?
- 消费者重复消费怎么处理?幂等表怎么设计?
- 如果消费者处理一半失败了,怎么保证不会产生脏数据?
- 事务消息具体解决什么问题?本地事务回查怎么实现?
- 如果消息进入死信队列,你们怎么处理?
- MQ 堆积时你怎么排查?是先扩容消费者吗?
- 出单系统回写商机状态,如果消息延迟 10 分钟,业务上怎么兜底?
- 如何证明最终一致性真的达成了?
- 线上有没有出现过重复待办、状态乱序、消息丢失之类的问题?怎么复盘?
这些追问的核心不在 RocketMQ API,而在工程判断。面试官想确认你是否真的理解异步化后的风险,而不是只会说“削峰填谷、系统解耦”八个字。
11. 推荐回答
可以按下面的思路回答:
我们当时的 B2B 商机流转链路比较长,上游有线索导入和活动报名,中间有客户中心、商机中心、销售工作台,下游还有出单、报表和通知。最早如果用同步调用,商机创建接口会串多个系统,下游慢或者不可用会直接影响入口。后来我们把主流程收敛为客户归并、商机落库、核心状态更新,把销售待办、活动入池、报表统计、部分通知触达拆成 RocketMQ 事件。
然后讲设计:
Topic 按领域拆,比如线索事件、商机事件、活动事件、出单事件;Tag 按具体事件类型拆,比如商机创建、商机分配、阶段变化、出单完成。消息里会带 eventId、业务 ID、版本号、事件时间、source 和 traceId。消费者按系统拆消费者组,每个消费者只处理自己边界内的事情。
接着讲可靠性:
对关键事件,我们用事务消息或本地事件表保证本地事务和消息发送一致。消费者侧默认至少一次投递,所以必须做幂等。比如销售待办用唯一索引防重复,商机状态更新用状态机和版本号防乱序,报表明细用 eventId 加 consumerGroup 做幂等。失败时可恢复异常进入重试,不可恢复异常记录异常表;重试耗尽进入死信队列,并有告警和重放工具。
再讲削峰:
日均 3 万+ 不代表压力均匀,活动导入、销售集中跟进、出单批量回写会有明显峰值。MQ 的作用是把瞬时峰值沉淀下来,让消费者按数据库和下游系统能承受的速度消费。堆积时不会第一时间盲目加线程,而是先看消费耗时、失败率、慢 SQL、数据库连接池和下游接口响应。
最后讲一致性边界:
我们没有把所有事情都做成强一致。客户建档和商机落库是主流程强依赖;销售待办、报表、活动入池是最终一致。最终一致不是放任不管,而是有时间窗口、告警、死信和对账补偿。比如出单回写超过几分钟未完成会告警,每晚也会按商机和出单数据做对账,发现缺失再补发事件。
这个回答的重点是把“为什么拆”“怎么拆”“失败怎么办”“如何证明可靠”讲完整。
12. 延伸学习路线
第一阶段,先把 RocketMQ 基础概念吃透。重点理解 Producer、Consumer、Broker、NameServer、Topic、Queue、ConsumerGroup、Tag、普通消息、顺序消息、延迟消息、事务消息。不要只背定义,要结合商机流转思考每个概念解决什么问题。
第二阶段,深入消息可靠性。重点学习至少一次投递、重复消费、消费确认、重试队列、死信队列、消息堆积、消息过滤、消息轨迹。可以用一个“商机分配生成销售待办”的小 demo,故意让消费者抛异常、超时、重复启动,观察消息如何重试。
第三阶段,学习事务消息和本地消息表模式。对比两种方案的优缺点:事务消息链路更直接,但依赖 RocketMQ 事务回查机制;本地消息表更通用,可控性强,但需要定时扫描和状态维护。面试时能说出取舍,比只会说一个方案更有说服力。
第四阶段,补齐分布式一致性知识。理解强一致、最终一致、TCC、Saga、本地事务、补偿事务、对账机制。商机流转不是资金扣款场景,很多环节适合最终一致;但出单、佣金、保单核心状态要更谨慎。
第五阶段,做稳定性演练。模拟 Broker 短暂不可用、消费者宕机、数据库慢 SQL、消息重复投递、消息乱序、死信堆积。每一种故障都要能回答:业务影响是什么,监控怎么看,临时怎么止血,长期怎么修复。
第六阶段,把 MQ 和可观测性结合起来。学习日志 traceId、MDC、链路追踪、Prometheus 指标、Grafana 看板、告警分级。真正的生产经验不是“我接入了 MQ”,而是“我能在消息链路出问题时快速定位是生产端、Broker、消费端还是业务数据问题”。
对于这篇文章对应的简历经历,最终要形成一句能讲清楚的总结:
RocketMQ 在这个项目里不是为了追求高并发炫技,而是把 B2B 商机从线索、客户、销售、活动到出单的长链路拆成可观测、可重试、可补偿的领域事件流,在日均 3 万+ 流转和活动峰值下,通过削峰填谷、事务消息、幂等消费、死信处理和对账补偿,实现系统解耦与最终一致。