收发事务消息
本文提供使用 TCP 协议下的 Java SDK 收发事务消息的示例代码供您参考。
消息队列 RocketMQ 提供类似 X/Open XA 的分布式事务功能,通过消息队列 RocketMQ 事务消息,能达到分布式事务的最终一致。
说明:对于新手用户,建议在正式收发消息前,阅读 Demo 工程(TCP 版)来了解搭建消息队列 RocketMQ 工程的具体步骤。
交互流程
消息队列 RocketMQ 的事务消息交互流程如下图所示。
前提条件
您已完成以下操作:
- 下载 Java SDK。
- 准备环境;
- (可选)配置日志。
发送事务消息
说明:具体的示例代码,请以消息队列 RocketMQ 代码库为准。
发送事务消息包含以下两个步骤:
- 发送半事务消息(Half Message)及执行本地事务,示例代码如下。
public class TransactionProducerClient { private final static Logger log = ClientLogger.getLog(); // 您需要设置自己的日志,便于排查问题 public static void main(String[] args) throws InterruptedException { final BusinessService businessService = new BusinessService(); // 本地业务 Properties properties = new Properties(); // 您在控制台创建的 Group ID 注意:事务消息的 Group ID 不能与其他类型消息的 Group ID 共用 properties.put(PropertyKeyConst.GROUP_ID, "XXX"); // 身份验证,在消息队列管理控制台创建 properties.put(PropertyKeyConst.AccessKey, "XXX"); // 身份验证,在消息队列管理控制台创建 properties.put(PropertyKeyConst.SecretKey, "XXX"); // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看 properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); TransactionProducer producer = ONSFactory.createTransactionProducer(properties, new LocalTransactionCheckerImpl()); producer.start(); Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes()); try { SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() { @Override public TransactionStatus execute(Message msg, Object arg) { // 消息 ID(有可能消息体一样,但消息 ID 不一样,当前消息 ID 在控制台无法查询) String msgId = msg.getMsgID(); // 消息体内容进行 crc32,也可以使用其它的如 MD5 long crc32Id = HashUtil.crc32Code(msg.getBody()); // 消息 ID 和 crc32id 主要是用来防止消息重复 // 如果业务本身是幂等的,可以忽略,否则需要利用 msgId 或 crc32Id 来做幂等 // 如果要求消息绝对不重复,推荐做法是对消息体 body 使用 crc32 或 MD5 来防止重复消息 Object businessServiceArgs = new Object(); TransactionStatus transactionStatus = TransactionStatus.Unknow; try { boolean isCommit = businessService.execbusinessService(businessServiceArgs); if (isCommit) { // 本地事务成功则提交消息 transactionStatus = TransactionStatus.CommitTransaction; } else { // 本地事务失败则回滚消息 transactionStatus = TransactionStatus.RollbackTransaction; } } catch (Exception e) { log.error("Message Id:{}", msgId, e); } System.out.println(msg.getMsgID()); log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name()); return transactionStatus; } }, null); } catch (Exception e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); } // demo example 防止进程退出(实际使用不需要这样) TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE); } }
- 提交事务消息状态
当本地事务执行完成(执行成功或执行失败),需要通知服务器当前消息的事务状态。通知方式有以下两种:- 执行本地事务完成后提交。
- 执行本地事务一直没提交状态,等待服务器回查消息的事务状态。
事务状态有以下三种: - TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息。
- TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费。
- TransactionStatus.Unknow 无法判断状态,期待消息队列 RocketMQ 的 Broker 向发送方再次询问该消息对应的本地事务的状态。
工具类public class LocalTransactionCheckerImpl implements LocalTransactionChecker { private final static Logger log = ClientLogger.getLog(); final BusinessService businessService = new BusinessService(); @Override public TransactionStatus check(Message msg) { //消息 ID(有可能消息体一样,但消息 ID 不一样,当前消息属于半事务消息,所以消息 ID 在控制台无法查询) String msgId = msg.getMsgID(); //消息体内容进行 crc32,也可以使用其它的方法如 MD5 long crc32Id = HashUtil.crc32Code(msg.getBody()); //消息 ID、消息本 crc32Id 主要是用来防止消息重复 //如果业务本身是幂等的,可以忽略,否则需要利用 msgId 或 crc32Id 来做幂等 //如果要求消息绝对不重复,推荐做法是对消息体使用 crc32 或 MD5 来防止重复消息 //业务自己的参数对象,这里只是一个示例,需要您根据实际情况来处理 Object businessServiceArgs = new Object(); TransactionStatus transactionStatus = TransactionStatus.Unknow; try { boolean isCommit = businessService.checkbusinessService(businessServiceArgs); if (isCommit) { //本地事务已成功则提交消息 transactionStatus = TransactionStatus.CommitTransaction; } else { //本地事务已失败则回滚消息 transactionStatus = TransactionStatus.RollbackTransaction; } } catch (Exception e) { log.error("Message Id:{}", msgId, e); } log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name()); return transactionStatus; } }
import java.util.zip.CRC32; public class HashUtil { public static long crc32Code(byte[] bytes) { CRC32 crc32 = new CRC32(); crc32.update(bytes); return crc32.getValue(); } }
事务回查机制说明
- 发送事务消息为什么必须要实现回查 Check 机制?
当步骤 1 中半事务消息发送完成,但本地事务返回状态为 TransactionStatus.Unknow,或者应用退出导致本地事务未提交任何状态时,从 Broker 的角度看,这条 Half 状态的消息的状态是未知的。因此 Broker 会定期要求发送方能 Check 该 Half 状态消息,并上报其最终状态。 - Check 被回调时,业务逻辑都需要做些什么?
事务消息的 Check 方法里面,应该写一些检查事务一致性的逻辑。消息队列 RocketMQ 发送事务消息时需要实现 LocalTransactionChecker 接口,用来处理 Broker 主动发起的本地事务状态回查请求;因此在事务消息的 Check 方法中,需要完成两件事情:
i. 检查该半事务消息对应的本地事务的状态(committed or rollback)。
ii. 向 Broker 提交该半事务消息本地事务的状态。
文档反馈
(如有产品使用问题,请 提交工单)