RocketMQ 事务消息与最终一致性
1. 对应简历段落
对应简历中可以这样描述:
在 B2B 商机与出单流转链路中,负责基于 RocketMQ 事务消息建设最终一致性方案。针对商机创建、销售待办生成、活动入池、出单回写等跨系统状态同步场景,设计本地事务表、事务回查、消费幂等、死信补偿和对账任务,解决数据库提交成功但消息丢失、消息重复消费、消费者异常导致状态不一致等问题,保障核心业务链路在高峰期可恢复、可追踪、可补偿。
这段经历的重点不是“会用事务消息 API”,而是能解释为什么需要它、它解决哪一段一致性问题、它不能解决什么问题,以及项目如何靠监控和补偿把最终一致性真正落地。
2. 业务背景
在商机系统里,一个业务动作经常会产生多个后续影响。例如销售创建一条商机后,商机中心要落库,客户中心要更新客户标签,销售工作台要生成待办,活动系统要判断是否纳入活动名单,报表系统要统计渠道转化。同步调用看起来直接,但任何一个下游失败都会拖垮主流程。
如果完全异步,又会遇到经典问题:商机已经写入数据库,但 MQ 发送失败,下游永远不知道这条商机;或者消息已经发送出去,但本地事务回滚,消费者拿到一条不存在的商机。这个问题不是普通重试能彻底解决的,因为进程可能在数据库提交和消息发送之间宕机,也可能在发送成功后还没记录状态就崩掉。
所以项目里会把“核心状态落库”和“业务事件发布”绑定起来。事务消息关注的是源头事件不丢、不误发,消费者侧再通过幂等、重试、死信和对账达成最终一致。
3. 核心原理
RocketMQ 事务消息采用两阶段思路。生产者先发送半消息,Broker 保存但暂不投递;生产者执行本地事务;本地事务成功后提交消息,Broker 才投递给消费者;本地事务失败则回滚消息。若生产者在提交或回滚前异常,Broker 会发起事务回查,由生产者根据本地事务状态决定提交、回滚或继续未知。
这个机制解决的是“本地事务和消息发送的一致性”。它不负责消费者处理成功,也不负责跨系统强一致事务。消费者仍然可能失败、超时、重复执行或乱序执行,因此最终一致性方案至少由四部分组成:
- 源头可靠发布:事务消息或本地消息表保证业务事件不丢。
- 消费幂等:同一消息重复投递不会重复产生业务副作用。
- 失败可见:重试、死信、异常表和告警让失败不被吞掉。
- 对账补偿:定时扫描业务状态和消息状态,修复长期不一致。
面试时要强调边界。事务消息不是分布式事务的银弹,它不会让多个系统的数据库同时提交或回滚。它更适合“本系统状态已经确定,通知其他系统最终跟上”的场景。
4. 项目落地
以“出单完成后回写商机阶段”为例。出单系统收到核心出单结果后,需要更新本地保单状态,并通知商机中心把商机阶段改成已出单,同时报表系统统计成交金额。
生产者侧流程可以这样设计:
- 生成全局
eventId,消息体包含policyId、opportunityId、customerId、amount、version、traceId。 - 发送 RocketMQ 半消息。
- 在本地事务中更新保单状态为
ISSUED,插入事件表,事件状态记为COMMIT。 - 本地事务成功后提交半消息。
- 如果提交阶段异常,Broker 后续通过回查确认事件表状态。
事件表不是摆设,它是回查、审计和补偿的依据。字段通常包括 event_id、biz_id、event_type、payload、tx_status、send_status、retry_count、created_at、updated_at。
消费者侧分为商机消费者和报表消费者。商机消费者收到 POLICY_ISSUED 后,先检查 eventId + consumer_group 是否已成功处理,再根据商机当前阶段和版本号判断是否允许更新。报表消费者则使用 policyId + metric_type 做唯一键,避免重复统计。
一致性目标也要写清楚:出单完成后 30 秒内商机阶段应更新,超过 5 分钟告警;报表允许分钟级延迟;每天凌晨跑一次出单与商机状态对账,发现出单已完成但商机未更新,则重新投递事件或生成补偿工单。
5. 关键配置或伪代码
简化后的事务消息发送伪代码:
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"policy_tx_group",
"b2b_policy_event:POLICY_ISSUED",
MessageBuilder.withPayload(event)
.setHeader("KEYS", event.getEventId())
.setHeader("TRACE_ID", traceId)
.build(),
event
);
事务监听器示例:
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
PolicyIssuedEvent event = (PolicyIssuedEvent) arg;
try {
transactionTemplate.executeWithoutResult(status -> {
policyRepository.markIssued(event.getPolicyId(), event.getAmount());
eventRepository.insert(event.getEventId(), event.getPolicyId(),
"POLICY_ISSUED", "COMMIT", toJson(event));
});
return LocalTransactionState.COMMIT_MESSAGE;
} catch (BizException e) {
eventRepository.markRollback(event.getEventId(), e.getMessage());
return LocalTransactionState.ROLLBACK_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.UNKNOW;
}
}
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String eventId = msg.getKeys();
EventRecord record = eventRepository.findByEventId(eventId);
if (record == null) {
return LocalTransactionState.UNKNOW;
}
if ("COMMIT".equals(record.getTxStatus())) {
return LocalTransactionState.COMMIT_MESSAGE;
}
if ("ROLLBACK".equals(record.getTxStatus())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
消费者幂等伪代码:
public ConsumeConcurrentlyStatus consume(PolicyIssuedEvent event) {
String consumer = "opportunity-stage-consumer";
if (consumeLogRepository.success(event.getEventId(), consumer)) {
return CONSUME_SUCCESS;
}
transactionTemplate.executeWithoutResult(status -> {
consumeLogRepository.tryStart(event.getEventId(), consumer);
Opportunity opp = opportunityRepository.lockById(event.getOpportunityId());
if (opp.getVersion() < event.getVersion()
&& opp.canTransferTo("POLICY_ISSUED")) {
opportunityRepository.markPolicyIssued(opp.getId(), event.getPolicyId(), event.getVersion());
}
consumeLogRepository.markSuccess(event.getEventId(), consumer);
});
return CONSUME_SUCCESS;
}
6. 常见坑
第一个坑是以为事务消息等于分布式事务。它只保证本地事务与消息投递的一致,不保证消费者一定成功。消费者失败后仍要靠重试和补偿。
第二个坑是回查逻辑依赖内存。Broker 回查可能发生在生产者重启后,必须查数据库事件表,不能靠本地 Map 判断事务状态。
第三个坑是本地事务已经成功,但事件表没有记录完整 payload。后续排查、补发、审计都会缺信息。事件表至少要能重建消息。
第四个坑是消费者没有幂等。事务消息解决的是“不丢、不误发”,不解决“只消费一次”。RocketMQ 的消费语义仍然要按至少一次理解。
第五个坑是无限重试不可恢复错误。比如消息字段缺失、商机不存在且确认无法补偿,反复重试只会制造堆积。应该记录异常表并告警。
第六个坑是没有对账。最终一致性不是相信消息迟早成功,而是能发现没有成功的那一小部分,并且有明确修复手段。
7. 面试追问
面试官可能会问:
- 为什么不用本地消息表,而要用 RocketMQ 事务消息?
- Broker 回查时生产者已经重启怎么办?
- 消费者执行成功但 ack 失败,会不会重复处理?
- 事务消息能不能保证商机系统和报表系统同时成功?
- 如果消息进入死信队列,你们怎么恢复?
- 如何判断最终一致性的时间窗口是否合理?
这些问题都在考察你是否理解事务消息的能力边界。回答时不要把所有问题都推给 MQ,要把数据库事务、幂等表、状态机、监控、补偿一起讲出来。
8. 推荐回答
可以这样回答:
“我们当时的核心矛盾是本地业务状态和跨系统事件之间的一致性。比如出单系统把保单标记为已出单后,必须让商机和报表最终收到事件。如果先提交数据库再发 MQ,中间宕机会丢事件;如果先发 MQ 再提交数据库,又可能消费者看到一条本地回滚的消息。所以关键事件用了 RocketMQ 事务消息。生产者先发半消息,再执行本地事务,本地事务里更新业务表并写事件表,成功后提交消息。Broker 如果没收到提交结果,会根据 eventId 回查事件表。”
“但我不会把它说成分布式事务。它只保证事件可靠发布,消费者仍然按至少一次处理,所以每个消费者都有幂等。商机阶段更新用 eventId 加 consumerGroup 记录消费日志,同时用商机版本号和状态机防止旧消息覆盖新状态。报表侧用业务唯一键避免重复统计。失败分可恢复和不可恢复,可恢复异常交给 RocketMQ 重试,不可恢复异常落异常表。重试耗尽进入死信后,会有补偿任务和人工看板处理。”
“最终一致性上,我们定义了业务 SLA,比如出单回写 30 秒内完成,超过 5 分钟告警。每天还有出单表、商机表和事件表的对账任务,发现状态缺口可以按事件表 payload 补发。这样设计的重点不是依赖 MQ 永不出错,而是让每个失败都有状态、有日志、有告警、有补偿。”
9. 延伸学习路线
第一步,把 RocketMQ 普通消息、顺序消息、延迟消息、事务消息的适用场景区分清楚,重点理解半消息、提交、回滚和事务回查。
第二步,学习本地消息表、Outbox Pattern、CDC 投递和事务消息的差异。面试中能说出为什么某些团队选择本地消息表,是很加分的。
第三步,补齐一致性基础,包括 CAP、BASE、TCC、Saga、可靠事件模式、幂等设计和状态机建模。
第四步,做一次完整演练:模拟数据库提交后进程宕机、消费者重复消费、死信堆积、补偿补发。能讲出故障时怎么查,比只讲正常流程更像真实项目经验。