云主机

  • 云主机服务 > 产品手册 > 消息队列 RocketMQ >SDK参考(TCP版本) >C/C++ >订阅消息

    订阅消息

    最近更新时间:2019-09-05 19:22:26

    本文介绍如何通过消息队列 RocketMQ 的 C/C++ SDK 订阅消息。

    订阅方式

    消息队列 RocketMQ 支持以下两种订阅方式:

    • 集群订阅
      同一个 Group ID 所标识的所有 Consumer 平均分摊消费消息。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在集群消费模式下每个实例平均分摊,只消费其中的 3 条消息。
      // 集群订阅方式设置(不设置的情况下,默认为集群订阅方式)
      factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
      
    • 广播订阅
      同一个 Group ID 所标识的所有 Consumer 都会各自消费某条消息一次。例如某个 Topic 有 9 条消息,一个 Group ID 有 3 个 Consumer 实例,那么在广播消费模式下每个实例都会各自消费 9 条消息。
      // 广播订阅方式设置
      factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
      
      注意事项
    • 请确保同一个 Group ID 下所有 Consumer 实例的订阅关系保持一致,详情请参见订阅关系一致。
    • 两种不同的订阅方式有着不同的功能限制,例如,广播模式不支持顺序消息、不维护消费进度、不支持重置消费位点等,详情请参见集群消费和广播消费。

    示例代码

    具体的示例代码,请以消息队列 RocketMQ 代码库为准。

    #include "ONSFactory.h"
    using namespace ons;
    // MyMsgListener:创建消费消息的实例
    //pushConsumer 拉取到消息后,会主动调用该实例的 consumer 函数
    class MyMsgListener : public MessageListener
    {
        public:
            MyMsgListener()
            {
            }
            virtual ~MyMsgListener()
            {
            }
            virtual Action consume(Message &message, ConsumeContext &context)
            {
                //自定义消息处理细节
                return CommitMessage; //CONSUME_SUCCESS;
            }
    };
    int main(int argc, char* argv[])
    {
        //pushConsumer 创建和工作需要的参数,必须输入
        ONSFactoryProperty factoryInfo;
        factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");//您在控制台创建的 Group ID
        factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); //设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
        factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );//您在控制台创建的 Topic
        factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");//身份验证,在消息队列管理控制台创建
        factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey,  "XXX");//身份验证,在消息队列管理控制台创建
          // 集群订阅方式 (默认)
          // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
          // 广播订阅方式
          // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
        //create pushConsumer
        PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
        //指定 pushConsumer 订阅的消息 Topic 和 Tag,注册消息回调函数
        MyMsgListener  msglistener;
        pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
        //start pushConsumer
        pushConsumer->start();
        //注意:直到不再接收消息,才能调用 shutdown;调用 shutdown 之后,Consumer 退出,不能接收到任何消息
        //销毁 pushConsumer,在应用退出前,必须销毁 Consumer 对象,否则会导致内存泄露等问题
        pushConsumer->shutdown();
        return 0;
    }
    
    以上内容是否对您有帮助?
  • Icon free helper
    Close