云主机

  • 云主机 > 使用指南 > 高级特性 >消息过滤

    消息过滤

    最近更新时间: 2019-09-05 11:28:58

    本文描述消息队列 RocketMQ 的消费者如何根据 Tag 在消息队列 RocketMQ 服务端完成消息过滤,以确保消费者最终只消费到其关注的消息类型。
    Tag,即消息标签,用于对某个 Topic 下的消息进行分类。消息队列 RocketMQ 的生产者在发送消息时,已经指定消息的 Tag,消费者需根据已经指定的 Tag 来进行订阅。

    场景示例

    以下图电商交易场景为例,从客户下单到收到商品这一过程会生产一系列消息,以以下消息为例:

    • 订单创建消息
    • 支付消息
    • 物流消息

    这些消息会发送到 Trade_Topic Topic 中,被各个不同的系统所订阅,以以下系统为例:

    • 支付系统:只需订阅支付消息
    • 物流系统:只需订阅物流消息
    • 交易成功率分析系统:需订阅订单和支付消息
    • 实时计算系统:需要订阅所有和交易相关的消息

    过滤示意图如下所示。

    示例代码

    • 发送消息
      发送消息时,每条消息必须指明 Tag:
      Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());
      
    • 订阅所有 Tag
      消费者如需订阅某 Topic 下所有类型的消息,Tag 用符号 * 表示:
      consumer.subscribe("MQ_TOPIC", "*", new MessageListener() {
          public Action consume(Message message, ConsumeContext context) {
              System.out.println(message.getMsgID());
              return Action.CommitMessage;
          }
      });
      
    • 订阅单个 Tag
      消费者如需订阅某 Topic 下某一种类型的消息,请明确标明 Tag:
      consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
          public Action consume(Message message, ConsumeContext context) {
              System.out.println(message.getMsgID());
              return Action.CommitMessage;
          }
      });
      
    • 订阅多个 Tag
      消费者如需订阅某 Topic 下多种类型的消息,请在多个 Tag 之间用 || 分隔:
      consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() {
          public Action consume(Message message, ConsumeContext context) {
              System.out.println(message.getMsgID());
              return Action.CommitMessage;
          }
      });
      
      - 错误示例
      同一个消费者多次订阅某个 Topic 下的 Tag,以最后一次订阅的 Tag 为准:
      //如下错误代码中,Consumer 只能订阅到 MQ_TOPIC 下 TagB 的消息,而不能订阅 TagA 的消息。
      consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() {
          public Action consume(Message message, ConsumeContext context) {
              System.out.println(message.getMsgID());
              return Action.CommitMessage;
          }
      });
      consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() {
          public Action consume(Message message, ConsumeContext context) {
              System.out.println(message.getMsgID());
              return Action.CommitMessage;
          }
      });
      
    以上内容是否对您有帮助?
  • Qvm free helper
    Close