RocketMQ 事务消息与最终一致性

1. 对应简历段落

对应简历中可以这样描述:

在 B2B 商机与出单流转链路中,负责基于 RocketMQ 事务消息建设最终一致性方案。针对商机创建、销售待办生成、活动入池、出单回写等跨系统状态同步场景,设计本地事务表、事务回查、消费幂等、死信补偿和对账任务,解决数据库提交成功但消息丢失、消息重复消费、消费者异常导致状态不一致等问题,保障核心业务链路在高峰期可恢复、可追踪、可补偿。

这段经历的重点不是“会用事务消息 API”,而是能解释为什么需要它、它解决哪一段一致性问题、它不能解决什么问题,以及项目如何靠监控和补偿把最终一致性真正落地。

2. 业务背景

在商机系统里,一个业务动作经常会产生多个后续影响。例如销售创建一条商机后,商机中心要落库,客户中心要更新客户标签,销售工作台要生成待办,活动系统要判断是否纳入活动名单,报表系统要统计渠道转化。同步调用看起来直接,但任何一个下游失败都会拖垮主流程。

如果完全异步,又会遇到经典问题:商机已经写入数据库,但 MQ 发送失败,下游永远不知道这条商机;或者消息已经发送出去,但本地事务回滚,消费者拿到一条不存在的商机。这个问题不是普通重试能彻底解决的,因为进程可能在数据库提交和消息发送之间宕机,也可能在发送成功后还没记录状态就崩掉。

所以项目里会把“核心状态落库”和“业务事件发布”绑定起来。事务消息关注的是源头事件不丢、不误发,消费者侧再通过幂等、重试、死信和对账达成最终一致。

3. 核心原理

RocketMQ 事务消息采用两阶段思路。生产者先发送半消息,Broker 保存但暂不投递;生产者执行本地事务;本地事务成功后提交消息,Broker 才投递给消费者;本地事务失败则回滚消息。若生产者在提交或回滚前异常,Broker 会发起事务回查,由生产者根据本地事务状态决定提交、回滚或继续未知。

这个机制解决的是“本地事务和消息发送的一致性”。它不负责消费者处理成功,也不负责跨系统强一致事务。消费者仍然可能失败、超时、重复执行或乱序执行,因此最终一致性方案至少由四部分组成:

  1. 源头可靠发布:事务消息或本地消息表保证业务事件不丢。
  2. 消费幂等:同一消息重复投递不会重复产生业务副作用。
  3. 失败可见:重试、死信、异常表和告警让失败不被吞掉。
  4. 对账补偿:定时扫描业务状态和消息状态,修复长期不一致。

面试时要强调边界。事务消息不是分布式事务的银弹,它不会让多个系统的数据库同时提交或回滚。它更适合“本系统状态已经确定,通知其他系统最终跟上”的场景。

4. 项目落地

以“出单完成后回写商机阶段”为例。出单系统收到核心出单结果后,需要更新本地保单状态,并通知商机中心把商机阶段改成已出单,同时报表系统统计成交金额。

生产者侧流程可以这样设计:

  1. 生成全局 eventId,消息体包含 policyIdopportunityIdcustomerIdamountversiontraceId
  2. 发送 RocketMQ 半消息。
  3. 在本地事务中更新保单状态为 ISSUED,插入事件表,事件状态记为 COMMIT
  4. 本地事务成功后提交半消息。
  5. 如果提交阶段异常,Broker 后续通过回查确认事件表状态。

事件表不是摆设,它是回查、审计和补偿的依据。字段通常包括 event_idbiz_idevent_typepayloadtx_statussend_statusretry_countcreated_atupdated_at

消费者侧分为商机消费者和报表消费者。商机消费者收到 POLICY_ISSUED 后,先检查 eventId + consumer_group 是否已成功处理,再根据商机当前阶段和版本号判断是否允许更新。报表消费者则使用 policyId + metric_type 做唯一键,避免重复统计。

一致性目标也要写清楚:出单完成后 30 秒内商机阶段应更新,超过 5 分钟告警;报表允许分钟级延迟;每天凌晨跑一次出单与商机状态对账,发现出单已完成但商机未更新,则重新投递事件或生成补偿工单。

5. 关键配置或伪代码

简化后的事务消息发送伪代码:

TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
        "policy_tx_group",
        "b2b_policy_event:POLICY_ISSUED",
        MessageBuilder.withPayload(event)
                .setHeader("KEYS", event.getEventId())
                .setHeader("TRACE_ID", traceId)
                .build(),
        event
);

事务监听器示例:

public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    PolicyIssuedEvent event = (PolicyIssuedEvent) arg;
    try {
        transactionTemplate.executeWithoutResult(status -> {
            policyRepository.markIssued(event.getPolicyId(), event.getAmount());
            eventRepository.insert(event.getEventId(), event.getPolicyId(),
                    "POLICY_ISSUED", "COMMIT", toJson(event));
        });
        return LocalTransactionState.COMMIT_MESSAGE;
    } catch (BizException e) {
        eventRepository.markRollback(event.getEventId(), e.getMessage());
        return LocalTransactionState.ROLLBACK_MESSAGE;
    } catch (Exception e) {
        return LocalTransactionState.UNKNOW;
    }
}

