RocketMQ
简介
RocketMQ 主要用于削峰限流、异步解耦等场景。
RocketMQ 在 5.0 之前,消息的计算、存储都在 Broker 中进行,生产者和消费者从 NameServer 中拉取到路由信息,之后直接与 Broker 交互进行消息的生产与消费;

5.0 以后新引入了 Proxy 层,对 Broker 的计算、存储进行了拆分,Broker 专注数据的存储,以便更好的适应云原生环境,如果需要提高计算层的能力,只需要增加 Proxy 层,如果需要提高存储层的能力,增加 Broker 的部署即可。

安装
访问 官方下载地址 下载安装资源、客户端 SDK、控制面板等。以下示例基于 5.x 版本,其他版本参考官网。
提示
注意,在资源目录中的 bin 目录下,有 runserver.sh、runbroker.sh、tools.sh 三个文件,根据服务器配置修改其中的 JVM 启动参数(主要调整堆空间,默认占用了 4~8G,通常建议-Xmx 不要超过物理内存的 80%。)。
Local 模式
Broker 和 Proxy 是同进程部署,主要的集群配置仍然以 Broker 为基础进行即可。参考地址
- 启动 NameServer
# 1. 启动 NameServer,集群中各节点的启动命令相同
nohup sh mqnamesrv > ../my_namesrv.log & # 日志输出到指定文件
nohup sh mqnamesrv > /dev/null 2>&1 & # 日志不输出到任何文件启动 Broker+Proxy
- 单节点
一旦 Broker 重启或者宕机,整个服务不可用,可用于本地测试学习。
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy > /dev/null 2>&1 &- 多节点单副本
集群中全部为 Master,配置文件需保证 brokerName 属性不同。
单台机器宕机期间,这台机器上的消息不可用,但是消息保存在磁盘不会丢失,比较可靠。
# A 机器启动 Master nohup sh bin/mqbroker -n 192.168.1.1:9876;192.168.2:9876 -c \ $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties --enable-proxy & # B 机器启动 Master nohup sh bin/mqbroker -n 192.168.1.1:9876;192.168.2:9876 -c \ $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties --enable-proxy &- 多节点多副本-异步复制
多副本指多个从节点,具体配置内容看一下默认配置就懂了。
由于采用异步复制方式,主备有短暂消息延迟(毫秒级),Master 宕机,磁盘损坏情况下会丢失少量消息。
# A 机器启动 Master nohup sh bin/mqbroker -n 192.168.1.1:9876;192.168.2:9876 -c \ $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties --enable-proxy & # B 机器启动 Slave nohup sh bin/mqbroker -n 192.168.1.1:9876;192.168.2:9876 -c \ $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties --enable-proxy &提示
Broker 与 Slave 配对是通过指定相同的 BrokerName 参数来配对,Master 的 BrokerId 必须是 0,Slave 的 BrokerId 必须是大于 0 的数。一个 Master 下面可以挂载多个 Slave,同一 Master 下的多个 Slave 通过指定不同的 BrokerId 来区分。
- 多节点多副本-同步双写
同步双写方式,即只有主备都写成功才返回成功,性能略低,目前版本(5.x)在主节点宕机后,备机不能自动切换为主机。
# A 机器启动 Master nohup sh bin/mqbroker -n 192.168.1.1:9876;192.168.2:9876 -c \ $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties --enable-proxy & # B 机器启动 Slave nohup sh bin/mqbroker -n 192.168.1.1:9876;192.168.2:9876 -c \ $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties --enable-proxy &
Cluster 模式
Broker 和 Proxy 分别部署,可以在 NameServer 和 Broker 都启动完成之后再部署 Proxy。参考地址
在 Cluster 模式下,一个 Proxy 集群和 Broker 集群为一一对应的关系,可以在 Proxy 的配置文件 rmq-proxy.json 中使用 rocketMQClusterName 进行配置。
- 启动 NameServer
# 1. 启动 NameServer,集群中各节点的启动命令相同
nohup sh mqnamesrv > ../my_namesrv.log & # 日志输出到指定文件
nohup sh mqnamesrv > /dev/null 2>&1 & # 日志不输出到任何文件启动 Broker
- 单节点
一旦 Broker 重启或者宕机,整个服务不可用,可用于本地测试学习。
nohup sh bin/mqbroker -n localhost:9876 > /dev/null 2>&1 &- 多节点单副本
集群中全部为 Master,配置文件需保证 brokerName 属性不同。
单台机器宕机期间,这台机器上的消息不可用,但是消息保存在磁盘不会丢失,比较可靠。
# A 机器启动 Master nohup sh bin/mqbroker -n 192.168.1.1:9876;192.168.2:9876 -c \ $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties & # B 机器启动 Master nohup sh bin/mqbroker -n 192.168.1.1:9876;192.168.2:9876 -c \ $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &- 多节点多副本-异步复制
具体配置内容看一下默认配置就会了。
由于采用异步复制方式,主备有短暂消息延迟(毫秒级),Master 宕机,磁盘损坏情况下会丢失少量消息。
# A 机器启动 Master nohup sh bin/mqbroker -n 192.168.1.1:9876;192.168.2:9876 -c \ $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties & # B 机器启动 Slave nohup sh bin/mqbroker -n 192.168.1.1:9876;192.168.2:9876 -c \ $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &- 多节点多副本-同步双写
同步双写方式,即只有主备都写成功才返回成功,性能略低,目前版本(5.x)在主节点宕机后,备机不能自动切换为主机。
# A 机器启动 Master nohup sh bin/mqbroker -n 192.168.1.1:9876;192.168.2:9876 -c \ $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties & # B 机器启动 Slave nohup sh bin/mqbroker -n 192.168.1.1:9876;192.168.2:9876 -c \ $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &启动 Proxy
可以在多台机器启动多个 Proxy,注意必须等 Broker 启动完成后再启动 Proxy,否则可能启动失败!
# A 机器启动 Proxy nohup sh bin/mqproxy -n 192.168.1.1:9876;192.168.2:9876 & # B 机器启动 Proxy nohup sh bin/mqproxy -n 192.168.1.1:9876;192.168.2:9876 & # C 机器启动 Proxy,并指定配置文件路径 nohup sh bin/mqproxy -n 192.168.1.1:9876;192.168.2:9876 \ -pc $ROCKETMQ_HOME/conf/rmq-proxy.json &
RocketMQ Dashboard
RocketMQ 的管控利器,为用户提供客户端和应用程序的各种事件、性能的统计信息,支持以可视化工具代替 Topic 配置、Broker 管理等命令行操作。参考地址
docker pull apacherocketmq/rocketmq-dashboard:1.0.0 &&
docker run -d --name rocketmq-dashboard \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.1.1:9876;192.168.2:9876" \
-p 8001:8001 -t apacherocketmq/rocketmq-dashboard:1.0.0mvn spring-boot:run
mvn clean package -Dmaven.test.skip=true
nohup java -jar rocketmq-dashboard-1.0.0-SNAPSHOT.jar \
--server.port=8001 --rocketmq.config.namesrvAddr=192.168.1.1:9876;192.168.2:9876 &提示
开放端口号:8001(dashboard),9876(NameServer),10911(Broker),8080(Proxy) 端口
快速启动
以下示例采用 Local 模式单机运行,适用于开发环境,可一键启动 RocketMQ,生产环境可自行参考改进。
RocketMQ 运行需要依赖 Java 环境,参考 [脚本配置 Java 环境变量](/back/java/java 工具。html#脚本配置环境变量)一键配置
REM 关闭命令显示
@echo off
REM 检查是否以管理员权限运行,因为 setx 命令需要管理员权限
net session >nul 2>&1
if %errorlevel% neq 0 (
echo runing with administrator...
powershell start-process '%0' -verb runas
exit /b
)
setx ROCKETMQ_HOME "D:\cloud_server\rocketmq\5.2.0\rocketmq-all-5.2.0-bin-release" /M
timeout /t 1 >nul
echo RocketMQ 环境变量更新成功!
echo NameServer 开始启动!
start %ROCKETMQ_HOME%\bin\mqnamesrv.cmd
timeout /t 3 >nul
echo Broker 开始启动!
start %ROCKETMQ_HOME%\bin\mqbroker.cmd -n 127.0.0.1:9876
timeout /t 8 >nul
echo Proxy 开始启动!
start %ROCKETMQ_HOME%\bin\mqproxy.cmd -n 127.0.0.1:9876
timeout /t 5 >nul
echo dashboard 开始启动!
:: %~dp0 表示当前 bat 文件所在的目录;
java -jar %~dp0/rocketmq-dashboard-1.0.0.jar --server.port=8001 ^
--rocketmq.config.namesrvAddr=127.0.0.1:9876/usr/local/java/jdk-17基本概念
主题(Topic)
最顶层容器,标识同一类业务或同一类消息类型(顺序消息、事务消息等);
消息标签(MessageTag)
主题层级之下做消息的细分(用 Tag 标识),消费者可通过 Tag 过滤细粒度消息。
消费者分组(ConsumerGroup)
通过消费者分组,可使多个消费行为一致的消费者实现消息负载均衡。
消息组(MessageGroup)
这是 5.x 版本关于顺序消息新引入的概念,顺序消息发送必须要设置消息组。
相同消息组的消息按照先后顺序被存储在同一个队列;不同消息组的消息可以混合在同一个队列中,但不保证连续,即消息组之间没有顺序关系;
将订单 ID、用户 ID 作为消息组关键字,可实现同一终端,用户的消息按照顺序处理,不同用户的消息无需保证顺序。
5.x 版本有点多懒得写了,查看官网了解一下即可。参考地址
功能特性
本节简单介绍不同消息的实现,具体内容可 参考官网。
接入准备
客户端 SDK
根据底层通信协议,主要支持两个系列的客户端 SDK(Remoting 协议和 gRPC 协议)参考地址
gRPC 协议 SDK 自 5.0 版本全新推出,仅支持 5.x 版本服务端,更加轻量、标准、易扩展,支持多种语言,以 rocketmq-client-{language}的格式命名。
Remoting 协议 SDK 兼容 4.x、5.x 版本服务端,作为早期间默认通信协议,版本迭代绑定服务端同步更新,主要支持 Java 语言;
gRPC 协议<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client-java --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>${rocketmq-client-java-version}</version> </dependency>Remoting 协议<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>${rocketmq-client-version}</version> </dependency>构建单例生产者和消费者(只适用于 5.x 版本)
主要用于和服务器建立连接。
生产者import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.ProducerBuilder; import org.apache.rocketmq.client.apis.producer.TransactionChecker; /** * 生产者单例 * 如果需要多个连接,请考虑适当增加客户端的数量。 */ public class ProducerSingleton { private static volatile Producer PRODUCER; private static volatile Producer TRANSACTIONAL_PRODUCER; private static final String ACCESS_KEY = "yourAccessKey"; private static final String SECRET_KEY = "yourSecretKey"; private static final String ENDPOINTS = "127.0.0.1:8080"; // proxy 地址 private ProducerSingleton() { } private static Producer buildProducer(TransactionChecker checker, String... topics) throws ClientException { ClientConfiguration clientConfiguration = ClientConfiguration .newBuilder() // 某些 Windows 平台上,可能会遇到 SSL 兼容性问题。若 SSL 不是必需的,请尝试关闭以解决问题。 // .enableSsl(false) .setEndpoints(ENDPOINTS) // 如果是云端 RocketMQ 则需要凭证信息,本地可忽略配置凭证 // .setCredentialProvider(new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY)) .build(); final ProducerBuilder builder = ClientServiceProvider.loadService().newProducerBuilder() .setClientConfiguration(clientConfiguration) // topics 可选,但建议配置 .setTopics(topics); if (checker != null) { // 事务消息配置 builder.setTransactionChecker(checker); } return builder.build(); } // 普通消息 public static Producer getInstance(String... topics) throws ClientException { if (null == PRODUCER) { synchronized (ProducerSingleton.class) { if (null == PRODUCER) PRODUCER = buildProducer(null, topics); } } return PRODUCER; } // 事务消息 public static Producer getTransactionalInstance(TransactionChecker checker, String... topics) throws ClientException { if (null == TRANSACTIONAL_PRODUCER) { synchronized (ProducerSingleton.class) { if (null == TRANSACTIONAL_PRODUCER) TRANSACTIONAL_PRODUCER = buildProducer(checker, topics); } } return TRANSACTIONAL_PRODUCER; } }消费者import org.apache.rocketmq.client.apis.*; import org.apache.rocketmq.client.apis.consumer.*; import java.time.Duration; import java.util.Collections; // 消费者单例 public class ConsumerSingleton { private static volatile SimpleConsumer SIMPLE_CONSUMER; private static volatile PushConsumer PUSH_CONSUMER; private static final String ACCESS_KEY = "yourAccessKey"; private static final String SECRET_KEY = "yourSecretKey"; private static final String ENDPOINTS = "127.0.0.1:8080"; // proxy 地址 private ConsumerSingleton() { } // SimpleConsumer 可灵活自定义,要自己主动调用接口返回消费结果,适用于高度自定义流程的业务 private static SimpleConsumer buildSimpleConsumer(String topic, String tag, String consumerGroup) throws ClientException { ClientConfiguration clientConfiguration = bulidClientConfiguration(); FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); final ClientServiceProvider provider = ClientServiceProvider.loadService(); return provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(consumerGroup) .setAwaitDuration(Duration.ofSeconds(30)) // 设置订阅规则,同一消费者组中可设置不同的过滤规则 .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .build(); } // PushConsumer 被高度封装,使用监听器回调接口返回消费结果,适用于无自定义流程的业务 private static PushConsumer buildPushConsumer(String topic, String tag, String consumerGroup, MessageListener listener) throws ClientException { ClientConfiguration clientConfiguration = bulidClientConfiguration(); FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); final ClientServiceProvider provider = ClientServiceProvider.loadService(); return provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(consumerGroup) .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(listener) .build(); } // 设置 端点 和 凭证;云端服务需使用凭证信息,本地测试则不需要 private static ClientConfiguration bulidClientConfiguration() { return ClientConfiguration .newBuilder() .setEndpoints(ENDPOINTS) // 如果是云端 RocketMQ 则需要凭证信息 // .setCredentialProvider(new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY)) .build(); } /** * 创建 SimpleConsumer 实例 * 在大多数情况下,不需要创建太多的消费者,单例模式是推荐的 */ public static SimpleConsumer getSimpleConsumer(String topic, String tag, String consumerGroup) throws ClientException { if (null == SIMPLE_CONSUMER) { synchronized (ProducerSingleton.class) { if (null == SIMPLE_CONSUMER) SIMPLE_CONSUMER = buildSimpleConsumer(topic, tag, consumerGroup); } } return SIMPLE_CONSUMER; } /** * 创建 PushConsumer 实例 * 在大多数情况下,不需要创建太多的消费者,单例模式是推荐的 */ public static PushConsumer getPushConsumer(String topic, String tag, String consumerGroup, MessageListener listener) throws ClientException { if (null == PUSH_CONSUMER) { synchronized (ProducerSingleton.class) { if (null == PUSH_CONSUMER) PUSH_CONSUMER = buildPushConsumer(topic, tag, consumerGroup, listener); } } return PUSH_CONSUMER; } }
普通消息
普通消息一般应用于微服务解耦、事件驱动、数据集成等场景;具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。参考地址
创建主题
Linuxsh mqadmin updateTopic -n <nameserver_address> -t <topic_name> \ -c <cluster_name> -a +message.type=NORMALWindowsstart %ROCKETMQ_HOME%\bin\mqadmin.cmd updateTopic -n <nameserver_address> ^ -t <topic_name> -c <cluster_name> -a +message.type=NORMAL注意
消息类型有 Normal/FIFO/Delay/Transaction。如果不设置,默认为 Normal 类型。
自动创建主题功能建议仅在测试环境使用,生产环境请勿打开,避免产生大量垃圾主题,无法管理和回收并浪费系统资源。
代码示例
- gRPC 协议 (适用于 5.x)
生产者// 普通消息;具有可靠传输的能力,且对消息的处理时机、处理顺序没有特别要求。 public static void sendNormalMessage() throws ClientException { final String topic = "testTopicNormal"; final String tag = "testTagNormal"; final Producer producer = ProducerSingleton.getInstance(topic); for (int i = 0; i < 20; i++) { byte[] messageBody = ("发送普通消息--" + i).getBytes(StandardCharsets.UTF_8); final Message message = ClientServiceProvider.loadService().newMessageBuilder() .setTopic(topic) .setTag(tag) // 消息主键,可方便查找;一般设为业务相关联的唯一 id,如用户 id、订单 id 等 .setKeys("yourMessageKey-" + i) .setBody(messageBody) .build(); // SendReceipt sendReceipt = producer.send(message); // 同步发送 // 异步发送 CompletableFuture<SendReceipt> future = producer.sendAsync(message); CompletableFuture<SendReceipt> result = future.whenComplete((sendReceipt, throwable) -> { if (throwable == null) log.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); else log.info("执行失败:{}", throwable.getMessage()); }); result.join(); // 阻塞式获取处理结果,阻塞住是为了看到上面的日志信息; } }简单消费者// SimpleConsumer 可灵活自定义,要自己主动调用接口返回消费结果,适用于高度自定义流程的业务 public static void getNormalMessageBySimpleConsumer() throws ClientException { final String topic = "testTopicNormal"; final String tag = "testTagNormal"; // 通过消费者分组,可使多个消费行为一致的消费者实现消息负载均衡。 final String consumerGroup = "testGroup1"; SimpleConsumer consumer = ConsumerSingleton.getSimpleConsumer(topic, tag, consumerGroup); // 每个长轮询的最大消息数 int maxMessageNum = 5; // 设置消息接收后的不可见持续时间 Duration invisibleDuration = Duration.ofSeconds(15); // 接收消息,推荐使用多线程 do { // 接收消息;如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。 final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration); log.info("已收到 {} 条消息", messages.size()); for (MessageView message : messages) { final MessageId messageId = message.getMessageId(); try { consumer.ack(message); // 确认收到消息 log.info("消息已成功确认,messageId={}", messageId); } catch (Throwable t) { log.error("无法确认消息,messageId={}", messageId, t); } } } while (true); // Close the simple consumer when you don't need it anymore. // You could close it manually or add this into the JVM shutdown hook. // 由于是单例,JVM 停止 貌似就被销毁了,无需停止 // consumer.close(); }Push 消费者// PushConsumer 被高度封装,使用监听器回调接口返回消费结果,适用于无自定义流程的业务 public static void getNormalMessageByPushConsumer() throws ClientException { final String topic = "testTopicNormal"; final String tag = "testTagNormal"; // 通过消费者分组,可使多个消费行为一致的消费者实现消息负载均衡。 final String consumerGroup = "testGroup1"; PushConsumer consumer = ConsumerSingleton.getPushConsumer(topic, tag, consumerGroup, messageView -> { log.info("收到消息={}", messageView); return ConsumeResult.SUCCESS; }); // consumer.close(); }测试@Test void testNormalMessageBySimpleConsumer() throws Exception { ProducerExample.sendNormalMessage(); ConsumerExample.getNormalMessageBySimpleConsumer(); // 防止 main 线程关太快,导致消费者来不及消费程序就停了 Thread.sleep(Duration.ofSeconds(5).toMillis()); }- Remoting 协议(兼容 4.x、5.x 版本)
懒得写了,推荐直接看官网 参考地址。
定时消息
创建主题
Linuxsh mqadmin updateTopic -n <nameserver_address> -t <topic_name> \ -c <cluster_name> -a +message.type=DELAYWindowsstart %ROCKETMQ_HOME%\bin\mqadmin.cmd updateTopic -n <nameserver_address> ^ -t <topic_name> -c <cluster_name> -a +message.type=DELAY示例代码
生产者// 定时消息 public static void sendDelayMessage() throws ClientException { final String topic = "testDelayTopic"; final String tag = "testDelayTag"; final Producer producer = ProducerSingleton.getInstance(topic); byte[] messageBody = "发送定时消息".getBytes(StandardCharsets.UTF_8); final Message message = ClientServiceProvider.loadService().newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey-22") // 消息会立马发送至服务端,但是 10 秒后才能被消费者消费 .setDeliveryTimestamp(System.currentTimeMillis() + Duration.ofSeconds(10).toMillis()) .setBody(messageBody) .build(); SendReceipt sendReceipt = producer.send(message); // 同步发送 log.info("Send delayMessage successfully, messageId={}", sendReceipt.getMessageId()); }消费者// 定时消息 public static void getDelayMessageByPushConsumer() throws ClientException { final String topic = "testDelayTopic"; final String tag = "testDelayTag"; // 通过消费者分组,可使多个消费行为一致的消费者实现消息负载均衡。 final String consumerGroup = "testGroup2"; PushConsumer consumer = ConsumerSingleton.getPushConsumer(topic, tag, consumerGroup, messageView -> { log.info("收到定时消息={}", messageView.getDeliveryTimestamp()); return ConsumeResult.SUCCESS; }); // consumer.close(); }测试// 定时消息 @Test void testDelayMessageByPushConsumer() throws ClientException, InterruptedException { ProducerExample.sendDelayMessage(); ConsumerExample.getDelayMessageByPushConsumer(); // 第一次获取,时间未到,获取失败 Thread.sleep(Duration.ofSeconds(15).toMillis()); // 模拟延迟 ConsumerExample.getDelayMessageByPushConsumer(); // 第二次获取,成功 // 防止 main 线程关太快,导致消费者来不及消费程序就停了 Thread.sleep(Duration.ofSeconds(5).toMillis()); }
顺序消息
创建主题
Linux# 创建主题 sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> \ -c <cluster_name> -a +message.type=FIFO # 创建订阅消费组,使其消息被有序消费(-o,order,表示有序) sh mqadmin updateSubGroup -c <cluster_name> -g <consumer_group_name> \ -n <nameserver_address> -o trueWindows# 创建主题 start %ROCKETMQ_HOME%\bin\mqadmin.cmd updateTopic -n <nameserver_address> ^ -t <topic_name> -c <cluster_name> -a +message.type=FIFO # 创建订阅消费组,使其消息被有序消费(-o,order,表示有序) start %ROCKETMQ_HOME%\bin\mqadmin.cmd updateSubGroup -c <cluster_name> ^ -g <consumer_group_name> -n <nameserver_address> -o true示例代码
生产者// 顺序消息;严格按照顺序处理;顺序消息发送必须要设置消息组; public static void sendFIFOMessage() throws ClientException { final String topic = "testFIFOTopic"; final String tag = "testFIFOTag"; // 将订单 ID、用户 ID 作为消息组关键字,实现相同组消息的强有序,不同组则无需保证顺序 // 对消费者而言无需关注消息组,因为它虽然会拿到不同组的消息,但对于同一消息组而言肯定是有序的 final String messageGroup = "testFIFOGroup"; final Producer producer = ProducerSingleton.getInstance(topic); for (int i = 0; i < 10; i++) { byte[] messageBody = ("发送顺序消息--" + i).getBytes(StandardCharsets.UTF_8); final Message message = ClientServiceProvider.loadService().newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey-" + i) .setMessageGroup(messageGroup) .setBody(messageBody) .build(); SendReceipt sendReceipt = producer.send(message); // 同步发送 log.info("Send FIFOMessage successfully, messageId={}", sendReceipt.getMessageId()); } }消费者// 顺序消息 public static void getFIFOMessageByPushConsumer() throws ClientException { final String topic = "testFIFOTopic"; final String tag = "testFIFOTag"; // 通过消费者分组,可使多个消费行为一致的消费者实现消息负载均衡。 final String consumerGroup = "testGroup3"; PushConsumer consumer = ConsumerSingleton.getPushConsumer(topic, tag, consumerGroup, messageView -> { log.info("收到顺序消息={}", StandardCharsets.UTF_8.decode(messageView.getBody())); return ConsumeResult.SUCCESS; }); // consumer.close(); }测试// 顺序消息 @Test void testFIFOMessageByPushConsumer() throws Exception { ProducerExample.sendFIFOMessage(); ConsumerExample.getFIFOMessageByPushConsumer(); // 防止 main 线程关太快,导致消费者来不及消费程序就停了 Thread.sleep(Duration.ofSeconds(5).toMillis()); }
事务消息
用于分布式事务的诉求,一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理,支持最终一致性。
在普通消息基础上,支持二阶段的提交能力。事务消息可以保证和上游服务的事务一致性(不关注消费端的事务)。
如上游订单业务(消息生产者),涉及下游物流、积分服务(消息消费者)等,上游事务成功,则消息可被下游消费,否则消息回滚。
创建主题
Linuxsh mqadmin updateTopic -n <nameserver_address> -t <topic_name> \ -c <cluster_name> -a +message.type=TRANSACTIONWindowsstart %ROCKETMQ_HOME%\bin\mqadmin.cmd updateTopic -n <nameserver_address> ^ -t <topic_name> -c <cluster_name> -a +message.type=TRANSACTION示例代码
生产者// 事务消息 public static void sendTransactionMessage() throws ClientException { final String topic = "testTransactionTopic"; final String tag = "testTransactionTag"; // 获取事务消息生产者对象 final Producer producer = ProducerSingleton.getTransactionalInstance( messageView -> { // 设置事务检查器。若服务端未接收到第二次确认提交,回查事务 return checkSomething(messageView) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK; }, topic); // 开启消息事务 final Transaction transaction = producer.beginTransaction(); byte[] messageBody = ("发送事务消息").getBytes(StandardCharsets.UTF_8); final Message message = ClientServiceProvider.loadService().newMessageBuilder() .setTopic(topic).setTag(tag) .setKeys("yourMessageKey") // 消息主键,一般设为业务相关联的唯一 id,如用户 id、订单 id 等 .setBody(messageBody) //一般事务消息都会设置一个本地事务关联的唯一 ID,用来做本地事务回查的校验。 .addProperty("OrderId", "xxx").build(); // 第一次提交,发送半事务消息 final SendReceipt sendReceipt = producer.send(message, transaction); // 同步发送 log.info("半事务消息发送成功;此时消费端无法获取消息,除非本地事务执行成功,messageId={}", sendReceipt.getMessageId()); // 模拟执行本地事务 boolean isLocalTranOk = doSomething(); // 二次提交;若网络原因导致提交失败,上面设置的事务检查器就会生效。 if (isLocalTranOk) { transaction.commit(); } else { transaction.rollback(); } // 回滚消息事务 } static boolean doSomething() { return true; } // 设置事务检查器,若由于网络原因二次提交失败,则执行这个方法 static boolean checkSomething(MessageView messageView) { String orderId = messageView.getProperties().get("OrderId"); // 检查数据库是否存在数据,若存在说明事务已经成功提交 return true; }消费者// 事务消息 public static void getTransactionMessageByPushConsumer() throws ClientException { final String topic = "testTransactionTopic"; final String tag = "testTransactionTag"; // 通过消费者分组,可使多个消费行为一致的消费者实现消息负载均衡。 final String consumerGroup = "testGroup4"; PushConsumer consumer = ConsumerSingleton.getPushConsumer(topic, tag, consumerGroup, messageView -> { log.info("收到事务消息={}", StandardCharsets.UTF_8.decode(messageView.getBody())); return ConsumeResult.SUCCESS; }); // consumer.close(); }测试// 事务消息 @Test void testTransactionMessageByPushConsumer() throws Exception { ProducerExample.sendTransactionMessage(); ConsumerExample.getTransactionMessageByPushConsumer(); // 防止 main 线程关太快,导致消费者来不及消费程序就停了 Thread.sleep(Duration.ofSeconds(10).toMillis()); }
消费者负载均衡
消费组间广播消费
各消费者分组都订阅相同的消息,即都能读取到队列中所有消息,适用于一对多的场景,如网关推送、配置推送等。
消费组内共享消费
消费者共同分担组内的所有消息,实现水平拆分和均衡负载,适用于削峰解耦。
详见官网,懒得写了。
死信队列
死信队列(Dead Letter Queue, DLQ)是消息队列系统中一种特殊的队列,用于存储那些无法被正常消费的消息(称为"死信")。当消息在主要队列中无法被成功处理时,系统会将这些消息转移到死信队列中,而不是直接丢弃。
当消息被拒绝且未设置重新入队、消息过期(TTL 到期)、达到最大重试次数、队列达到最大长度限制、消费者处理超时等情况下,消息会进入死信队列。
在 RocketMQ 中,死信队列命名格式为:%DLQ% + ConsumerGroup;
例如消费者组为 myConsumerGroup,则对应的死信队列为%DLQ%myConsumerGroup
可以专门创建一个消费者来消费死信队列:
DefaultMQPushConsumer dlqConsumer = new DefaultMQPushConsumer("DLQ_Consumer_Group");
// 订阅指定消费者组的死信队列(命名格式:%DLQ% + 原消费者组名)
dlqConsumer.subscribe("%DLQ%ORIGINAL_CONSUMER_GROUP", "*");
dlqConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
// 1. 记录死信消息详情(关键!)
log.error("死信消息:Topic={}, MsgId={}, 重试次数={}, 错误原因:{}",
msg.getTopic(),
msg.getMsgId(),
msg.getReconsumeTimes(),
new String(msg.getBody(), StandardCharsets.UTF_8));
// 2. 人工干预处理(如:告警通知/存储到数据库)
alertService.sendAlert(msg);
// 3. 可选:修复后重新投递(谨慎操作!)
if (fixAndReprocess(msg)) {
Message newMsg = new Message("RECOVERY_TOPIC", msg.getBody());
producer.send(newMsg);
}
} catch (Exception e) {
// 死信处理失败时记录(避免循环死亡)
log.error("处理死信消息失败", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 会再次进入死信队列!
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
dlqConsumer.start();管理控制台可以查看死信队列,查找以%DLQ%开头的 Topic 即可,可以查看消息内容、重试次数等详细信息

