RabbitMQ入门教程 - 从零开始学习消息队列RabbitMQ
RabbitMQ入门教程 - 从零开始学习消息队列RabbitMQ
目录
1. RabbitMQ简介
RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据。它是用Erlang语言编写的,基于AMQP(Advanced Message Queuing Protocol,高级消息队列协议)实现。
消息队列解决的问题:
- ✅ 解耦:系统之间通过消息通信,降低耦合度
- ✅ 异步:提高系统响应速度
- ✅ 削峰:缓解高并发压力
- ✅ 可靠性:保证消息的可靠传递
- ✅ 扩展性:易于水平扩展
典型应用场景:
订单系统:订单创建后异步处理库存、支付、物流
日志收集:应用日志异步写入日志系统
消息通知:发送邮件、短信等通知
数据同步:不同系统之间的数据同步
可靠性:支持消息持久化、传输确认、发布确认
灵活的路由:支持多种消息路由方式
集群:支持高可用集群部署
管理界面:提供友好的Web管理界面
多语言支持:支持多种编程语言
插件系统:丰富的插件扩展功能
| 特性 | RabbitMQ | Kafka | RocketMQ | ActiveMQ |
|---|---|---|---|---|
| 协议 | AMQP | 自定义协议 | 自定义协议 | 多种协议 |
| 吞吐量 | 中等 | 极高 | 高 | 中等 |
| 延迟 | 低 | 中等 | 低 | 低 |
| 可靠性 | 高 | 高 | 高 | 高 |
| 适用场景 | 通用消息队列 | 日志、大数据 | 电商、金融 | 通用消息队列 |
2. 环境搭建
Windows系统
安装Erlang
- 下载地址:https://www.erlang.org/downloads
- 安装完成后验证:
erl -version
安装RabbitMQ
- 下载地址:https://www.rabbitmq.com/download.html
- 下载Windows版本安装包
- 安装完成后,RabbitMQ会自动启动
启动管理插件
rabbitmq-plugins enable rabbitmq_management访问管理界面
- 浏览器打开:http://localhost:15672
- 默认用户名:guest
- 默认密码:guest
macOS系统
使用Homebrew安装(推荐):
# 安装RabbitMQ
brew install rabbitmq
# 启动RabbitMQ
brew services start rabbitmq
# 启用管理插件
rabbitmq-plugins enable rabbitmq_management
# 访问管理界面
# http://localhost:15672Linux系统
Ubuntu/Debian:
# 添加RabbitMQ仓库
echo 'deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang' | sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list
# 添加密钥
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
# 更新包列表
sudo apt-get update
# 安装RabbitMQ
sudo apt-get install rabbitmq-server
# 启动服务
sudo systemctl start rabbitmq-server
# 设置开机自启
sudo systemctl enable rabbitmq-server
# 启用管理插件
sudo rabbitmq-plugins enable rabbitmq_managementCentOS/RHEL:
# 安装Erlang
sudo yum install erlang
# 安装RabbitMQ
sudo yum install rabbitmq-server
# 启动服务
sudo systemctl start rabbitmq-server
# 启用管理插件
sudo rabbitmq-plugins enable rabbitmq_management# 拉取RabbitMQ镜像
docker pull rabbitmq:3-management
# 运行RabbitMQ容器
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin123 \
rabbitmq:3-management
# 访问管理界面
# http://localhost:15672
# 用户名:admin
# 密码:admin123# 检查RabbitMQ状态
rabbitmqctl status
# 查看队列
rabbitmqctl list_queues
# 查看交换机
rabbitmqctl list_exchanges
# 查看连接
rabbitmqctl list_connectionsMaven项目配置
在pom.xml中添加依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>rabbitmq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<rabbitmq.version>5.16.0</rabbitmq.version>
</properties>
<dependencies>
<!-- RabbitMQ客户端 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbitmq.version}</version>
</dependency>
<!-- 日志框架 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.12</version>
</dependency>
<!-- JUnit测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>项目结构
rabbitmq-demo/
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ └── com/
│ │ │ └── example/
│ │ │ ├── util/
│ │ │ │ └── RabbitMQUtil.java
│ │ │ ├── producer/
│ │ │ │ └── SimpleProducer.java
│ │ │ └── consumer/
│ │ │ └── SimpleConsumer.java
│ │ └── resources/
│ │ └── logback.xml
│ └── test/
│ └── java/
└── pom.xml3. RabbitMQ核心概念
Producer(生产者)
发送消息的应用程序。
Consumer(消费者)
接收消息的应用程序。
Queue(队列)
存储消息的缓冲区。消息存储在队列中,等待消费者处理。
队列特性:
- FIFO(先进先出)
- 可以持久化
- 可以设置优先级
- 可以设置TTL(生存时间)
Exchange(交换机)
接收生产者发送的消息,并根据路由规则将消息路由到队列。
Exchange类型:
- direct:直接路由,精确匹配routing key
- fanout:广播,忽略routing key
- topic:主题路由,支持模式匹配
- headers:头部路由,根据headers匹配
Binding(绑定)
连接Exchange和Queue的规则。
Routing Key(路由键)
生产者发送消息时指定的键,用于路由消息。
Producer → Exchange → Binding → Queue → Consumer详细流程:
- 生产者发送消息到Exchange
- Exchange根据routing key和binding规则路由消息
- 消息被路由到匹配的Queue
- 消费者从Queue中获取消息
- 消费者处理完成后发送确认
生产者确认
- 事务模式:使用事务保证消息发送
- 确认模式:使用确认机制保证消息发送
消费者确认
自动确认:消息发送后立即确认
手动确认:消息处理完成后手动确认
队列持久化:队列在RabbitMQ重启后仍然存在
消息持久化:消息在RabbitMQ重启后仍然存在
交换机持久化:交换机在RabbitMQ重启后仍然存在
4. 第一个RabbitMQ程序
创建RabbitMQUtil.java:
package com.example.util;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* RabbitMQ工具类
*/
public class RabbitMQUtil {
private static final String HOST = "localhost";
private static final int PORT = 5672;
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final String VIRTUAL_HOST = "/";
/**
* 获取连接
*/
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VIRTUAL_HOST);
return factory.newConnection();
}
/**
* 关闭连接
*/
public static void closeConnection(Connection connection) throws IOException {
if (connection != null && connection.isOpen()) {
connection.close();
}
}
/**
* 关闭通道
*/
public static void closeChannel(Channel channel) throws IOException, TimeoutException {
if (channel != null && channel.isOpen()) {
channel.close();
}
}
}创建SimpleProducer.java:
package com.example.producer;
import com.example.util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 简单生产者
*/
public class SimpleProducer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 获取连接
connection = RabbitMQUtil.getConnection();
// 创建通道
channel = connection.createChannel();
// 声明队列(如果不存在则创建)
// 参数说明:
// queue: 队列名称
// durable: 是否持久化
// exclusive: 是否独占
// autoDelete: 是否自动删除
// arguments: 其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发送消息
String message = "Hello RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
// 关闭资源
try {
RabbitMQUtil.closeChannel(channel);
RabbitMQUtil.closeConnection(connection);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}创建SimpleConsumer.java:
package com.example.consumer;
import com.example.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 简单消费者
*/
public class SimpleConsumer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
// 获取连接
connection = RabbitMQUtil.getConnection();
// 创建通道
channel = connection.createChannel();
// 声明队列(必须与生产者一致)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
// 手动确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 消费消息
// 参数说明:
// queue: 队列名称
// autoAck: 是否自动确认
// consumer: 消费者对象
channel.basicConsume(QUEUE_NAME, false, consumer);
// 保持程序运行
Thread.sleep(10000);
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
// 关闭资源
try {
RabbitMQUtil.closeChannel(channel);
RabbitMQUtil.closeConnection(connection);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}- 先运行消费者:启动
SimpleConsumer,等待消息 - 再运行生产者:启动
SimpleProducer,发送消息 - 查看结果:消费者应该能接收到消息
5. 工作队列模式
工作队列(Work Queue)模式用于在多个消费者之间分配任务。当有大量任务需要处理时,可以使用工作队列将任务分发给多个消费者并行处理。
默认情况下,RabbitMQ会按顺序将消息分发给消费者,每个消费者处理的消息数量大致相等。
生产者:
package com.example.producer;
import com.example.util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作队列生产者
*/
public class WorkQueueProducer {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 发送多条消息
for (int i = 1; i <= 10; i++) {
String message = "Task " + i;
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
try {
RabbitMQUtil.closeChannel(channel);
RabbitMQUtil.closeConnection(connection);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}消费者:
package com.example.consumer;
import com.example.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 工作队列消费者
*/
public class WorkQueueConsumer {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 设置每次只接收一条消息
channel.basicQos(1);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
// 模拟处理任务
doWork(message);
} finally {
// 手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 关闭自动确认,改为手动确认
channel.basicConsume(QUEUE_NAME, false, consumer);
// 保持运行
Thread.sleep(60000);
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
RabbitMQUtil.closeChannel(channel);
RabbitMQUtil.closeConnection(connection);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
private static void doWork(String task) {
try {
// 模拟处理时间
Thread.sleep(1000);
System.out.println(" [x] Done: " + task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}使用basicQos(1)实现公平分发,确保每个消费者处理完当前消息后才接收下一条消息。
// 设置每次只接收一条消息
channel.basicQos(1);6. 发布订阅模式
发布订阅(Publish/Subscribe)模式中,生产者发送的消息会被所有订阅的消费者接收。这需要使用fanout类型的Exchange。
Fanout Exchange会将消息路由到所有绑定的队列,忽略routing key。
生产者:
package com.example.producer;
import com.example.util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 发布订阅生产者
*/
public class PublishSubscribeProducer {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
// 声明fanout类型的Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 发送消息
String message = "Hello Fanout Exchange!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
try {
RabbitMQUtil.closeChannel(channel);
RabbitMQUtil.closeConnection(connection);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}消费者:
package com.example.consumer;
import com.example.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 发布订阅消费者
*/
public class PublishSubscribeConsumer {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
// 声明Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 声明临时队列(非持久化,连接关闭后自动删除)
String queueName = channel.queueDeclare().getQueue();
// 绑定队列到Exchange
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
Thread.sleep(60000);
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
RabbitMQUtil.closeChannel(channel);
RabbitMQUtil.closeConnection(connection);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}7. 路由模式
路由(Routing)模式使用direct类型的Exchange,根据routing key精确匹配路由消息到队列。
生产者:
package com.example.producer;
import com.example.util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 路由模式生产者
*/
public class RoutingProducer {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
// 声明direct类型的Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 发送不同级别的日志
String[] severities = {"info", "warning", "error"};
String[] messages = {"Info message", "Warning message", "Error message"};
for (int i = 0; i < severities.length; i++) {
String severity = severities[i];
String message = messages[i];
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
try {
RabbitMQUtil.closeChannel(channel);
RabbitMQUtil.closeConnection(connection);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}消费者:
package com.example.consumer;
import com.example.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 路由模式消费者
*/
public class RoutingConsumer {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) {
if (args.length < 1) {
System.err.println("Usage: RoutingConsumer [info] [warning] [error]");
System.exit(1);
}
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
// 声明Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 声明队列
String queueName = channel.queueDeclare().getQueue();
// 根据命令行参数绑定routing key
for (String severity : args) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
String routingKey = envelope.getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
Thread.sleep(60000);
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
RabbitMQUtil.closeChannel(channel);
RabbitMQUtil.closeConnection(connection);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}8. 主题模式
主题(Topics)模式使用topic类型的Exchange,支持routing key的模式匹配。
*:匹配一个单词#:匹配零个或多个单词- 单词之间用
.分隔
示例:
*.orange.*:匹配中间是orange的三段routing key*.*.rabbit:匹配最后是rabbit的三段routing keylazy.#:匹配以lazy开头的routing key
生产者:
package com.example.producer;
import com.example.util.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 主题模式生产者
*/
public class TopicsProducer {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
// 声明topic类型的Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 发送不同routing key的消息
String[][] bindings = {
{"quick.orange.rabbit", "A quick orange rabbit"},
{"lazy.orange.elephant", "A lazy orange elephant"},
{"quick.orange.fox", "A quick orange fox"},
{"lazy.brown.fox", "A lazy brown fox"},
{"lazy.pink.rabbit", "A lazy pink rabbit"},
{"quick.brown.fox", "A quick brown fox"}
};
for (String[] binding : bindings) {
String routingKey = binding[0];
String message = binding[1];
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
} finally {
try {
RabbitMQUtil.closeChannel(channel);
RabbitMQUtil.closeConnection(connection);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}消费者:
package com.example.consumer;
import com.example.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 主题模式消费者
*/
public class TopicsConsumer {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) {
if (args.length < 1) {
System.err.println("Usage: TopicsConsumer [binding_key]...");
System.exit(1);
}
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
// 声明Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 声明队列
String queueName = channel.queueDeclare().getQueue();
// 绑定routing key
for (String bindingKey : args) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
String routingKey = envelope.getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
Thread.sleep(60000);
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
RabbitMQUtil.closeChannel(channel);
RabbitMQUtil.closeConnection(connection);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
}9. RPC模式
RPC(Remote Procedure Call)模式用于实现远程过程调用,客户端发送请求消息,服务器处理并返回响应。
RPC服务器:
package com.example.rpc;
import com.example.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* RPC服务器
*/
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
// 声明队列
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
// 设置每次只接收一条消息
channel.basicQos(1);
System.out.println(" [x] Awaiting RPC requests");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId())
.build();
String response = "";
try {
String message = new String(body, "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + n + ")");
response += fib(n);
} catch (Exception e) {
System.out.println(" [.] " + e.toString());
response = "";
} finally {
channel.basicPublish("", properties.getReplyTo(), replyProps,
response.getBytes("UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
Thread.sleep(60000);
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
RabbitMQUtil.closeChannel(channel);
RabbitMQUtil.closeConnection(connection);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
}RPC客户端:
package com.example.rpc;
import com.example.util.RabbitMQUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
* RPC客户端
*/
public class RPCClient implements AutoCloseable {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
public RPCClient() throws IOException, TimeoutException {
connection = RabbitMQUtil.getConnection();
channel = connection.createChannel();
}
public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});
String result = response.take();
channel.basicCancel(ctag);
return result;
}
public void close() throws IOException {
connection.close();
}
public static void main(String[] args) {
try (RPCClient fibonacciRpc = new RPCClient()) {
for (int i = 0; i < 32; i++) {
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str + ")");
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
}
}10. Spring Boot集成
<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>application.yml:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 开启确认模式
publisher-confirm-type: correlated
publisher-returns: true
# 消费者确认
listener:
simple:
acknowledge-mode: manual
prefetch: 1package com.example.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "spring_queue";
public static final String EXCHANGE_NAME = "spring_exchange";
public static final String ROUTING_KEY = "spring_routing";
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME, true);
}
@Bean
public DirectExchange exchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue())
.to(exchange())
.with(ROUTING_KEY);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
}package com.example.producer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SpringProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String message) {
rabbitTemplate.convertAndSend(
"spring_exchange",
"spring_routing",
message
);
}
}package com.example.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringConsumer {
@RabbitListener(queues = "spring_queue")
public void receive(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.out.println("Received: " + msg);
// 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}11. 高级特性
// 队列持久化
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// 消息持久化
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));// 开启确认模式
channel.confirmSelect();
// 发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
// 等待确认
if (channel.waitForConfirms()) {
System.out.println("Message confirmed");
}// 队列级别TTL
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 60秒
channel.queueDeclare(QUEUE_NAME, false, false, false, args);
// 消息级别TTL
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000")
.build();
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes("UTF-8"));// 声明死信Exchange
channel.exchangeDeclare("dlx_exchange", "direct");
// 声明死信队列
channel.queueDeclare("dlx_queue", true, false, false, null);
channel.queueBind("dlx_queue", "dlx_exchange", "dlx_routing");
// 声明普通队列,绑定死信Exchange
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange");
args.put("x-dead-letter-routing-key", "dlx_routing");
channel.queueDeclare("normal_queue", true, false, false, args);使用RabbitMQ延迟插件实现延迟队列:
# 启用延迟插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange// 声明延迟Exchange
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed_exchange", "x-delayed-message", true, false, args);
// 发送延迟消息
Map<String, Object> headers = new HashMap<>();
headers.put("x-delay", 5000); // 延迟5秒
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.headers(headers)
.build();
channel.basicPublish("delayed_exchange", "routing_key", properties, message.getBytes("UTF-8"));12. 最佳实践与常见问题
连接管理
- 使用连接池
- 一个应用一个连接
- 一个线程一个通道
消息确认
- 使用手动确认
- 处理完业务逻辑后再确认
- 失败时拒绝并重新入队
持久化
- 重要消息设置持久化
- 队列和消息都要持久化
性能优化
- 批量发送消息
- 使用批量确认
- 合理设置prefetch
监控和日志
- 启用管理插件
- 记录关键操作日志
- 监控队列长度
Q1: 消息丢失怎么办?
- 开启消息持久化
- 使用确认机制
- 使用事务或发布确认
Q2: 消息重复消费怎么办?
- 实现幂等性
- 使用消息ID去重
- 使用数据库唯一约束
Q3: 如何保证消息顺序?
- 单队列单消费者
- 使用优先级队列
- 业务层面保证顺序
Q4: 如何处理消息堆积?
- 增加消费者
- 使用工作队列模式
- 设置消息TTL
- 监控队列长度
Q5: 如何实现高可用?
- 使用RabbitMQ集群
- 使用镜像队列
- 使用负载均衡
13. 总结与进阶
通过本教程,你已经掌握了:
- ✅ RabbitMQ的基本概念
- ✅ 各种消息模式的使用
- ✅ Spring Boot集成
- ✅ 高级特性的应用
RabbitMQ集群
- 集群搭建
- 镜像队列
- 高可用方案
性能优化
- 消息批量处理
- 连接池优化
- 网络优化
监控和运维
- Prometheus监控
- 日志分析
- 故障排查
与其他技术集成
- 与Spring Cloud集成
- 与Kafka对比
- 微服务架构中的应用
- 官方文档:https://www.rabbitmq.com/documentation.html
- GitHub:https://github.com/rabbitmq
- Spring AMQP:https://spring.io/projects/spring-amqp
结语
RabbitMQ是一个功能强大、易于使用的消息队列中间件。通过本教程的学习,相信你已经掌握了RabbitMQ的核心功能和使用方法。
记住:
- 多实践:通过实际项目加深理解
- 理解原理:理解消息队列的工作原理
- 关注性能:注意性能优化
- 持续学习:关注新特性和最佳实践
祝你学习愉快,编程顺利! 🚀
本教程由Java突击队学习社区编写,如有问题欢迎反馈。