public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    String eventId = msg.getKeys();
    EventRecord record = eventRepository.findByEventId(eventId);
    if (record == null) {
        return LocalTransactionState.UNKNOW;
    }
    if ("COMMIT".equals(record.getTxStatus())) {
        return LocalTransactionState.COMMIT_MESSAGE;
    }
    if ("ROLLBACK".equals(record.getTxStatus())) {
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    return LocalTransactionState.UNKNOW;
}

消费者幂等伪代码:

public ConsumeConcurrentlyStatus consume(PolicyIssuedEvent event) {
    String consumer = "opportunity-stage-consumer";
    if (consumeLogRepository.success(event.getEventId(), consumer)) {
        return CONSUME_SUCCESS;
    }

    transactionTemplate.executeWithoutResult(status -> {
        consumeLogRepository.tryStart(event.getEventId(), consumer);
        Opportunity opp = opportunityRepository.lockById(event.getOpportunityId());
        if (opp.getVersion() < event.getVersion()
                && opp.canTransferTo("POLICY_ISSUED")) {
            opportunityRepository.markPolicyIssued(opp.getId(), event.getPolicyId(), event.getVersion());
        }
        consumeLogRepository.markSuccess(event.getEventId(), consumer);
    });
    return CONSUME_SUCCESS;
}

6. 常见坑

第一个坑是以为事务消息等于分布式事务。它只保证本地事务与消息投递的一致,不保证消费者一定成功。消费者失败后仍要靠重试和补偿。

第二个坑是回查逻辑依赖内存。Broker 回查可能发生在生产者重启后,必须查数据库事件表,不能靠本地 Map 判断事务状态。

第三个坑是本地事务已经成功,但事件表没有记录完整 payload。后续排查、补发、审计都会缺信息。事件表至少要能重建消息。

第四个坑是消费者没有幂等。事务消息解决的是“不丢、不误发”,不解决“只消费一次”。RocketMQ 的消费语义仍然要按至少一次理解。

第五个坑是无限重试不可恢复错误。比如消息字段缺失、商机不存在且确认无法补偿,反复重试只会制造堆积。应该记录异常表并告警。

第六个坑是没有对账。最终一致性不是相信消息迟早成功,而是能发现没有成功的那一小部分,并且有明确修复手段。

7. 面试追问

面试官可能会问:

  1. 为什么不用本地消息表,而要用 RocketMQ 事务消息?
  2. Broker 回查时生产者已经重启怎么办?
  3. 消费者执行成功但 ack 失败,会不会重复处理?
  4. 事务消息能不能保证商机系统和报表系统同时成功?
  5. 如果消息进入死信队列,你们怎么恢复?
  6. 如何判断最终一致性的时间窗口是否合理?

这些问题都在考察你是否理解事务消息的能力边界。回答时不要把所有问题都推给 MQ,要把数据库事务、幂等表、状态机、监控、补偿一起讲出来。

8. 推荐回答

可以这样回答:

“我们当时的核心矛盾是本地业务状态和跨系统事件之间的一致性。比如出单系统把保单标记为已出单后,必须让商机和报表最终收到事件。如果先提交数据库再发 MQ,中间宕机会丢事件;如果先发 MQ 再提交数据库,又可能消费者看到一条本地回滚的消息。所以关键事件用了 RocketMQ 事务消息。生产者先发半消息,再执行本地事务,本地事务里更新业务表并写事件表,成功后提交消息。Broker 如果没收到提交结果,会根据 eventId 回查事件表。”

“但我不会把它说成分布式事务。它只保证事件可靠发布,消费者仍然按至少一次处理,所以每个消费者都有幂等。商机阶段更新用 eventId 加 consumerGroup 记录消费日志,同时用商机版本号和状态机防止旧消息覆盖新状态。报表侧用业务唯一键避免重复统计。失败分可恢复和不可恢复,可恢复异常交给 RocketMQ 重试,不可恢复异常落异常表。重试耗尽进入死信后,会有补偿任务和人工看板处理。”

“最终一致性上,我们定义了业务 SLA,比如出单回写 30 秒内完成,超过 5 分钟告警。每天还有出单表、商机表和事件表的对账任务,发现状态缺口可以按事件表 payload 补发。这样设计的重点不是依赖 MQ 永不出错,而是让每个失败都有状态、有日志、有告警、有补偿。”

9. 延伸学习路线

第一步,把 RocketMQ 普通消息、顺序消息、延迟消息、事务消息的适用场景区分清楚,重点理解半消息、提交、回滚和事务回查。

第二步,学习本地消息表、Outbox Pattern、CDC 投递和事务消息的差异。面试中能说出为什么某些团队选择本地消息表,是很加分的。

第三步,补齐一致性基础,包括 CAP、BASE、TCC、Saga、可靠事件模式、幂等设计和状态机建模。

第四步,做一次完整演练:模拟数据库提交后进程宕机、消费者重复消费、死信堆积、补偿补发。能讲出故障时怎么查,比只讲正常流程更像真实项目经验。