云主机

  • 云主机 > 使用指南 > 最佳实践 >订阅关系一致

    订阅关系一致

    最近更新时间: 2019-09-05 14:17:35

    订阅关系一致指的是同一个消费者 Group ID 下所有 Consumer 实例的处理逻辑必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。消息队列 RocketMQ 里的一个消费者 Group ID 代表一个 Consumer 实例群组。对于大多数分布式应用来说,一个消费者 Group ID 下通常会挂载多个 Consumer 实例。
    由于消息队列 RocketMQ 的订阅关系主要由 Topic + Tag 共同组成,因此,保持订阅关系一致意味着同一个消费者 Group ID 下所有的实例需在以下两方面均保持一致:

    • 订阅的 Topic 必须一致
    • 订阅的 Topic 中的 Tag 必须一致

    订阅关系图片示例

    正确订阅关系图片示例

    在下图中,多个 Group ID 订阅了多个 Topic,并且每个 Group ID 里的多个消费者实例的订阅关系保持了一致。

    错误订阅关系图片示例

    在下图中,单个 Group ID 订阅了多个 Topic,但是该 Group ID 里的多个消费者实例的订阅关系并没有保持一致。

    订阅关系代码示例

    错误订阅关系代码示例

    【例一】
    以下例子中,同一个 Group ID 下的两个实例订阅的 Topic 不一致。
    Consumer 实例 1-1:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("jodie_test_A", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });
    

    Consumer 实例 1-2:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("jodie_test_B ", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });
    

    【例二】
    以下例子中,同一个 Group ID 下订阅 Topic 的 Tag 不一致。Consumer 实例 2-1 订阅了 TagA,而 Consumer 实例 2-2 未指定 Tag。
    Consumer 实例 2-1:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });
    

    Consumer 实例 2-2:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("jodie_test_A", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });
    

    【例三】
    此例中,错误的原因有俩个:
    同一个 Group ID 下订阅 Topic 个数不一致。
    同一个 Group ID 下订阅 Topic 的 Tag 不一致。
    Consumer 实例 3-1:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });
    consumer.subscribe("jodie_test_B", "TagB", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });
    

    Consumer 实例 3-2:

    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("jodie_test_A", "TagB", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });
    
    以上内容是否对您有帮助?
  • Qvm free helper
    Close