一、为什么要用消息队列在没有消息队列的时候服务之间的调用是直接的。比如服务 A 需要服务 B 处理一些任务A 就直接调用B然后等着B处理完返回结果。这种方式有两个明显的问题耦合度高A 和 B 绑得太紧B 出问题A 也会受影响。性能瓶颈如果 A 调用 B 很频繁B 的压力会很大A 也得一直等着整个系统响应就慢了。消息队列MQ就是为了解决这些问题出现的。它在生产者和消费者之间加了一个 “中转站”让两者解耦。生产者只管把消息发送到MQ不用关心谁来处理、什么时候处理。消费者从MQ中获取消息按自己的节奏处理处理完再告知 MQ。市面上常见的MQ有RabbitMQ、Kafka等它们大同小异。我们的目标就是理解这些 MQ 的核心原理然后自己实现一个。二、MQ 的核心概念要理解 MQ先搞清楚几个核心角色和流程生产者 (Producer)消息的发送方负责创建并发送消息。消费者 (Consumer)消息的接收方负责从 MQ 中拉取并处理消息。Broker (MQ 服务器)MQ 的核心负责接收、存储和转发消息。发布 (Publish)生产者向 Broker 发送消息的过程。订阅 (Subscribe)消费者向 Broker 表达对某类消息感兴趣的过程。消费 (Consume)消费者从 Broker 中拉取并处理消息的过程。Broker 内部还有几个关键组件虚拟主机 (Virtual Host)可以理解为一个逻辑上的隔离空间不同的虚拟主机之间资源是隔离的就像 MySQL 里的不同的数据库。交换机 (Exchange)生产者发送的消息先到交换机交换机再根据规则把消息转发到对应的队列。队列 (Queue)真正存储消息的地方消费者从队列里获取消息。绑定 (Binding)交换机和队列之间的关联关系通过绑定交换机知道消息该转发到哪个队列。一个交换机可以对应到多个队列一个队列也可以被多个交换机所对应。消息 (Message)生产者和消费者之间传递的数据载体包含消息头和消息体。三、核心 API 设计我们的 MQ 需要提供一套核心 API让生产者和消费者能和Broker交互。这些 API 参考了 RabbitMQ 的设计主要包括创建队列 (queueDeclare)声明一个队列如果队列不存在就创建存在就直接使用。销毁队列 (queueDelete)删除一个队列。创建交换机 (exchangeDeclare)声明一个交换机。销毁交换机 (exchangeDelete)删除一个交换机。创建绑定 (queueBind)建立交换机和队列之间的绑定关系。销毁绑定 (queueUnbind)解除交换机和队列之间的绑定关系。发布消息 (basicPublish)生产者发送消息到交换机。订阅消息 (basicConsume)消费者订阅队列持续从队列中拉取消息。确认消费 (basicAck)消费者处理完消息后告知 MQ 可以删除这条消息。这里需要注意对于 MQ 和消费者之间的工作模式主要有两种Push 模式Broker主动把消息推送给消费者RabbitMQ 采用。Pull 模式消费者主动从 Broker 拉取消息Kafka 采用。在我们的项目中主要实现Push模式并提供消费确认机制确保消息被正确处理。四、交换机类型MQ 里的消息 “快递员”交换机就像是 MQ 里负责送快递的小哥生产者把包裹消息交给它它再根据地址规则准确地送到对应的收件人队列手里。1. Direct 直接交换机精准快递这种交换机的工作方式最简单就像你给一个人发专属红包。生产者发送消息时会明确指定一个 “收件人名字”这个名字就是路由键。交换机收到消息后就去查自己绑定的所有队列看看哪个队列的绑定键和这个名字完全一样。如果找到了就把消息精准地投递给那个队列如果没找到这个消息就会被丢弃。举个例子你发了一个红包备注 “给张三”这个红包就只会到张三的账户里其他人谁也领不到。2. Fanout 扇出交换机群发短信这种交换机就像在群里发红包或者群发短信所有人都能收到。生产者发送消息时不需要指定任何路由键。交换机收到消息后会把这条消息原封不动地复制多份然后发送给所有和它绑定的队列。不管这些队列的绑定键是什么只要绑定了这个交换机就一定能收到消息。举个例子你在公司群发了 10 块钱红包设置成 “所有人可领”那么群里的每一个同事都能领到这 10 块钱。3. Topic 主题交换机智能匹配的 “暗号”这种交换机最灵活就像玩 “对对碰” 或者 “猜暗号” 的游戏。它有两个关键概念bindingKey绑定键队列在和交换机绑定时会说一句 “暗号”比如 “地瓜地瓜我是地雷”。routingKey路由键生产者发送消息时也会说一句 “暗号”比如 “地雷地雷我是地瓜”。交换机的工作就是判断这两句 “暗号” 能不能对上号。如果能就把消息转发给对应的队列对不上就不转发。举个例子队列 A 的暗号是 “地雷地雷我是地瓜”队列 B 的暗号是 “春眠不觉晓”队列 C 的暗号是 “今天天气真不错”当生产者发送一条暗号为 “地瓜地瓜我是地雷” 的消息时交换机发现和队列 A 的暗号能对上就把消息发给队列 A而队列 B 和 C 则收不到。这种匹配规则比简单的 “完全一样” 要复杂得多后面我们会详细说。它就像画图画领红包你发了 10 块钱说 “画个桌子圆的好方的像才能领”只有符合你描述的人才能领到这 10 块钱。这三种交换机类型正好对应了我们生活中三种常见的沟通场景一对一精准送达Direct一对多全员通知Fanout按条件智能分发Topic五、消息持久化为了防止 Broker 重启后消息丢失我们需要实现消息持久化把消息和队列、交换机等元数据保存到磁盘上。内存存储速度快但重启后数据会丢失。磁盘存储速度慢但数据可以永久保存。对于 MQ 来说消息的可靠性至关重要。因此我们需要将关键的元数据队列、交换机、绑定关系和重要的消息持久化到磁盘确保即使 Broker 重启数据也不会丢失。六、网络通信生产者和消费者客户端需要通过网络和 Broker 进行交互。我们采用 TCP 协议并自定义应用层协议来实现客户端和服务器之间的通信。Connection连接代表一个 TCP 连接是客户端和 Broker 之间通信的基础。Channel信道在一个 Connection 内部可以创建多个 Channel。每个 Channel 都是一个独立的逻辑会话用于传输数据。这样可以复用 TCP 连接减少资源消耗。客户端的 API 调用本质上是通过 Channel 向 Broker 发送请求Broker 处理后再通过 Channel 返回响应。七、应答模式为了确保消息被正确处理我们需要实现应答模式自动应答消费者拿到消息后立即告知 MQ 消息已处理。这种模式速度快但如果消费者处理消息时崩溃消息就会丢失。手动应答消费者处理完消息后主动调用basicAck方法告知 MQ。如果消费者崩溃MQ 会认为消息未被处理会将消息重新分发给其他消费者。在我们的项目中主要采用手动应答模式确保消息的可靠性。八、模块划分在开始编码前我们需要对项目进行模块划分让代码结构更清晰Broker 服务器模块负责接收客户端请求处理消息的存储和路由。客户端模块为生产者和消费者提供 API封装网络通信细节。协议模块定义客户端和 Broker 之间的通信协议。存储模块负责消息和元数据的持久化。交换机模块实现不同类型的交换机和路由逻辑。通过这样的模块划分我们可以更清晰地组织代码也方便后续的维护和扩展。结语这篇文章我们系统梳理了实现一个 Java 自定义协议消息队列所需的前置知识。我们从 “为什么需要消息队列” 这个问题出发接着拆解了 MQ 的核心角色与工作流程明确了生产者、消费者、Broker 之间的协作关系然后深入 Broker 内部认识了虚拟主机、交换机、队列、绑定和消息等关键组件之后又学习了核心 API 的设计思路、三种不同的交换机类型、消息持久化的必要性、基于 TCP 的网络通信模型以及保障消息可靠投递的应答模式最后我们对整个项目进行了清晰的模块划分为后续编码做好了结构上的准备。这些知识共同构成了我们理解和实现 MQ 的完整知识框架是从 0 到 1 构建一个消息队列系统的坚实基础。
从零开始实现一个 Java 消息队列:项目前置知识全解析
一、为什么要用消息队列在没有消息队列的时候服务之间的调用是直接的。比如服务 A 需要服务 B 处理一些任务A 就直接调用B然后等着B处理完返回结果。这种方式有两个明显的问题耦合度高A 和 B 绑得太紧B 出问题A 也会受影响。性能瓶颈如果 A 调用 B 很频繁B 的压力会很大A 也得一直等着整个系统响应就慢了。消息队列MQ就是为了解决这些问题出现的。它在生产者和消费者之间加了一个 “中转站”让两者解耦。生产者只管把消息发送到MQ不用关心谁来处理、什么时候处理。消费者从MQ中获取消息按自己的节奏处理处理完再告知 MQ。市面上常见的MQ有RabbitMQ、Kafka等它们大同小异。我们的目标就是理解这些 MQ 的核心原理然后自己实现一个。二、MQ 的核心概念要理解 MQ先搞清楚几个核心角色和流程生产者 (Producer)消息的发送方负责创建并发送消息。消费者 (Consumer)消息的接收方负责从 MQ 中拉取并处理消息。Broker (MQ 服务器)MQ 的核心负责接收、存储和转发消息。发布 (Publish)生产者向 Broker 发送消息的过程。订阅 (Subscribe)消费者向 Broker 表达对某类消息感兴趣的过程。消费 (Consume)消费者从 Broker 中拉取并处理消息的过程。Broker 内部还有几个关键组件虚拟主机 (Virtual Host)可以理解为一个逻辑上的隔离空间不同的虚拟主机之间资源是隔离的就像 MySQL 里的不同的数据库。交换机 (Exchange)生产者发送的消息先到交换机交换机再根据规则把消息转发到对应的队列。队列 (Queue)真正存储消息的地方消费者从队列里获取消息。绑定 (Binding)交换机和队列之间的关联关系通过绑定交换机知道消息该转发到哪个队列。一个交换机可以对应到多个队列一个队列也可以被多个交换机所对应。消息 (Message)生产者和消费者之间传递的数据载体包含消息头和消息体。三、核心 API 设计我们的 MQ 需要提供一套核心 API让生产者和消费者能和Broker交互。这些 API 参考了 RabbitMQ 的设计主要包括创建队列 (queueDeclare)声明一个队列如果队列不存在就创建存在就直接使用。销毁队列 (queueDelete)删除一个队列。创建交换机 (exchangeDeclare)声明一个交换机。销毁交换机 (exchangeDelete)删除一个交换机。创建绑定 (queueBind)建立交换机和队列之间的绑定关系。销毁绑定 (queueUnbind)解除交换机和队列之间的绑定关系。发布消息 (basicPublish)生产者发送消息到交换机。订阅消息 (basicConsume)消费者订阅队列持续从队列中拉取消息。确认消费 (basicAck)消费者处理完消息后告知 MQ 可以删除这条消息。这里需要注意对于 MQ 和消费者之间的工作模式主要有两种Push 模式Broker主动把消息推送给消费者RabbitMQ 采用。Pull 模式消费者主动从 Broker 拉取消息Kafka 采用。在我们的项目中主要实现Push模式并提供消费确认机制确保消息被正确处理。四、交换机类型MQ 里的消息 “快递员”交换机就像是 MQ 里负责送快递的小哥生产者把包裹消息交给它它再根据地址规则准确地送到对应的收件人队列手里。1. Direct 直接交换机精准快递这种交换机的工作方式最简单就像你给一个人发专属红包。生产者发送消息时会明确指定一个 “收件人名字”这个名字就是路由键。交换机收到消息后就去查自己绑定的所有队列看看哪个队列的绑定键和这个名字完全一样。如果找到了就把消息精准地投递给那个队列如果没找到这个消息就会被丢弃。举个例子你发了一个红包备注 “给张三”这个红包就只会到张三的账户里其他人谁也领不到。2. Fanout 扇出交换机群发短信这种交换机就像在群里发红包或者群发短信所有人都能收到。生产者发送消息时不需要指定任何路由键。交换机收到消息后会把这条消息原封不动地复制多份然后发送给所有和它绑定的队列。不管这些队列的绑定键是什么只要绑定了这个交换机就一定能收到消息。举个例子你在公司群发了 10 块钱红包设置成 “所有人可领”那么群里的每一个同事都能领到这 10 块钱。3. Topic 主题交换机智能匹配的 “暗号”这种交换机最灵活就像玩 “对对碰” 或者 “猜暗号” 的游戏。它有两个关键概念bindingKey绑定键队列在和交换机绑定时会说一句 “暗号”比如 “地瓜地瓜我是地雷”。routingKey路由键生产者发送消息时也会说一句 “暗号”比如 “地雷地雷我是地瓜”。交换机的工作就是判断这两句 “暗号” 能不能对上号。如果能就把消息转发给对应的队列对不上就不转发。举个例子队列 A 的暗号是 “地雷地雷我是地瓜”队列 B 的暗号是 “春眠不觉晓”队列 C 的暗号是 “今天天气真不错”当生产者发送一条暗号为 “地瓜地瓜我是地雷” 的消息时交换机发现和队列 A 的暗号能对上就把消息发给队列 A而队列 B 和 C 则收不到。这种匹配规则比简单的 “完全一样” 要复杂得多后面我们会详细说。它就像画图画领红包你发了 10 块钱说 “画个桌子圆的好方的像才能领”只有符合你描述的人才能领到这 10 块钱。这三种交换机类型正好对应了我们生活中三种常见的沟通场景一对一精准送达Direct一对多全员通知Fanout按条件智能分发Topic五、消息持久化为了防止 Broker 重启后消息丢失我们需要实现消息持久化把消息和队列、交换机等元数据保存到磁盘上。内存存储速度快但重启后数据会丢失。磁盘存储速度慢但数据可以永久保存。对于 MQ 来说消息的可靠性至关重要。因此我们需要将关键的元数据队列、交换机、绑定关系和重要的消息持久化到磁盘确保即使 Broker 重启数据也不会丢失。六、网络通信生产者和消费者客户端需要通过网络和 Broker 进行交互。我们采用 TCP 协议并自定义应用层协议来实现客户端和服务器之间的通信。Connection连接代表一个 TCP 连接是客户端和 Broker 之间通信的基础。Channel信道在一个 Connection 内部可以创建多个 Channel。每个 Channel 都是一个独立的逻辑会话用于传输数据。这样可以复用 TCP 连接减少资源消耗。客户端的 API 调用本质上是通过 Channel 向 Broker 发送请求Broker 处理后再通过 Channel 返回响应。七、应答模式为了确保消息被正确处理我们需要实现应答模式自动应答消费者拿到消息后立即告知 MQ 消息已处理。这种模式速度快但如果消费者处理消息时崩溃消息就会丢失。手动应答消费者处理完消息后主动调用basicAck方法告知 MQ。如果消费者崩溃MQ 会认为消息未被处理会将消息重新分发给其他消费者。在我们的项目中主要采用手动应答模式确保消息的可靠性。八、模块划分在开始编码前我们需要对项目进行模块划分让代码结构更清晰Broker 服务器模块负责接收客户端请求处理消息的存储和路由。客户端模块为生产者和消费者提供 API封装网络通信细节。协议模块定义客户端和 Broker 之间的通信协议。存储模块负责消息和元数据的持久化。交换机模块实现不同类型的交换机和路由逻辑。通过这样的模块划分我们可以更清晰地组织代码也方便后续的维护和扩展。结语这篇文章我们系统梳理了实现一个 Java 自定义协议消息队列所需的前置知识。我们从 “为什么需要消息队列” 这个问题出发接着拆解了 MQ 的核心角色与工作流程明确了生产者、消费者、Broker 之间的协作关系然后深入 Broker 内部认识了虚拟主机、交换机、队列、绑定和消息等关键组件之后又学习了核心 API 的设计思路、三种不同的交换机类型、消息持久化的必要性、基于 TCP 的网络通信模型以及保障消息可靠投递的应答模式最后我们对整个项目进行了清晰的模块划分为后续编码做好了结构上的准备。这些知识共同构成了我们理解和实现 MQ 的完整知识框架是从 0 到 1 构建一个消息队列系统的坚实基础。