云主机

  • 云主机服务 > 产品手册 > 消息队列 RocketMQ >SDK参考(TCP版本) >Java >收发事务消息

    收发事务消息

    最近更新时间:2019-09-05 15:53:39

    本文提供使用 TCP 协议下的 Java SDK 收发事务消息的示例代码供您参考。
    消息队列 RocketMQ 提供类似 X/Open XA 的分布式事务功能,通过消息队列 RocketMQ 事务消息,能达到分布式事务的最终一致。
    说明:对于新手用户,建议在正式收发消息前,阅读 Demo 工程(TCP 版)来了解搭建消息队列 RocketMQ 工程的具体步骤。

    交互流程

    消息队列 RocketMQ 的事务消息交互流程如下图所示。

    前提条件

    您已完成以下操作:

    • 下载 Java SDK。
    • 准备环境;
    • (可选)配置日志。

    发送事务消息

    说明:具体的示例代码,请以消息队列 RocketMQ 代码库为准。
    发送事务消息包含以下两个步骤:

    1. 发送半事务消息(Half Message)及执行本地事务,示例代码如下。
      public class TransactionProducerClient {
      private final static Logger log = ClientLogger.getLog(); // 您需要设置自己的日志,便于排查问题
      public static void main(String[] args) throws InterruptedException {
        final BusinessService businessService = new BusinessService(); // 本地业务
        Properties properties = new Properties();
        // 您在控制台创建的 Group ID 注意:事务消息的 Group ID 不能与其他类型消息的 Group ID 共用
        properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // 身份验证,在消息队列管理控制台创建
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // 身份验证,在消息队列管理控制台创建
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
        properties.put(PropertyKeyConst.NAMESRV_ADDR,
          "XXX");
        TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
                new LocalTransactionCheckerImpl());
        producer.start();
        Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes());
        try {
                SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() {
                    @Override
                    public TransactionStatus execute(Message msg, Object arg) {
                        // 消息 ID(有可能消息体一样,但消息 ID 不一样,当前消息 ID 在控制台无法查询)
                        String msgId = msg.getMsgID();
                        // 消息体内容进行 crc32,也可以使用其它的如 MD5
                        long crc32Id = HashUtil.crc32Code(msg.getBody());
                        // 消息 ID 和 crc32id 主要是用来防止消息重复
                        // 如果业务本身是幂等的,可以忽略,否则需要利用 msgId 或 crc32Id 来做幂等
                        // 如果要求消息绝对不重复,推荐做法是对消息体 body 使用 crc32 或 MD5 来防止重复消息
                        Object businessServiceArgs = new Object();
                        TransactionStatus transactionStatus = TransactionStatus.Unknow;
                        try {
                            boolean isCommit =
                                businessService.execbusinessService(businessServiceArgs);
                            if (isCommit) {
                                // 本地事务成功则提交消息
                                transactionStatus = TransactionStatus.CommitTransaction;
                            } else {
                                // 本地事务失败则回滚消息
                                transactionStatus = TransactionStatus.RollbackTransaction;
                            }
                        } catch (Exception e) {
                            log.error("Message Id:{}", msgId, e);
                        }
                        System.out.println(msg.getMsgID());
                        log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
                        return transactionStatus;
                    }
                }, null);
            }
            catch (Exception e) {
                // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        // demo example 防止进程退出(实际使用不需要这样)
        TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
      }
      }
      
    2. 提交事务消息状态
      当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。通知方式有以下两种:
      • 执行本地事务完成后提交。
      • 执行本地事务一直没提交状态,等待服务器回查消息的事务状态。
        事务状态有以下三种:
      • TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息。
      • TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费。
      • TransactionStatus.Unknow 无法判断状态,期待消息队列 RocketMQ 的 Broker 向发送方再次询问该消息对应的本地事务的状态。
        public class LocalTransactionCheckerImpl implements LocalTransactionChecker {
        private final static Logger log = ClientLogger.getLog();
        final  BusinessService businessService = new BusinessService();
        @Override
        public TransactionStatus check(Message msg) {
          //消息 ID(有可能消息体一样,但消息 ID 不一样,当前消息属于半事务消息,所以消息 ID 在控制台无法查询)
          String msgId = msg.getMsgID();
          //消息体内容进行 crc32,也可以使用其它的方法如 MD5
          long crc32Id = HashUtil.crc32Code(msg.getBody());
          //消息 ID、消息本 crc32Id 主要是用来防止消息重复
          //如果业务本身是幂等的,可以忽略,否则需要利用 msgId 或 crc32Id 来做幂等
          //如果要求消息绝对不重复,推荐做法是对消息体使用 crc32 或 MD5 来防止重复消息
          //业务自己的参数对象,这里只是一个示例,需要您根据实际情况来处理
          Object businessServiceArgs = new Object();
          TransactionStatus transactionStatus = TransactionStatus.Unknow;
          try {
              boolean isCommit = businessService.checkbusinessService(businessServiceArgs);
              if (isCommit) {
                  //本地事务已成功则提交消息
                  transactionStatus = TransactionStatus.CommitTransaction;
              } else {
                  //本地事务已失败则回滚消息
                  transactionStatus = TransactionStatus.RollbackTransaction;
              }
          } catch (Exception e) {
              log.error("Message Id:{}", msgId, e);
          }
          log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());
          return transactionStatus;
        }
        }
        
        工具类
        import java.util.zip.CRC32;
        public class HashUtil {
        public static long crc32Code(byte[] bytes) {
           CRC32 crc32 = new CRC32();
           crc32.update(bytes);
           return crc32.getValue();
        }
        }
        

    事务回查机制说明

    • 发送事务消息为什么必须要实现回查 Check 机制?
      当步骤 1 中半事务消息发送完成,但本地事务返回状态为 TransactionStatus.Unknow,或者应用退出导致本地事务未提交任何状态时,从 Broker 的角度看,这条 Half 状态的消息的状态是未知的。因此 Broker 会定期要求发送方能 Check 该 Half 状态消息,并上报其最终状态。
    • Check 被回调时,业务逻辑都需要做些什么?
      事务消息的 Check 方法里面,应该写一些检查事务一致性的逻辑。消息队列 RocketMQ 发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 Broker 主动发起的本地事务状态回查请求;因此在事务消息的 Check 方法中,需要完成两件事情:
      i. 检查该半事务消息对应的本地事务的状态(committed or rollback)。
      ii. 向 Broker 提交该半事务消息本地事务的状态。
    以上内容是否对您有帮助?
  • Icon free helper
    Close