第5篇_PUBLISH不是收到就转发_Broker怎么处理QoS_PacketId和多客户端fanout

第5篇_PUBLISH不是收到就转发_Broker怎么处理QoS_PacketId和多客户端fanout PUBLISH 看起来最简单收到消息找到订阅者转发出去。但 Broker 真正掉线、重复、乱序、延迟很多都藏在这里。先给结论Broker 不能直接把发布者的 PacketId 原封不动转发给订阅者。PacketId 是一条客户端连接内的事务编号Broker fanout 时必须为每个目标连接重新分配。当前讨论的 fanout 仍然是 PLC 侧轻量 Broker 的小规模路由不追求通用 Broker 的海量订阅索引和跨节点分发。一、一条 PUBLISH 进入 Broker 后发生了什么注意最后一行发布者的PUBACK packetId10是对发布者连接的确认。订阅者收到的 PUBLISH 会使用订阅者连接自己的 PacketId。二、PacketId 的作用域假设发布者 A 发来PUBLISH PacketId 10Broker 要转发给 B 和 C连接PacketId 作用A - BrokerA 这条连接的入站事务Broker - BB 这条连接的出站事务Broker - CC 这条连接的出站事务这三者不能混用。如果 Broker 直接把10转给 B 和 C后果可能是和 B/C 当前未完成事务冲突。重发时无法判断 ACK 属于哪条投递。QoS2 四步握手状态错乱。客户端认为收到重复或非法报文。三、QoS0 / QoS1 / QoS2 在 Broker 侧的差异QoS发布者到 BrokerBroker 到订阅者事务要求QoS0收到即处理直接投递不需要 ACKQoS1收到后回 PUBACK目标连接等待 PUBACK至少一次可能重发QoS2PUBREC / PUBREL / PUBCOMP目标连接独立 QoS2 闭环恰好一次语义状态最多Broker 里必须拆成两套事务事务方向作用RxInflight处理发布者发来的 QoS1/QoS2TxInflight处理 Broker 转发给订阅者的 QoS1/QoS2这不是为了显得复杂而是 PacketId 作用域决定的。四、fanout 不能让慢客户端拖垮其他客户端一个 PUBLISH 可能命中多个订阅者。如果某个客户端很慢Broker 不能阻塞整个路由链。当前设计是每个客户端有自己的投递队列。队列作用协议响应队列CONNACK、SUBACK、PUBACK、PINGRESP 等优先发送普通投递队列PUBLISH fanout 后进入目标客户端QoS 事务表记录等待 ACK 的出站消息协议响应必须优先因为客户端协议状态机在等它。五、QoS2 不可怕可怕的是状态没闭环QoS2 的链路是Broker 转发给订阅者时又是另一条 QoS2 链这两条链不能混在一起。否则测试 QoS2 时就会出现客户端显示发送成功但订阅端不收。Broker 重复发同一条。客户端过一会儿断开。PacketId 表看起来“莫名其妙占满”。六、ST 代码入口代码入口作用FB_MqttBroker.M_HandlePublish处理连接槽位输出的发布消息FB_MqttBroker.M_RoutePublishNow根据路由表 fanout 到订阅者FB_MqttBrokerConnection.M_EnqueueDelivery写入目标连接投递队列FB_MqttBrokerTxScheduler.M_RegisterPublish注册出站 QoS 事务FB_MqttBrokerConnection.M_AssignPacketId为目标连接分配 PacketIdFB_MqttBrokerCodec.M_BuildPublish构造发给订阅者的 PUBLISH关键逻辑可以压缩成// 发布者 PacketId 只用于发布者连接的 ACK。 uiSourcePacketId : stPublishIn.uiPacketId; // fanout 到订阅者时必须按目标连接重新分配 PacketId。 uiTargetPacketId : aConnections[uiTargetSlot].M_AssignPacketId(); aConnections[uiTargetSlot].M_EnqueueDelivery( sTopicName : stPublishIn.sTopicName, pPayload : stPublishIn.pPayload, uiPayloadLen : stPublishIn.uiPayloadLen, eQoS : eTargetQoS, uiPacketId : uiTargetPacketId);七、现场排障表现象优先检查可能原因QoS1 重复消息很多TxInflight 是否正确收 PUBACK重发事务没闭环QoS2 一发就断PUBREC/PUBREL/PUBCOMP 状态入站和出站事务混用发布者成功但订阅者收不到投递队列水位路由命中但目标队列满一个慢客户端影响全部各 Slot 队列是否独立fanout 同步阻塞PacketId 冲突目标连接 PacketId 分配直接沿用发布者 PacketId模型边界与验证路径PUBLISH fanout 本质上是状态作用域问题。表面上看Broker 只是把一条消息转给多个订阅者。往上看一层它其实要维护三类边界发布者连接边界、订阅者连接边界、QoS 事务边界。结论可信度依据验证路径PacketId 是连接作用域highMQTT QoS 事务语义用两个订阅者同时接收 QoS1观察各自 PacketId入站 QoS 和出站 QoS 事务要分开highBroker 既是接收方又是发送方分别观察RxInflight和TxInflight慢客户端应只影响自身队列medium当前槽位队列隔离模型故意让一个客户端低速接收观察另一个客户端延迟这里最容易写错的不是报文格式而是把“消息 ID”想成全局概念。MQTT 的 PacketId 不是全局消息编号它只在当前客户端连接里有意义。这个模型边界没想清楚QoS1 / QoS2 后面基本都会出问题。八、这一篇你最该记住的 6 句话PUBLISH 不是收到就转发Broker 还要做路由、队列和 QoS 事务。PacketId 是连接作用域不是全局消息 ID。Broker 转发给订阅者时必须按目标连接重新分配 PacketId。QoS1 / QoS2 要拆入站事务和出站事务。协议响应队列应该优先于普通 PUBLISH 投递队列。慢客户端必须被隔离在自己的队列里不能拖垮整个 Broker。下篇预告下一篇讲 Broker 不能只会转发 PUBLISH 的三个能力Retain、Will、KeepAlive。这三个功能看起来像补充项但工业现场非常常用。完整 ST 代码本篇涉及的完整代码入口MqttBroker/Device/Application/POUs/FBs/FB_MqttBroker.M_HandlePublish.stMqttBroker/Device/Application/POUs/FBs/FB_MqttBroker.M_RoutePublishNow.stMqttBroker/Device/Application/POUs/FBs/FB_MqttBrokerConnection.M_EnqueueDelivery.stMqttBroker/Device/Application/POUs/FBs/FB_MqttBrokerTxScheduler.M_RegisterPublish.stMqttBroker/Device/Application/POUs/FBs/FB_MqttBrokerConnection.M_AssignPacketId.st系列导航系列定位第 5 篇上一篇SUBSCRIBE 不是存个字符串Broker 怎么维护订阅表、通配符和多客户端路由下一篇Retain、Will、KeepAlive工业现场为什么不能只会转发 PUBLISH项目与资料开源项目名称MqttBroker前置系列MqttClient_V2_0核心关键词PUBLISH、fanout、QoS、PacketId、投递队列适合谁收藏正在实现 Broker 消息转发的人QoS1 / QoS2 经常重复、超时、断线的人想理解 PacketId 作用域的人想把 Broker 写成可维护状态机的人