Spring 集成
本文介绍如何在 Spring 框架下用消息队列 RocketMQ 收发消息。主要包括以下三部分内容:
- 普通消息生产者和 Spring 集成
- 事务消息生产者和 Spring 集成
消息消费者和 Spring 集成
请确保同一个 Group ID 下所有 Consumer 实例的订阅关系保持一致。详情请参见订阅关系一致。
Spring 框架下支持的配置参数和 TCP Java 一致。详情请参见 Java SDK 使用说明。
客户端(ons-client)版本的详细请参见 Java SDK 版本说明。生产者与 Spring 集成
在 producer.xml 中定义生产者 Bean 等信息。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="producer" class="com.aliyun.openservices.ons.api.bean.ProducerBean" init-method="start" destroy-method="shutdown"> <!-- Spring 接入方式支持 Java SDK 支持的所有配置项 --> <property name="properties" > <!--生产者配置信息--> <props> <prop key="AccessKey">XXX</prop> <prop key="SecretKey">XXX</prop> <!-- ons-client 版本是 1.8.0.Final(版本说明请参见本页顶部 Java SDK 版本说明),需要配置(从实例管理页面复制 TCP 协议接入点) <prop key="NAMESRV_ADDR">XXX</prop> --> </props> </property> </bean> </beans>
通过已经与 Spring 集成好的生产者生产消息。
public class ProduceWithSpring { public static void main(String[] args) { /** * 生产者 Bean 配置在 producer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中 */ ApplicationContext context = new ClassPathXmlApplicationContext("producer.xml"); Producer producer = (Producer) context.getBean("producer"); //循环发送消息 for (int i = 0; i < 100; i++) { Message msg = new Message( // // Message 所属的 Topic "TopicTestMQ", // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 的服务器过滤 "TagA", // Message Body 可以是任何二进制形式的数据, 消息队列 RocketMQ 不做任何干预 // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式 "Hello MQ".getBytes()); // 设置代表消息的业务关键属性,请尽可能全局唯一 // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发 // 注意:不设置也不会影响消息正常收发 msg.setKey("ORDERID_100"); // 发送消息,只要不抛异常就是成功 try { SendResult sendResult = producer.send(msg); assert sendResult != null; System.out.println("send success: " + sendResult.getMessageId()); }catch (ONSClientException e) { System.out.println("发送失败"); } } } }
事务消息生产者与 Spring 集成
事务消息的概念详情请参见收发事务消息。
首先需要实现一个 LocalTransactionChecker,如下所示。 一个消息生产者只能有一个 LocalTransactionChecker。
public class DemoLocalTransactionChecker implements LocalTransactionChecker { public TransactionStatus check(Message msg) { System.out.println("开始回查本地事务状态"); return TransactionStatus.CommitTransaction; //根据本地事务状态检查结果返回不同的 TransactionStatus } }
在 transactionProducer.xml 中定义事务消息生产者 Bean 等信息。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="localTransactionChecker" class="demo.DemoLocalTransactionChecker"></bean> <bean id="transactionProducer" class="com.aliyun.openservices.ons.api.bean.TransactionProducerBean" init-method="start" destroy-method="shutdown"> <property name="properties" > <!--事务消息生产者配置信息--> <props> <prop key="AccessKey">AKDEMO</prop> <prop key="SecretKey">SKDEMO</prop> <prop key="GROUP_ID">GID_DEMO</prop> <!-- ons-client 版本是 1.8.0.Final(版本说明请参见本页顶部 Java SDK 版本说明),需要配置(从实例管理页面复制 TCP 协议接入点) <prop key="NAMESRV_ADDR">XXX</prop> --> </props> </property> <property name="localTransactionChecker" ref="localTransactionChecker"></property> </bean> </beans>
通过已经与 Spring 集成好的生产者生产事务消息。
public class ProduceTransMsgWithSpring { public static void main(String[] args) { /** * 事务消息生产者 Bean 配置在 transactionProducer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中 * 请结合例子"发送事务消息" */ ApplicationContext context = new ClassPathXmlApplicationContext("transactionProducer.xml"); TransactionProducer transactionProducer = (TransactionProducer) context.getBean("transactionProducer"); Message msg = new Message("XXX", "TagA", "Hello MQ transaction===".getBytes()); SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() { @Override public TransactionStatus execute(Message msg, Object arg) { System.out.println("执行本地事务"); return TransactionStatus.CommitTransaction; //根据本地事务执行结果来返回不同的 TransactionStatus } }, null); } }
消费者与 Spring 集成
创建 MessageListener,如下所示。
public class DemoMessageListener implements MessageListener { public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + message.getMsgID()); try { //do something.. return Action.CommitMessage; }catch (Exception e) { //消费失败 return Action.ReconsumeLater; } } }
在 consumer.xml 中定义消费者 Bean 等信息。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="msgListener" class="demo.DemoMessageListener"></bean> <!--Listener 配置--> <!-- Group ID 订阅同一个 Topic,可以创建多个 ConsumerBean--> <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ConsumerBean" init-method="start" destroy-method="shutdown"> <property name="properties" > <!--消费者配置信息--> <props> <prop key="GROUP_ID">GID_DEMO</prop> <!--请替换 XXX--> <prop key="AccessKey">AKDEMO</prop> <prop key="SecretKey">SKDEMO</prop> <!-- ons-client 版本在 1.8.0.Final(版本说明请参见本页顶部 Java SDK 版本说明),需要配置(从实例管理页面复制 TCP 协议接入点) <prop key="NAMESRV_ADDR">XXX</prop> --> <!--将消费者线程数固定为 50 个 <prop key="ConsumeThreadNums">50</prop> --> </props> </property> <property name="subscriptionTable"> <map> <entry value-ref="msgListener"> <key> <bean class="com.aliyun.openservices.ons.api.bean.Subscription"> <property name="topic" value="TopicTestMQ"/> <property name="expression" value="*"/><!--expression 即 Tag,可以设置成具体的 Tag,如 taga||tagb||tagc,也可设置成 *。 * 仅代表订阅所有 Tag,不支持通配--> </bean> </key> </entry> <!--更多的订阅添加 entry 节点即可,如下所示--> <entry value-ref="msgListener"> <key> <bean class="com.aliyun.openservices.ons.api.bean.Subscription"> <property name="topic" value="TopicTestMQ-Other"/> <!--订阅另外一个 Topic --> <property name="expression" value="taga||tagb"/> <!-- 订阅多个 Tag --> </bean> </key> </entry> </map> </property> </bean> </beans>
运行已经与 Spring 集成好的消费者,如下所示。
public class ConsumeWithSpring { public static void main(String[] args) { /** * 消费者 Bean 配置在 consumer.xml 中,可通过 ApplicationContext 获取或者直接注入到其他类(比如具体的 Controller)中 */ ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml"); System.out.println("Consumer Started"); } }
文档反馈
(如有产品使用问题,请 提交工单)