RabbitMQ 从入门到实战:详解安装、五种消息模型及持久化

RabbitMQ 从入门到实战:详解安装、五种消息模型及持久化 一、RabbitMQ 核心介绍1. 什么是 MQMQ (Message Queue消息队列)​ 是一种消息中间件主要用于在微服务或分布式系统之间进行异步通信。核心作用通过“发送者 - 队列 - 消费者”的模式实现服务间的解耦。2. 什么是 AMQPAMQP (Advanced Message Queuing Protocol)​ 即高级消息队列协议。RabbitMQ 是基于此协议实现的。在 RabbitMQ 中主要包含以下几个核心概念Connection连接类似于一条高速公路如京港澳高速。Channel信道建立在 Connection 之上的虚拟连接类似于公路上的车道。大部分 API 操作都在 Channel 中进行它是复用的减少了建立 TCP 连接的开销。Queue队列存储消息的容器消息最终在这里等待被消费。Exchange交换机负责接收生产者发送的消息并根据路由规则将消息分发到一个或多个队列中。3. 什么是 RabbitMQRabbitMQ 是使用Erlang 语言编写的一款开源消息代理软件它实现了 AMQP 协议具有高性能、高可用性和易用性等特点。4. 为什么要使用 MQ引入 MQ 主要为了解决以下问题解耦服务 A 原本需要调用 B 和 C。如果新增 D 服务只需让 D 监听队列即可无需修改 A 的代码。异步服务 A 将消息发送到 MQ 后即可返回B 和 C 服务异步消费消息进行处理提升了响应速度。削峰在高并发场景下请求先进入 MQ后端服务根据自己的处理能力慢慢拉取消息防止系统被瞬时流量冲垮。5. Spring Boot 整合在 Spring Boot 项目中通常使用起步依赖dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-amqp/artifactId /dependency二、RabbitMQ 安装与配置 (Linux/CentOS)1. 安装 Erlang 环境RabbitMQ 依赖 Erlang需先安装cd /usr/upload rpm -ivh esl-erlang-17.3-1.x86_64.rpm --force --nodeps rpm -ivh esl-erlang_17.3-1~centos~6_amd64.rpm --force --nodeps rpm -ivh esl-erlang-compat-R14B-1.el6.noarch.rpm --force --nodeps2. 安装 RabbitMQ Serverrpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm3. 服务管理# 启动服务 service rabbitmq-server start # 查看状态 service rabbitmq-server status # 设置开机自启 chkconfig rabbitmq-server on # 停止服务 service rabbitmq-server stop # 重启服务 service rabbitmq-server restart4. 启用后台管理界面RabbitMQ 提供了一个 Web 管理界面非常方便rabbitmq-plugins enable rabbitmq_management service rabbitmq-server restart5. 用户管理默认的guest用户通常只允许本地访问建议创建新管理员# 创建用户 admin密码 1111 rabbitmqctl add_user admin 1111 # 设置用户角色为管理员 rabbitmqctl set_user_tags admin administrator # 设置权限 (允许访问默认vhost /并赋予所有权限) rabbitmqctl set_permissions -p / admin .* .* .* # 查看用户列表 rabbitmqctl list_users6. 访问测试浏览器访问http://你的服务器IP:15672使用刚创建的admin/1111登录。三、五种核心消息模型1. Simple 简单模型结构Producer - Queue - Consumer这是最简单的模型一对一发送。关键问题如何保证消息被成功消费答案使用手动 ACK (Acknowledge)。默认是自动ACK即消息一旦被消费端收到就认为成功。如果消费过程中报错消息就丢了。手动ACK代码示例// 第二个参数 autoAck 设为 false channel.basicConsume(queueName, false, new DefaultConsumer(channel) { Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { // 业务逻辑处理 System.out.println(Received: new String(body)); // 手动确认参数1是消息标签参数2是是否批量确认 channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { // 异常处理可以选择拒绝或重回队列 } } });2. Work 工作模型结构Producer - Queue - 多个 Consumer多个消费者竞争消费同一个队列中的消息。关键问题如何防止消息堆积答案采用能者多劳 (Prefetch Count)​ 策略。默认情况下RabbitMQ 会按顺序将消息平均分给消费者轮询。如果某个消费者处理慢会导致堆积。解决方案设置channel.basicQos(1)。意思是在我没有确认当前消息处理完之前不要再给我发新的消息。这样处理快的人就会多干点活。3. Fanout 广播模型结构Producer - Exchange - (绑定) - 多个 Queue - 多个 Consumer特点不处理路由键只需要将队列绑定到交换机上。发送到交换机的消息会被转发到所有与该交换机绑定的队列上。注意如果 Exchange 没有绑定任何 Queue消息会被直接丢弃。4. Direct 定向模型结构Producer - Exchange (RoutingKey) - Queue - Consumer特点通过Routing Key (路由键)​ 来控制消息的分发。队列在绑定交换机时需要指定一个 Binding Key。只有当消息的 Routing Key 与 Binding Key 完全匹配时消息才会被路由到该队列。5. Topic 通配符模型结构Producer - Exchange (Pattern) - Queue - Consumer特点Topic 是 Direct 的扩展支持通配符匹配。通配符规则*匹配一个单词用.分隔。#匹配零个或多个单词。示例Routing Key 为user.news可以匹配*.news或user.#。四、持久化机制数据安全为了保证 RabbitMQ 重启后消息不丢失我们需要做三层持久化1. Exchange 持久化在声明交换机时将durable参数设置为true。// 第三个参数为 durable (是否持久化) channel.exchangeDeclare(EXCHANGE_NAME, direct, true);2. Queue 持久化在声明队列时同样设置持久化。// 第二个参数为 durable channel.queueDeclare(QUEUE_NAME, true, false, false, null);3. 消息持久化在发送消息时设置消息属性为持久化。// 使用 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());总结保证消息不丢的终极方案 手动ACK应对消费失败​ 持久化应对MQ宕机。 发布建议这篇文章结构已经很清晰了。如果你打算发布建议在开头加一段“前言”简单说说你为什么学 RabbitMQ或者在结尾加个“总结”这样更容易获得读者的收藏和点赞哦