收发事务消息
目前支持的域包括公网、华东1、华北2、华东2、华南1。
交互流程
消息队列 RocketMQ 的事务消息的交互流程如下图所示:
发送事务消息
发送事务消息包含以下两个步骤:
- 发送半消息(Half Message)及执行本地事务。 示例代码如下:
 
 using System;
 using System.Collections.Generic;
 using System.Linq;
 using System.Text;
 using System.Runtime.InteropServices;
 using ons;
 namespace ons
 {
 public class MyLocalTransactionExecuter : LocalTransactionExecuter
 {
     public MyLocalTransactionExecuter()
     {
     }
     ~MyLocalTransactionExecuter()
     {
     }
     public override TransactionStatus execute(Message value)
     {
             Console.WriteLine("execute topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
             value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
             // 消息 ID(有可能消息体一样,但消息 ID 不一样。当前消息 ID 在控制台无法查询)
             string msgId = value.getMsgID();
             // 消息体内容进行 crc32, 也可以使用其它的如 MD5
             // 消息 ID 和 crc32id 主要是用来防止消息重复
             // 如果要求消息绝对不重复,推荐做法是对消息体 body 使用 crc32或 MD5 来防止重复消息
             TransactionStatus transactionStatus = TransactionStatus.Unknow;
             try {
                 boolean isCommit = 本地事务执行结果;
                 if (isCommit) {
                     // 本地事务成功则提交消息
                     transactionStatus = TransactionStatus.CommitTransaction;
                 } else {
                     // 本地事务失败则回滚消息
                     transactionStatus = TransactionStatus.RollbackTransaction;
                 }
             } catch (Exception e) {
                 //exception handle
             }
             return transactionStatus;
     }
 }
 class onscsharp
 {
     static void Main(string[] args)
     {
         ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
         factoryInfo.setFactoryProperty(factoryInfo.NAMESRV_ADDR, "XXX");//设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
         factoryInfo.setFactoryProperty(factoryInfo.ProducerId, "");//您在控制台创建的 Group ID
         factoryInfo.setFactoryProperty(factoryInfo.PublishTopics, "");// 您在控制台创建的 Topic
         factoryInfo.setFactoryProperty(factoryInfo.MsgContent, "");//message body
         factoryInfo.setFactoryProperty(factoryInfo.AccessKey, "");//AccessKey 身份验证,在消息队列管理控制台创建
         factoryInfo.setFactoryProperty(factoryInfo.SecretKey, "");//SecretKey 身份验证,在消息队列管理控制台创建
         //create transaction producer       
         ONSFactory onsfactory = new ONSFactory();
         LocalTransactionChecker myChecker = new MyLocalTransactionChecker();
         TransactionProducer pProducer = onsfactory.getInstance().createTransactionProducer(factoryInfo,ref myChecker);
         // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可,启动之后可以多线程并发发送消息
         pProducer.start();
             Message msg = new Message(
             //Message Topic
             factoryInfo.getPublishTopics(),
             //Message Tag
             "TagA",
             //Message Body
             factoryInfo.getMessageContent()
         );
         // 设置代表消息的业务关键属性,请尽可能全局唯一
         // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
         // 注意:不设置也不会影响消息正常收发
         msg.setKey("ORDERID_100");
         // 发送消息,只要不抛出异常,就代表发送成功
         try
         {
             LocalTransactionExecuter myExecuter = new MyLocalTransactionExecuter();
             SendResultONS sendResult = pProducer.send(msg, ref myExecuter);
         }
         catch(ONSClientException e)
         {
             Console.WriteLine("\nexception of sendmsg:{0}",e.what() );
         }
         // 在应用退出前,必须销毁 Producer 对象,否则会导致内存泄露等问题;
         // shutdown 之后不能重新 start 此 producer
         pProducer.shutdown();
     }
 }
 }
- 提交事务消息状态
当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。 通知方式有以下两种:- 执行本地事务完成后提交。
 - 执行本地事务一直没提交状态,等待服务器回查消息的事务状态。
事务状态有以下三种: - TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息。
 - TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费。
 - TransactionStatus.Unknow 无法判断状态,期待 Broker 向发送方再次询问该消息对应的本地事务的状态。
 
 
    public class MyLocalTransactionChecker : LocalTransactionChecker
    {
        public MyLocalTransactionChecker()
        {
        }
        ~MyLocalTransactionChecker()
        {
        }
        public override TransactionStatus check(Message value)
        {
                Console.WriteLine("check topic: {0}, tag:{1}, key:{2}, msgId:{3},msgbody:{4}, userProperty:{5}",
                value.getTopic(), value.getTag(), value.getKey(), value.getMsgID(), value.getBody(), value.getUserProperty("VincentNoUser"));
                // 消息 ID(有可能消息体一样,但消息 ID 不一样。当前消息 ID 在控制台无法查询)
                string msgId = value.getMsgID();
                // 消息体内容进行 crc32, 也可以使用其它的如 MD5
                // 消息 ID 和 crc32id 主要是用来防止消息重复
                // 如果业务本身是幂等的, 可以忽略,否则需要利用 msgId 或 crc32Id 来做幂等
                // 如果要求消息绝对不重复,推荐做法是对消息体 body 使用 crc32或 MD5 来防止重复消息 
                TransactionStatus transactionStatus = TransactionStatus.Unknow;
                try {
                    boolean isCommit = 本地事务执行结果;
                    if (isCommit) {
                        // 本地事务成功、提交消息
                        transactionStatus = TransactionStatus.CommitTransaction;
                    } else {
                        // 本地事务失败、回滚消息
                        transactionStatus = TransactionStatus.RollbackTransaction;
                    }
                } catch (Exception e) {
                    //exception handle
                }
                return transactionStatus;
        }
        }
事务回查机制说明
- 发送事务消息为什么必须要实现 Check 机制?
当步骤 1 半消息发送完成,但本地事务返回状态为 TransactionStatus.Unknow 时,亦或是应用退出导致本地事务未提交任何状态时,从 Broker 的角度看,这条半状态的消息的状态是未知的,因此 Broker 会定期要求发送方能 Check 该半状态消息,并上报其最终状态。 - Check 被回调时,业务逻辑都需要做些什么?
事务消息的 Check 方法里面,应该写一些检查事务一致性的逻辑。 消息队列 RocketMQ 发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 Broker 主动发起的本地事务状态回查请求;因此在事务消息的 Check 方法中,需要完成两件事情:
(1)检查该半消息对应的本地事务的状态(committed or rollback)。
(2)向 Broker 提交该半消息本地事务的状态。 - 本地事务的不同状态对半消息的影响?
- TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息。
 - TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费。
 - TransactionStatus.Unknow 无法判断状态,期待 Broker 向发送方再次询问该消息对应的本地事务的状态。
具体代码详见 MyLocalTransactionChecker 的实现。 
 
    文档反馈
    (如有产品使用问题,请 提交工单)