发送消息 (多线程)
消息队列 RocketMQ 的消费者和生产者客户端对象是线程安全的,可以在多个线程之间共享使用。
您可以在服务器上(或者多台服务器)部署多个生产者和消费者实例,也可以在同一个生产者或消费者实例里采用多线程发送或接收消息,从而提高消息发送或接收 TPS。 请避免为每个线程创建一个客户端实例。
在多线程之间共享 Producer 的示例代码如下:
public class SharedProducer {
public static void main(String[] args) {
// producer 实例配置初始化
Properties properties = new Properties();
//您在控制台创建的 Group ID
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// AccessKey 身份验证,在消息队列管理控制台创建
properties.put(PropertyKeyConst.AccessKey,"XXX");
// SecretKey 身份验证,在消息队列管理控制台创建
properties.put(PropertyKeyConst.SecretKey, "XXX");
//设置发送超时时间,单位毫秒
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
properties.put(PropertyKeyConst.NAMESRV_ADDR,
"XXX");
final Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();
//创建的 Producer 和 Consumer 对象为线程安全的,可以在多线程间进行共享,避免每个线程创建一个实例。
//在 thread 和 anotherThread 中共享 Producer 对象,并发地发送消息至消息队列 RocketMQ。
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
Message msg = new Message( //
// Message 所属的 Topic
"TopicTestMQ",
// Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 的服务器过滤
"TagA",
// Message Body 可以是任何二进制形式的数据, 消息队列 RocketMQ 不做任何干预,
// 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
SendResult sendResult = producer.send(msg);
// 同步发送消息,只要不抛异常就是成功
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
e.printStackTrace();
}
}
});
thread.start();
Thread anotherThread = new Thread(new Runnable() {
@Override
public void run() {
try {
Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
SendResult sendResult = producer.send(msg);
// 同步发送消息,只要不抛异常就是成功
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
e.printStackTrace();
}
}
});
anotherThread.start();
// Producer 实例若不再使用时,可将 Producer 关闭,进行资源释放
// producer.shutdown();
}
}
文档反馈
(如有产品使用问题,请 提交工单)