云主机

  • 云主机 > 使用文档 > 发送消息 (多线程)

    发送消息 (多线程)

    最近更新时间: 2019-09-05 15:31:06

    消息队列 RocketMQ 的消费者和生产者客户端对象是线程安全的,可以在多个线程之间共享使用。
    您可以在服务器上(或者多台服务器)部署多个生产者和消费者实例,也可以在同一个生产者或消费者实例里采用多线程发送或接收消息,从而提高消息发送或接收 TPS。 请避免为每个线程创建一个客户端实例。
    在多线程之间共享 Producer 的示例代码如下:

    public class SharedProducer {
        public static void main(String[] args) {
            // producer 实例配置初始化
            Properties properties = new Properties();
            //您在控制台创建的 Group ID
            properties.put(PropertyKeyConst.GROUP_ID, "XXX");
            // AccessKey 身份验证,在消息队列管理控制台创建
            properties.put(PropertyKeyConst.AccessKey,"XXX");
            // SecretKey 身份验证,在消息队列管理控制台创建
            properties.put(PropertyKeyConst.SecretKey, "XXX");
            //设置发送超时时间,单位毫秒 
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
            properties.put(PropertyKeyConst.NAMESRV_ADDR,
              "XXX");
            final Producer producer = ONSFactory.createProducer(properties);
            // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
            producer.start();
            //创建的 Producer 和 Consumer 对象为线程安全的,可以在多线程间进行共享,避免每个线程创建一个实例。
            //在 thread 和 anotherThread 中共享 Producer 对象,并发地发送消息至消息队列 RocketMQ。
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Message msg = new Message( //
                        // Message 所属的 Topic
                        "TopicTestMQ",
                        // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 的服务器过滤
                        "TagA",
                        // Message Body 可以是任何二进制形式的数据, 消息队列 RocketMQ 不做任何干预,
                        // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
                        "Hello MQ".getBytes());
                        SendResult sendResult = producer.send(msg);
                        // 同步发送消息,只要不抛异常就是成功
                        if (sendResult != null) {
                            System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                        }
                    } catch (Exception e) {
                        // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
                        System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                        e.printStackTrace();
                    }
                }
            });
            thread.start();
            Thread anotherThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
                        SendResult sendResult = producer.send(msg);
                        // 同步发送消息,只要不抛异常就是成功
                        if (sendResult != null) {
                            System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                        }
                    } catch (Exception e) {
                        // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
                        System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                        e.printStackTrace();
                    }
                }
            });
            anotherThread.start();
            // Producer 实例若不再使用时,可将 Producer 关闭,进行资源释放
            // producer.shutdown();
        }
    }
    
    以上内容是否对您有帮助?
  • Qvm free helper
    Close