RocketMQ入门教程 - 从零开始学习RocketMQ消息队列
RocketMQ入门教程 - 从零开始学习RocketMQ消息队列
目录
1. RocketMQ简介
RocketMQ是阿里巴巴开源的一款分布式消息中间件,具有高性能、高可靠、高实时、分布式等特点。RocketMQ在2016年捐赠给Apache基金会,成为Apache顶级项目。
核心特点:
- ✅ 高性能:单机支持万级并发,低延迟
- ✅ 高可靠:支持消息持久化,支持多副本
- ✅ 高可用:支持集群部署,自动故障转移
- ✅ 分布式:支持分布式事务,支持顺序消息
- ✅ 丰富的消息类型:支持普通消息、顺序消息、事务消息、延时消息等
- ✅ 消息过滤:支持Tag过滤、SQL92过滤
- ✅ 消息轨迹:支持消息轨迹查询
- ✅ 丰富的API:支持多种编程语言
| 特性 | RocketMQ | Kafka | RabbitMQ | ActiveMQ |
|---|---|---|---|---|
| 吞吐量 | 高 | 非常高 | 中 | 中 |
| 延迟 | 低 | 低 | 低 | 中 |
| 可靠性 | 高 | 高 | 高 | 中 |
| 事务消息 | 支持 | 不支持 | 支持 | 支持 |
| 顺序消息 | 支持 | 支持 | 不支持 | 支持 |
| 延时消息 | 支持 | 不支持 | 支持 | 支持 |
| 消息过滤 | 支持 | 支持 | 支持 | 支持 |
- 异步解耦:系统间异步通信,降低耦合度
- 削峰填谷:处理突发流量,保护下游系统
- 数据分发:一个消息被多个系统消费
- 最终一致性:分布式事务的最终一致性保证
- 日志收集:收集系统日志进行分析
- 消息推送:实时消息推送服务
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Producer │────▶│ NameServer │◀────│ Consumer │
│ 生产者 │ │ 命名服务 │ │ 消费者 │
└─────────────┘ └─────────────┘ └─────────────┘
│ │
└──────────────┬─────────────────────────┘
│
┌───────▼────────┐
│ Broker │
│ 消息服务器 │
└────────────────┘核心组件:
- NameServer:路由注册中心,管理Broker信息
- Broker:消息存储和转发服务器
- Producer:消息生产者
- Consumer:消息消费者
2. 环境搭建
- JDK:JDK 1.8或更高版本
- Maven:3.6+(可选,用于Java项目)
- 操作系统:Linux、macOS、Windows
方式一:官网下载
访问Apache RocketMQ官网:https://rocketmq.apache.org/
下载最新版本(推荐5.x版本):
# 下载
wget https://archive.apache.org/dist/rocketmq/5.1.4/rocketmq-all-5.1.4-bin-release.zip
# 解压
unzip rocketmq-all-5.1.4-bin-release.zip
cd rocketmq-all-5.1.4-bin-release方式二:GitHub下载
git clone https://github.com/apache/rocketmq.git
cd rocketmq
mvn -Prelease-all -DskipTests clean install -ULinux/macOS
# 设置环境变量
export ROCKETMQ_HOME=/path/to/rocketmq-all-5.1.4-bin-release
export PATH=$ROCKETMQ_HOME/bin:$PATH
# 启动NameServer
nohup sh mqnamesrv > namesrv.log 2>&1 &
# 查看日志
tail -f ~/logs/rocketmqlogs/namesrv.logWindows
# 设置环境变量
set ROCKETMQ_HOME=C:\rocketmq-all-5.1.4-bin-release
set PATH=%ROCKETMQ_HOME%\bin;%PATH%
# 启动NameServer
start mqnamesrv.cmd验证启动:
# 查看进程
jps -l | grep NamesrvStartup
# 查看端口(9876)
netstat -an | grep 9876修改配置文件
编辑 conf/broker.conf:
# Broker名称
brokerName=broker-a
# Broker ID,0表示Master,大于0表示Slave
brokerId=0
# NameServer地址
namesrvAddr=127.0.0.1:9876
# 数据存储路径
storePathRootDir=/tmp/rocketmq/store
# CommitLog存储路径
storePathCommitLog=/tmp/rocketmq/store/commitlog
# 消息队列存储路径
storePathConsumeQueue=/tmp/rocketmq/store/consumequeue
# 消息索引存储路径
storePathIndex=/tmp/rocketmq/store/index启动Broker
Linux/macOS:
# 启动Broker
nohup sh mqbroker -n localhost:9876 -c conf/broker.conf > broker.log 2>&1 &
# 查看日志
tail -f ~/logs/rocketmqlogs/broker.logWindows:
start mqbroker.cmd -n localhost:9876 -c conf\broker.conf验证启动:
# 查看进程
jps -l | grep BrokerStartup
# 查看端口(10909, 10911, 10912)
netstat -an | grep 10911使用Docker Compose
创建 docker-compose.yml:
version: '3.8'
services:
namesrv:
image: apache/rocketmq:5.1.4
container_name: rmqnamesrv
ports:
- 9876:9876
environment:
JAVA_OPT_EXT: "-server -Xms512m -Xmx512m -Xmn256m"
command: sh mqnamesrv
broker:
image: apache/rocketmq:5.1.4
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
environment:
NAMESRV_ADDR: namesrv:9876
JAVA_OPT_EXT: "-server -Xms512m -Xmx512m -Xmn256m"
command: sh mqbroker -n namesrv:9876 -c /home/rocketmq/rocketmq-5.1.4/conf/broker.conf
depends_on:
- namesrv
console:
image: styletang/rocketmq-console-ng:latest
container_name: rmqconsole
ports:
- 8080:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=namesrv:9876"
depends_on:
- namesrv
- broker启动:
docker-compose up -d发送测试消息
# 设置环境变量
export NAMESRV_ADDR=localhost:9876
# 发送消息
sh tools.sh org.apache.rocketmq.example.quickstart.Producer接收测试消息
# 接收消息
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer3. 核心概念
Topic是消息的逻辑分类,生产者发送消息到Topic,消费者从Topic订阅消息。
// Topic名称示例
String topic = "OrderTopic";
String topic = "PaymentTopic";MessageQueue是Topic的物理分区,一个Topic可以包含多个MessageQueue,实现消息的分布式存储。
Topic: OrderTopic
├── MessageQueue0
├── MessageQueue1
├── MessageQueue2
└── MessageQueue3Tag是消息的二级分类,用于消息过滤。一个消息只能有一个Tag。
// 消息Tag示例
String tag = "CREATE_ORDER";
String tag = "CANCEL_ORDER";Key是消息的唯一标识,用于消息查询和去重。
String keys = "ORDER_20240115_001";Producer是消息的发送方,负责发送消息到Broker。
Producer类型:
- 普通Producer:发送普通消息
- 事务Producer:发送事务消息
Consumer是消息的接收方,从Broker订阅并消费消息。
Consumer类型:
- PushConsumer:Broker主动推送消息
- PullConsumer:Consumer主动拉取消息
消费模式:
- 集群模式(CLUSTERING):多个Consumer实例共同消费,每条消息只被消费一次
- 广播模式(BROADCASTING):每个Consumer实例都消费所有消息
ConsumerGroup是消费者的逻辑分组,相同ConsumerGroup的消费者共同消费消息。
String consumerGroup = "OrderConsumerGroup";NameServer是路由注册中心,管理Broker的路由信息。
功能:
- Broker注册
- 路由信息管理
- 客户端路由查询
Broker是消息存储和转发的服务器。
功能:
- 消息存储
- 消息转发
- 消息查询
- 高可用支持
4. 第一个RocketMQ程序
添加依赖
在 pom.xml 中添加RocketMQ依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>rocketmq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<rocketmq.version>5.1.4</rocketmq.version>
</properties>
<dependencies>
<!-- RocketMQ客户端 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!-- 日志框架 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.12</version>
</dependency>
</dependencies>
</project>创建 SimpleProducer.java:
package com.example.rocketmq;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SimpleProducer {
public static void main(String[] args) throws MQClientException {
// 创建Producer实例
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer
producer.start();
try {
// 发送10条消息
for (int i = 0; i < 10; i++) {
// 创建消息
Message msg = new Message(
"TopicTest", // Topic
"TagA", // Tag
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭Producer
producer.shutdown();
}
}
}创建 SimpleConsumer.java:
package com.example.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class SimpleConsumer {
public static void main(String[] args) throws MQClientException {
// 创建Consumer实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
// 设置消费位置:从最后位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 订阅Topic和Tag
consumer.subscribe("TopicTest", "*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : messages) {
System.out.printf("收到消息: %s%n",
new String(msg.getBody()));
}
// 返回消费状态:成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动Consumer
consumer.start();
System.out.println("Consumer启动成功,等待消息...");
}
}运行顺序
- 启动NameServer和Broker(如果还没启动)
- 运行Consumer:先启动消费者,等待消息
- 运行Producer:发送消息
运行结果
Producer输出:
SendResult [sendStatus=SEND_OK, msgId=..., offsetMsgId=..., messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
...Consumer输出:
Consumer启动成功,等待消息...
收到消息: Hello RocketMQ 0
收到消息: Hello RocketMQ 1
...5. 消息发送与接收
同步发送会等待Broker返回结果,可靠性高但性能较低。
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
try {
Message msg = new Message("TopicTest", "TagA", "同步消息".getBytes());
SendResult sendResult = producer.send(msg);
System.out.println("发送结果: " + sendResult);
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}异步发送不等待结果,通过回调处理结果,性能高。
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "异步消息".getBytes());
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功: " + sendResult);
}
@Override
public void onException(Throwable e) {
System.out.println("发送失败: " + e.getMessage());
}
});
// 等待回调执行
Thread.sleep(3000);
producer.shutdown();单向发送只发送不等待结果,适用于日志等对可靠性要求不高的场景。
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "单向消息".getBytes());
// 单向发送,不等待结果
producer.sendOneway(msg);
producer.shutdown();Message msg = new Message("TopicTest", "TagA", "消息内容".getBytes());
// 设置消息Key(用于查询和去重)
msg.setKeys("ORDER_20240115_001");
// 设置延迟级别(1-18,对应不同延迟时间)
msg.setDelayTimeLevel(3);
// 设置用户属性
msg.putUserProperty("orderId", "12345");
msg.putUserProperty("userId", "10001");
producer.send(msg);DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
// 设置消费位置
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 或
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 或
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
// 设置消费模式
consumer.setMessageModel(MessageModel.CLUSTERING); // 集群模式
// 或
consumer.setMessageModel(MessageModel.BROADCASTING); // 广播模式
// 设置最大消费线程数
consumer.setConsumeThreadMin(5);
consumer.setConsumeThreadMax(20);
// 订阅Topic和Tag
consumer.subscribe("TopicTest", "TagA || TagB"); // 支持Tag过滤
consumer.start();并发消费监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : messages) {
try {
// 处理消息
String body = new String(msg.getBody());
System.out.println("消费消息: " + body);
// 处理成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 处理失败,稍后重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});顺序消费监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeOrderlyContext context) {
for (MessageExt msg : messages) {
try {
// 处理消息
String body = new String(msg.getBody());
System.out.println("消费消息: " + body);
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
// 处理失败
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});6. 消息类型详解
普通消息是最基本的消息类型,支持异步解耦。
Message msg = new Message("TopicTest", "TagA", "普通消息".getBytes());
SendResult result = producer.send(msg);顺序消息保证消息按照发送顺序被消费。
发送顺序消息
// 创建顺序消息Producer
DefaultMQProducer producer = new DefaultMQProducer("OrderProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message msg = new Message(
"OrderTopic",
"TagA",
("订单消息 " + i).getBytes()
);
// 设置消息Key,相同Key的消息会被发送到同一个队列
msg.setKeys("ORDER_001");
// 发送到指定队列(保证顺序)
SendResult result = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 根据业务参数选择队列
Integer orderId = (Integer) arg;
int index = orderId % mqs.size();
return mqs.get(index);
}
}, 1); // 传入业务参数(订单ID)
}
producer.shutdown();消费顺序消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("OrderTopic", "*");
// 使用顺序消费监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeOrderlyContext context) {
for (MessageExt msg : messages) {
System.out.println("消费顺序消息: " + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();延时消息可以指定延迟时间后消费。
Message msg = new Message("TopicTest", "TagA", "延时消息".getBytes());
// 设置延时级别(1-18)
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3); // 延迟10秒
SendResult result = producer.send(msg);延时级别对应关系:
1 -> 1秒
2 -> 5秒
3 -> 10秒
4 -> 30秒
5 -> 1分钟
...批量发送可以提高性能。
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Message msg = new Message(
"TopicTest",
"TagA",
("批量消息 " + i).getBytes()
);
messages.add(msg);
}
// 批量发送
SendResult result = producer.send(messages);注意:
- 批量消息总大小不能超过4MB
- 批量消息必须有相同的Topic
- 不能是延时消息或事务消息
7. 消息过滤
Tag过滤是RocketMQ最简单的过滤方式。
// 生产者:发送带Tag的消息
Message msg = new Message("TopicTest", "TagA", "消息内容".getBytes());
producer.send(msg);
// 消费者:订阅指定Tag
consumer.subscribe("TopicTest", "TagA"); // 只消费TagA
consumer.subscribe("TopicTest", "TagA || TagB"); // 消费TagA或TagB
consumer.subscribe("TopicTest", "*"); // 消费所有TagSQL92过滤支持更复杂的过滤条件。
启用SQL过滤
在Broker配置文件中添加:
enablePropertyFilter=true使用SQL过滤
// 生产者:设置消息属性
Message msg = new Message("TopicTest", "TagA", "消息内容".getBytes());
msg.putUserProperty("orderId", "12345");
msg.putUserProperty("amount", "100.00");
msg.putUserProperty("status", "PAID");
producer.send(msg);
// 消费者:使用SQL过滤
consumer.subscribe("TopicTest",
MessageSelector.bySql("orderId = '12345' AND amount > 50"));支持的SQL语法:
- 数值比较:
>,>=,<,<=,BETWEEN,= - 字符比较:
=,<>,IN - 逻辑运算:
AND,OR,NOT - 空值判断:
IS NULL,IS NOT NULL
8. 事务消息
事务消息保证本地事务和消息发送的原子性,适用于分布式事务场景。
1. 发送Half消息到Broker
2. 执行本地事务
3. 根据本地事务结果提交或回滚消息
4. Broker根据结果决定消息是否可消费// 创建事务Producer
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup");
producer.setNamesrvAddr("localhost:9876");
// 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地事务
// 例如:保存订单到数据库
saveOrder(arg);
// 本地事务成功,提交消息
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 本地事务失败,回滚消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务状态
// 例如:查询订单是否已保存
if (isOrderSaved(msg.getKeys())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
});
producer.start();
// 发送事务消息
Message msg = new Message("OrderTopic", "TagA", "订单消息".getBytes());
msg.setKeys("ORDER_001");
TransactionSendResult result = producer.sendMessageInTransaction(msg, orderData);
producer.shutdown();9. 顺序消息
- 订单创建、支付、发货的顺序
- 用户注册、激活、登录的顺序
- 数据同步的顺序
所有消息严格按照顺序消费。
// 创建顺序Producer
DefaultMQProducer producer = new DefaultMQProducer("OrderProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 发送到同一个队列(队列ID为0)
Message msg = new Message("OrderTopic", "TagA", "消息".getBytes());
SendResult result = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 始终选择第一个队列
return mqs.get(0);
}
}, null);
producer.shutdown();同一分区的消息按顺序消费,不同分区可以并行消费。
// 根据业务ID选择队列,保证同一业务的消息顺序
SendResult result = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
String orderId = (String) arg;
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index);
}
}, orderId);10. 延时消息
- 订单超时取消
- 定时任务触发
- 延迟通知
Message msg = new Message("TopicTest", "TagA", "延时消息".getBytes());
// 设置延时级别
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3); // 延迟10秒
SendResult result = producer.send(msg);如果需要更精确的延时,可以在消息属性中设置:
Message msg = new Message("TopicTest", "TagA", "延时消息".getBytes());
// 设置期望的消费时间(时间戳)
long delayTime = System.currentTimeMillis() + 30000; // 30秒后
msg.putUserProperty("delayTime", String.valueOf(delayTime));
producer.send(msg);
// 消费者检查时间戳,如果未到时间则重新投递11. 批量消息
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 100; i++) {
Message msg = new Message(
"TopicTest",
"TagA",
("批量消息 " + i).getBytes()
);
messages.add(msg);
}
// 批量发送
SendResult result = producer.send(messages);- 批量消息总大小不能超过4MB
- 批量消息必须有相同的Topic
- 不能是延时消息或事务消息
如果消息数量很大,可以分批发送:
List<Message> allMessages = new ArrayList<>();
// ... 添加消息
int batchSize = 100;
for (int i = 0; i < allMessages.size(); i += batchSize) {
int end = Math.min(i + batchSize, allMessages.size());
List<Message> batch = allMessages.subList(i, end);
producer.send(batch);
}12. 消息重试与死信队列
并发消费重试
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
try {
// 处理消息
processMessage(messages);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 返回RECONSUME_LATER,消息会重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});重试时间间隔:
第1次重试:10秒后
第2次重试:30秒后
第3次重试:1分钟后
第4次重试:2分钟后
第5次重试:3分钟后
第6次重试:4分钟后
第7次重试:5分钟后
第8次重试:6分钟后
第9次重试:7分钟后
第10次重试:8分钟后
第11次重试:9分钟后
第12次重试:10分钟后
第13次重试:20分钟后
第14次重试:30分钟后
第15次重试:1小时后
第16次重试:2小时后顺序消费重试
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeOrderlyContext context) {
try {
processMessage(messages);
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
// 返回SUSPEND_CURRENT_QUEUE_A_MOMENT,暂停当前队列
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});当消息重试次数达到最大次数(默认16次)后,消息会被发送到死信队列。
死信队列命名规则:
%DLQ%ConsumerGroup处理死信消息:
// 创建死信队列消费者
DefaultMQPushConsumer dlqConsumer = new DefaultMQPushConsumer("DLQConsumerGroup");
dlqConsumer.setNamesrvAddr("localhost:9876");
dlqConsumer.subscribe("%DLQ%ConsumerGroup", "*");
dlqConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : messages) {
System.out.println("死信消息: " + new String(msg.getBody()));
// 记录日志、告警、人工处理等
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
dlqConsumer.start();13. 集群部署
单Master模式
适用于测试环境,风险高。
多Master模式
无Slave,性能高,但单机故障会丢失消息。
多Master多Slave模式(异步复制)
性能高,数据可能丢失。
多Master多Slave模式(同步双写)
数据不丢失,但性能略低。
NameServer集群
# namesrv1.conf
listenPort=9876# namesrv2.conf
listenPort=9877Broker集群
# broker-a-master.conf
brokerName=broker-a
brokerId=0
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
brokerClusterName=DefaultCluster# broker-a-slave.conf
brokerName=broker-a
brokerId=1
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
brokerClusterName=DefaultCluster14. 监控与管理
RocketMQ Console是官方提供的Web管理控制台。
启动Console
# 下载rocketmq-console
git clone https://github.com/apache/rocketmq-dashboard.git
cd rocketmq-dashboard
mvn clean package -Dmaven.test.skip=true
# 运行
java -jar target/rocketmq-dashboard-1.0.0.jar# 查看集群信息
sh mqadmin clusterList -n localhost:9876
# 查看Topic列表
sh mqadmin topicList -n localhost:9876
# 创建Topic
sh mqadmin updateTopic -n localhost:9876 -t TopicTest -c DefaultCluster
# 查看消费者状态
sh mqadmin consumerStatus -n localhost:9876 -g ConsumerGroup
# 查看消息
sh mqadmin queryMsgById -n localhost:9876 -i msgId15. 最佳实践与常见问题
Producer最佳实践
- 使用单例Producer
- 设置合理的超时时间
- 使用异步发送提高性能
- 设置消息Key便于查询
Consumer最佳实践
- 保证幂等性
- 快速消费,避免阻塞
- 合理设置消费线程数
- 处理异常,避免消息丢失
Topic和Tag设计
- Topic按业务划分
- Tag按消息类型划分
- 避免Tag过多
消息设计
- 消息体不要过大
- 设置消息Key
- 合理使用用户属性
Q1: 消息丢失怎么办?
- 使用同步发送
- 使用事务消息
- 开启消息持久化
- 使用多Master多Slave模式(同步双写)
Q2: 消息重复消费怎么办?
- 保证消费幂等性
- 使用消息Key去重
- 使用数据库唯一约束
Q3: 消费速度慢怎么办?
- 增加Consumer实例
- 增加消费线程数
- 优化消费逻辑
- 使用批量消费
Q4: 如何保证消息顺序?
- 使用顺序消息
- 同一业务的消息发送到同一队列
- 使用顺序消费监听器
16. 总结与进阶
通过本教程,你已经掌握了:
- ✅ RocketMQ的基本概念和架构
- ✅ 环境搭建和基本使用
- ✅ 消息发送和接收
- ✅ 各种消息类型
- ✅ 消息过滤和事务消息
- ✅ 集群部署和监控
源码学习
- NameServer实现原理
- Broker消息存储机制
- 消息投递流程
性能优化
- 消息压缩
- 批量发送优化
- 消费性能优化
高可用设计
- 多机房部署
- 消息复制机制
- 故障转移
Spring集成
- Spring Boot集成
- 事务消息集成
- 消息监听器配置
- 官方文档:https://rocketmq.apache.org/
- GitHub:https://github.com/apache/rocketmq
- RocketMQ Console:https://github.com/apache/rocketmq-dashboard
- 实现订单系统的异步处理
- 实现分布式事务场景
- 实现消息推送系统
- 实现日志收集系统
结语
RocketMQ是一个功能强大、性能优秀的分布式消息中间件。通过本教程的学习,相信你已经掌握了RocketMQ的核心功能和使用方法。
记住:
- 多实践:理论结合实践,多写代码
- 理解原理:理解消息队列的工作原理
- 关注性能:注意性能优化
- 保证可靠性:保证消息不丢失、不重复
祝你学习愉快,编程顺利! 🚀
本教程由Java突击队学习社区编写,如有问题欢迎反馈。