云主机

  • 云主机 > 使用指南 > 高级特性 >消息类型 >事务消息

    事务消息

    最近更新时间: 2019-09-06 12:13:39

    本文介绍消息队列 RocketMQ 事务消息的概念、适用场景、交互流程以及使用过程中的注意事项。

    概念介绍

    • 事务消息:消息队列 RocketMQ 提供类似 X/Open XA 的分布式事务功能,通过消息队列 RocketMQ 事务消息能达到分布式事务的最终一致。
    • 半事务消息:暂不能投递的消息,发送方已经成功地将消息发送到了消息队列 RocketMQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息。
    • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。

    适用场景

    事务消息的适用场景示例:
    通过购物车进行下单的流程中,用户入口在购物车系统,交易下单入口在交易系统,两个系统之间的数据需要保持最终一致,这时可以通过事务消息进行处理。交易系统下单之后,发送一条交易下单的消息到消息队列 RocketMQ,购物车系统订阅消息队列 RocketMQ 的交易下单消息,做相应的业务处理,更新购物车数据。

    交互流程

    消息队列 RocketMQ 事务消息交互流程如下所示。

    务消息发送步骤如下:

    1. 发送方将半事务消息发送至消息队列 RocketMQ 服务端。
    2. RocketMQ 服务端将消息持久化成功之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息。
    3. 发送方开始执行本地事务逻辑。
    4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半事务消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,订阅方将不会接受该消息。

    事务消息回查步骤如下:

    1. 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
    2. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
    3. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行操作。

    注意事项

    1. 事务消息的 Group ID 不能与其他类型消息的 Group ID 共用。与其他类型的消息不同,事务消息有回查机制,回查时消息队列 RocketMQ 服务端会根据 Group ID 去查询客户端。
    2. 通过 ONSFactory.createTransactionProducer 创建事务消息的 Producer 时必须指定 LocalTransactionChecker 的实现类,处理异常情况下事务消息的回查。
    3. 事务消息发送完成本地事务后,可在 execute 方法中返回以下三种状态:
      • TransactionStatus.CommitTransaction:提交事务,允许订阅方消费该消息。
      • TransactionStatus.RollbackTransaction:回滚事务,消息将被丢弃不允许消费。
      • TransactionStatus.Unknow:暂时无法判断状态,等待固定时间以后消息队列 RocketMQ 服务端向发送方进行消息回查。
    4. 可通过以下方式给每条消息设定第一次消息回查的最快时间:
      Message message = new Message();
      // 在消息属性中添加第一次消息回查的最快时间,单位秒。例如,以下设置实际第一次回查时间为 120 秒 ~ 125 秒之间
      message.putUserProperties(PropertyKeyConst.CheckImmunityTimeInSeconds,"120");
      // 以上方式只确定事务消息的第一次回查的最快时间,实际回查时间向后浮动 0 秒 ~ 5 秒;如第一次回查后事务仍未提交,后续每隔 5 秒回查一次
      

    示例代码

    收发事务消息的示例代码如下:

    以上内容是否对您有帮助?
  • Qvm free helper
    Close