订阅消息
本文介绍如何通过消息队列 RocketMQ 的 SDK 使用.NET 语言进行消息订阅。
说明:请确保同一个 Group ID 下所有 Consumer 实例的订阅关系保持一致,详见订阅关系一致。
订阅方式
- 消息队列 RocketMQ 支持以下两种订阅方式:
集群订阅:同一个 Group ID 所标识的所有 Consumer 平均分摊消费消息。 例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。// 集群订阅方式设置(不设置的情况下,默认为集群订阅方式) factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
- 广播订阅:同一个 Group ID 所标识的所有 Consumer 都会各自消费某条消息一次。 例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。
// 广播订阅方式设置 factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
示例代码
using System;
using System.Threading;
using System.Text;
using ons;
// 从 Broker 拉取消息时要执行的回调函数
public class MyMsgListener : MessageListener
{
public MyMsgListener()
{
}
~MyMsgListener()
{
}
public override ons.Action consume(Message value, ConsumeContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.Action.CommitMessage;
}
}
public class ConsumerExampleForEx
{
public ConsumerExampleForEx()
{
}
static void Main(string[] args) {
// 配置您的账号,以下设置均可从控制台获取
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// AccessKey 身份验证,在消息队列管理控制台创建
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
// SecretKey 身份验证,在消息队列管理控制台创建
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
// 您在控制台创建的 Group ID
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
// 您在控制台创建的 Topic
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// 设置日志路径
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// 集群消费
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
// 广播消费
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);
// 创建消费者实例
PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);
// 订阅 Topics
consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());
// 启动客户端实例
consumer.start();
//该设置仅供 demo 使用,实际生产中请保证进程不退出
Thread.Sleep(300000);
// 在进程即将退出时,关闭消费者实例
consumer.shutdown();
}
}
文档反馈
(如有产品使用问题,请 提交工单)