云主机

  • 云主机服务 > 产品手册 > 消息队列 RocketMQ >SDK参考(TCP版本) >.NET >订阅消息

    订阅消息

    最近更新时间:2019-09-05 19:54:28

    本文介绍如何通过消息队列 RocketMQ 的 SDK 使用.NET 语言进行消息订阅。
    说明:请确保同一个 Group ID 下所有 Consumer 实例的订阅关系保持一致,详见订阅关系一致。

    订阅方式

    • 消息队列 RocketMQ 支持以下两种订阅方式:
      集群订阅:同一个 Group ID 所标识的所有 Consumer 平均分摊消费消息。 例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。
      // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
      factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.CLUSTERING);
      
    • 广播订阅:同一个 Group ID 所标识的所有 Consumer 都会各自消费某条消息一次。 例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。
      // 广播订阅方式设置
      factoryInfo.setFactoryProperty(ONSFactoryProperty.MessageModel, ONSFactoryProperty.BROADCASTING);
      

    示例代码

    using System;
    using System.Threading;
    using System.Text;
    using ons;
    // 从 Broker 拉取消息时要执行的回调函数
    public class MyMsgListener : MessageListener
    {
        public MyMsgListener()
        {
        }
        ~MyMsgListener()
        {
        }
        public override ons.Action consume(Message value, ConsumeContext context)
        {
            Byte[] text = Encoding.Default.GetBytes(value.getBody());
            Console.WriteLine(Encoding.UTF8.GetString(text));
            return ons.Action.CommitMessage;
        }
    }
    public class ConsumerExampleForEx
    {
        public ConsumerExampleForEx()
        {
        }
        static void Main(string[] args) {
            // 配置您的账号,以下设置均可从控制台获取
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
            // AccessKey 身份验证,在消息队列管理控制台创建
            factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
            // SecretKey 身份验证,在消息队列管理控制台创建
            factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
            // 您在控制台创建的 Group ID
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
            // 您在控制台创建的 Topic
            factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
            // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
            factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
            // 设置日志路径
            factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
            // 集群消费
            // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
            // 广播消费
            // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);
            // 创建消费者实例
            PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);
            // 订阅 Topics
            consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());
            // 启动客户端实例
            consumer.start();
            //该设置仅供 demo 使用,实际生产中请保证进程不退出
            Thread.Sleep(300000);
            // 在进程即将退出时,关闭消费者实例
            consumer.shutdown();
        }
    }
    
    以上内容是否对您有帮助?
  • Icon free helper
    Close