订阅消息
本文介绍如何通过消息队列 RocketMQ 的 Java SDK 订阅消息。
订阅方式
消息队列 RocketMQ 支持以下两种订阅方式:
集群订阅
同一个 Group ID 所标识的所有 Consumer 平均分摊消费消息。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。// 集群订阅方式设置(不设置的情况下,默认为集群订阅方式) properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
广播订阅
同一个 Group ID 所标识的所有 Consumer 都会各自消费某条消息一次。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。// 广播订阅方式设置 properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
注意事项
请确保同一个 Group ID 下所有 Consumer 实例的订阅关系保持一致,详情请参见订阅关系一致。
两种不同的订阅方式有着不同的功能限制,例如,广播模式不支持顺序消息、不维护消费进度、不支持重置消费位点等,详情请参见集群消费和广播消费。
示例代码
具体的示例代码,请以消息队列 RocketMQ 代码库为准。
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在控制台创建的 Group ID
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// 身份验证,在消息队列管理控制台创建
properties.put(PropertyKeyConst.AccessKey, "XXX");
// 身份验证,在消息队列管理控制台创建
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
properties.put(PropertyKeyConst.NAMESRV_ADDR,
"XXX");
// 集群订阅方式 (默认)
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
// 广播订阅方式
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个 Tag
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
//订阅另外一个 Topic
consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //订阅全部 Tag
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
文档反馈
(如有产品使用问题,请 提交工单)