我们可以把 RabbitMQ 的运作看作是一个极其智能的物流分拣中心。1. 核心四大角色 ️生产者 (Producer)消息的发送方。它只负责把消息发给交换机并贴上一个“路由标签”Routing Key。交换机 (Exchange)分拣中心的核心。它接收生产者的消息并根据规则决定分发到哪个仓库队列。队列 (Queue)消息的缓冲区。消息在这里排队直到被消费者取走。一个队列可以存放成千上万条消息。消费者 (Consumer)消息的接收方。它连接到队列取走消息并执行具体的业务逻辑。基础简单的“生产者”与“消费者” 这是最基础的模式生产者将消息发送到队列消费者从中取出。生产者 (send.py):import pika # 1. 建立连接 connection pika.BlockingConnection(pika.ConnectionParameters(localhost)) channel connection.channel() # 2. 声明队列确保队列存在 channel.queue_declare(queuehello) # 3. 发送消息 # exchange 表示使用默认交换机它会将消息直接路由到 routing_key 指定的队列 channel.basic_publish(exchange, routing_keyhello, bodyHello RabbitMQ!) print( [x] 已发送 Hello RabbitMQ!) connection.close()消费者 (receive.py):import pika connection pika.BlockingConnection(pika.ConnectionParameters(localhost)) channel connection.channel() # 声明队列防止消费者先启动导致找不到队列 channel.queue_declare(queuehello) # 定义回调函数当收到消息时执行的操作 def callback(ch, method, properties, body): print(f [x] 收到消息: {body.decode()}) # 告诉 RabbitMQ 使用 callback 来接收消息 channel.basic_consume(queuehello, on_message_callbackcallback, auto_ackTrue) print( [*] 等待消息中... 按 CTRLC 退出) channel.start_consuming()2. 它们是如何关联的(Binding Routing Key) 要让交换机知道把消息发给哪个队列需要两个关键设置绑定 (Binding)这是交换机和队列之间的“连接关系”。你可以理解为队列向交换机“订阅”了某种类型的消息。路由键 (Routing Key)生产者发消息时带上的“标签”。交换机会对比消息的标签和绑定的规则匹配上了才投递。3. 四大交换机模式决定如何投递 ️这是 RabbitMQ 最灵活的地方决定了消息的流向Direct (直连模式)精确匹配。路由键必须完全一致。例如消息标签是error只有绑定了error的队列能收到。Fanout (广播模式)无视标签。交换机会把消息复制一份发给所有绑定到它的队列。适合下单后同时通知库存、短信、积分系统。Topic (主题模式)模糊匹配。支持通配符如*.orange.*或user.#。适合按类别订阅消息比如“所有关于用户的日志”。Headers (头模式)️不根据标签而是根据消息头里的属性进行匹配较少使用。进阶使用交换机 (Exchange) 实现广播模式 在“广播模式”Fanout下生产者不把消息发给特定队列而是发给交换机由交换机发给所有绑定的队列。生产者 (emit_log.py):Python# ... 建立连接代码同上 ... # 声明一个名为 logs 的 fanout 交换机 channel.exchange_declare(exchangelogs, exchange_typefanout) channel.basic_publish(exchangelogs, routing_key, # 广播模式下不需要指定具体队列名 body系统日志这是一条广播消息)消费者 (receive_logs.py):# ... 建立连接和声明交换机代码同上 ... # 1. 创建一个随机命名的临时队列消费者断开后自动删除 result channel.queue_declare(queue, exclusiveTrue) queue_name result.method.queue # 2. 将队列绑定到交换机上这就是“订阅” channel.queue_bind(exchangelogs, queuequeue_name) # ... 之后的部分同基础消费者 ...精准打击主题模式 (Topic) 主题模式允许我们使用通配符如*匹配一个单词#匹配多个单词进行灵活路由。生产者:# 声明 topic 类型的交换机 channel.exchange_declare(exchangetopic_logs, exchange_typetopic) # 发送一条带标签的消息 routing_key usd.stock.nyse channel.basic_publish(exchangetopic_logs, routing_keyrouting_key, body美股数据更新)消费者只订阅股票数据:# 绑定键使用通配符匹配所有以 .stock. 结尾的中间部分 channel.queue_bind(exchangetopic_logs, queuequeue_name, routing_key*.stock.#)4. 核心优势为什么要用它 ️异步处理 (Asynchrony)⏳用户注册后主程序直接返回成功发邮件等耗时操作交给 RabbitMQ 慢慢处理极大提升用户体验。应用解耦 (Decoupling)订单系统不需要知道库存系统长什么样只要把消息发给 RabbitMQ库存系统自己去领任务即可。削峰填谷 (Peak Shaving)秒杀活动流量暴增时消息先堆积在队列里后端服务器按自己的节奏慢慢处理防止系统被瞬间冲垮。5. 消息的安全性不丢失的保障 持久化 (Persistence)把消息存入硬盘即使 RabbitMQ 重启消息也不会丢失。确认机制 (Ack)消费者处理完消息后回复一个ACK。如果消费者在处理时挂了没有回复RabbitMQ 会把这条消息重新发给别人确保一定被处理。
RabbitMQ基础知识
我们可以把 RabbitMQ 的运作看作是一个极其智能的物流分拣中心。1. 核心四大角色 ️生产者 (Producer)消息的发送方。它只负责把消息发给交换机并贴上一个“路由标签”Routing Key。交换机 (Exchange)分拣中心的核心。它接收生产者的消息并根据规则决定分发到哪个仓库队列。队列 (Queue)消息的缓冲区。消息在这里排队直到被消费者取走。一个队列可以存放成千上万条消息。消费者 (Consumer)消息的接收方。它连接到队列取走消息并执行具体的业务逻辑。基础简单的“生产者”与“消费者” 这是最基础的模式生产者将消息发送到队列消费者从中取出。生产者 (send.py):import pika # 1. 建立连接 connection pika.BlockingConnection(pika.ConnectionParameters(localhost)) channel connection.channel() # 2. 声明队列确保队列存在 channel.queue_declare(queuehello) # 3. 发送消息 # exchange 表示使用默认交换机它会将消息直接路由到 routing_key 指定的队列 channel.basic_publish(exchange, routing_keyhello, bodyHello RabbitMQ!) print( [x] 已发送 Hello RabbitMQ!) connection.close()消费者 (receive.py):import pika connection pika.BlockingConnection(pika.ConnectionParameters(localhost)) channel connection.channel() # 声明队列防止消费者先启动导致找不到队列 channel.queue_declare(queuehello) # 定义回调函数当收到消息时执行的操作 def callback(ch, method, properties, body): print(f [x] 收到消息: {body.decode()}) # 告诉 RabbitMQ 使用 callback 来接收消息 channel.basic_consume(queuehello, on_message_callbackcallback, auto_ackTrue) print( [*] 等待消息中... 按 CTRLC 退出) channel.start_consuming()2. 它们是如何关联的(Binding Routing Key) 要让交换机知道把消息发给哪个队列需要两个关键设置绑定 (Binding)这是交换机和队列之间的“连接关系”。你可以理解为队列向交换机“订阅”了某种类型的消息。路由键 (Routing Key)生产者发消息时带上的“标签”。交换机会对比消息的标签和绑定的规则匹配上了才投递。3. 四大交换机模式决定如何投递 ️这是 RabbitMQ 最灵活的地方决定了消息的流向Direct (直连模式)精确匹配。路由键必须完全一致。例如消息标签是error只有绑定了error的队列能收到。Fanout (广播模式)无视标签。交换机会把消息复制一份发给所有绑定到它的队列。适合下单后同时通知库存、短信、积分系统。Topic (主题模式)模糊匹配。支持通配符如*.orange.*或user.#。适合按类别订阅消息比如“所有关于用户的日志”。Headers (头模式)️不根据标签而是根据消息头里的属性进行匹配较少使用。进阶使用交换机 (Exchange) 实现广播模式 在“广播模式”Fanout下生产者不把消息发给特定队列而是发给交换机由交换机发给所有绑定的队列。生产者 (emit_log.py):Python# ... 建立连接代码同上 ... # 声明一个名为 logs 的 fanout 交换机 channel.exchange_declare(exchangelogs, exchange_typefanout) channel.basic_publish(exchangelogs, routing_key, # 广播模式下不需要指定具体队列名 body系统日志这是一条广播消息)消费者 (receive_logs.py):# ... 建立连接和声明交换机代码同上 ... # 1. 创建一个随机命名的临时队列消费者断开后自动删除 result channel.queue_declare(queue, exclusiveTrue) queue_name result.method.queue # 2. 将队列绑定到交换机上这就是“订阅” channel.queue_bind(exchangelogs, queuequeue_name) # ... 之后的部分同基础消费者 ...精准打击主题模式 (Topic) 主题模式允许我们使用通配符如*匹配一个单词#匹配多个单词进行灵活路由。生产者:# 声明 topic 类型的交换机 channel.exchange_declare(exchangetopic_logs, exchange_typetopic) # 发送一条带标签的消息 routing_key usd.stock.nyse channel.basic_publish(exchangetopic_logs, routing_keyrouting_key, body美股数据更新)消费者只订阅股票数据:# 绑定键使用通配符匹配所有以 .stock. 结尾的中间部分 channel.queue_bind(exchangetopic_logs, queuequeue_name, routing_key*.stock.#)4. 核心优势为什么要用它 ️异步处理 (Asynchrony)⏳用户注册后主程序直接返回成功发邮件等耗时操作交给 RabbitMQ 慢慢处理极大提升用户体验。应用解耦 (Decoupling)订单系统不需要知道库存系统长什么样只要把消息发给 RabbitMQ库存系统自己去领任务即可。削峰填谷 (Peak Shaving)秒杀活动流量暴增时消息先堆积在队列里后端服务器按自己的节奏慢慢处理防止系统被瞬间冲垮。5. 消息的安全性不丢失的保障 持久化 (Persistence)把消息存入硬盘即使 RabbitMQ 重启消息也不会丢失。确认机制 (Ack)消费者处理完消息后回复一个ACK。如果消费者在处理时挂了没有回复RabbitMQ 会把这条消息重新发给别人确保一定被处理。