本文为 WIZnet W55RP20 芯片 MicroPython教程第 12 篇基于官方最新固件编写代码均经过实际验证可直接烧录运行。版权声明本文为 WIZnet 官方原创技术文章转载请注明出处。前言上一篇实战教程我们已经完成了W55RP20芯片HTTP协议与OneNET平台数据上云。本篇内容我们聚焦物联网核心通信协议-MQTT.MQTT是物联网最主流的轻量级通信协议采用发布/订阅(Pub/Sub)模式带宽占用小、功耗低、支持断线重连非常适合嵌入式以太网设备上云与设备间通信。W55RP20集成硬件 TCP/IP 协议栈搭配MicroPython的umqtt库几行代码即可实现稳定MQTT通信无需关心底层Socket细节。学完本文你将掌握:MQTT协议核心原理与发布/订阅机制W55RP20连接本地/公共MQTT服务器实现消息发布、订阅、回调处理心跳保活、断线重连等工业级稳定机制本地MQTT测试环境搭建传统嵌入式MQTT开发的3大痛点W55RP20一键解决1. 协议栈调试复杂软件TCP/IP协议栈需手动移植、调试占用MCU资源易出现卡顿、内存溢出W55RP20内置硬件TCP/IP协议栈网络层处理完全由硬件完成不占用CPU资源无需调试协议栈。2. 硬件接线繁琐外接网络模块需手动焊接、调试接口时序新手易出错W55RP20板载以太网接口无需额外外接模块即插即用大幅简化接线操作。3. 入门门槛高MQTT客户端连接、心跳保活等细节需手动编写调试周期长本文提供可直接运行的完整代码注释清晰配合10分钟分步操作新手也能快速出效果依托MicroPython免编译特性代码修改即时生效、调试实时反馈快速获得成就感。本节课核心目标· 完成W55RP20与电脑的硬件连接、开发环境配置· 理解MQTT协议核心原理与极简工作流程· 烧录MQTT客户端代码实现与公共MQTT服务器的连接· 完成主题订阅、消息发布与回环通信测试· 掌握常见故障的快速排查方法系列教程学习路径本专栏共 16 篇循序渐进覆盖 W55RP20-EVB-Pico 模块 C语言开发全流程第 1 篇静态 IP 配置与网络基础第 2 篇DHCP 自动联网与网络诊断第 3 篇TCP Client 客户端通信第 4 篇TCP Server 服务端通信第 5 篇UDP 单播数据通信第 6 篇UDP 组播/广播数据通信第 7 篇DNS 域名解析第 8 篇NTP 从网络获取时间第 9 篇HTTP Client 客户端请求第 10 篇HTTP Server 服务端搭建第 11 篇HTTP 协议与 OneNET 平台数据上云第 12 篇MQTT 协议基础通信验证本文第 13 篇MQTT 协议与阿里云平台对接第 14 篇MQTT 协议与 OneNET 平台对接第 15 篇MQTT 协议与 ThingSpeak 平台对接第 16 篇Modbus 工业协议通信建议收藏本专栏跟随教程逐步学习所有代码均会同步更新至官方 Gitee 仓库目录1. 准备工作1.1 软件准备1.2 硬件准备2. 烧录 W55RP20-EVB-Pico 模块专属 MicroPython 固件3. 硬件连接与开发环境配置3.1 硬件连接3.1.1 基础连接供电调试3.1.2 以太网连接3.1.3 模块与开发板接线3.2 Thonny 开发环境配置4. MQTT协议极简解析5. 核心代码复制与烧录5.1 依赖库说明5.2 完整代码以及烧录历程6. 运行结果与MQTT通信验证6.1 串口输出结果6.2 MQTT回环通信验证7. 常见问题一站式排查8. W55RP20 核心优势对比9. 典型应用场景10. 系列预告与资源获取10.1系列预告10.2 资源获取1. 准备工作1.1 软件准备所需软件均为免费版本按要求下载安装即可无需额外付费。软件名称版本要求下载地址说明Thonny4.0 及以上Thonny 官方下载轻量级 MicroPython IDE支持代码编辑、烧录与串口调试新手友好W55RP20-EVB-Pico 模块 MicroPython 固件最新稳定版WIZnet 官方固件下载专为 W55RP20-EVB-Pico 模块 编写已集成 WIZnet 硬件驱动与协议栈1.2 硬件准备W55RP20-EVB-Pico × 1Micro USB 数据线必须支持数据传输不能使用纯充电线× 1标准网线 × 1开启 DHCP 功能的路由器 / 交换机 × 1用于获取网络参数实现 DNS 解析W55RP20-EVB-Pico 模块已集成以太网相关器件无需额外焊接飞线配合 RP2040 开发板可快速搭建开发环境大幅降低接线错误和硬件故障概率。2. 烧录 W55RP20-EVB-Pico 模块专属 MicroPython 固件W55RP20-EVB-Pico 模块 完全兼容树莓派 Pico 的 UF2 固件烧录方式操作简单无需额外烧录器新手可快速上手按住 RP2040 开发板上的 BOOTSEL 按键不放使用 Micro USB 数据线连接开发板与电脑待电脑识别出名为 RPI-RP2 的 U 盘后松开 BOOTSEL 按键将下载好的 W5500_RP2040_firmware.uf2 固件文件拖拽到 U 盘中开发板会自动重启固件烧录完成。注意如果电脑没有识别出 RPI-RP2 U 盘请尝试更换 USB 数据线、重新插拔开发板或更换电脑 USB 接口优先使用 USB 2.0 接口。3. 硬件连接与开发环境配置3.1 硬件连接W55RP20-EVB-Pico 模块连接分为两步分别实现供电/调试和以太网连接操作简单无需复杂接线3.1.1 基础连接供电调试使用 Micro USB 数据线连接 RP2040 开发板与电脑用于开发板供电、代码烧录和串口调试。3.1.2 以太网连接使用网线连接 W55RP20-EVB-Pico 模块的以太网接口与路由器的 LAN 口或直接连接电脑网口需手动配置电脑 IP 与开发板同网段。3.1.3 模块与开发板接线若使用分离式模块与开发板需按以下引脚对应连接SPI 通信【硬件预留】此处插入硬件连接示意图3.2 Thonny 开发环境配置打开 Thonny 软件按以下步骤配置开发环境确保代码能正常烧录和运行点击顶部菜单栏「运行」→「配置解释器」切换到「解释器」选项卡在「解释器」下拉列表中选择 MicroPython (通用)在「端口」下拉列表中选择开发板对应的串口通常显示为 Board CDC COMx勾选「运行代码前先重启解释器」和「同步设备的实时时钟」点击「确定」完成配置。如果端口列表中没有出现开发板请尝试重新插拔 USB 数据线更换支持数据传输的 USB 数据线关闭其他占用串口的软件如串口助手、Arduino IDE 等重新烧录 MicroPython 固件安装树莓派 Pico USB 驱动。4. MQTT协议极简解析MQTT是一种轻量级物联网消息传输协议基于发布/订阅模式适用于硬件资源有限的嵌入式设备和带宽有限的网络环境广泛应用于物联网、智能硬件、工业控制等场景核心优势是轻量、高效、可靠、易实现最小控制消息仅需两个数据字节消息头精简可优化网络带宽。极简工作流程4步1. 客户端W55RP20连接到MQTT服务器本文使用免费公共服务器无需注册2. 客户端订阅指定主题如/W55RP20/sub用于接收消息3. 客户端或其他设备向指定主题发布消息4. 所有订阅该主题的客户端均可接收并处理消息。本文实战使用免费公共MQTT服务器broker-cn.emqx.io端口1883无需注册直接连接即可使用进一步节省时间助力10分钟完成实战。5. 核心代码复制与烧录5.1 依赖库说明核心依赖 umqttsimple.py 库用于实现MQTT客户端的连接、订阅、发布等基础功能该库是MicroPython生态中常用的轻量级MQTT库体积小、适配性强无需额外修改直接保存至W55RP20即可使用可通过MicroPython官方仓库下载或直接复制本文提供的完整代码。5.2 完整代码以及烧录历程所需的开发环境Thonny如果你必须编译MicroPython则必须使用Linux或Unix环境。第一步将程序复制到Thonny中然后选择环境为Raspberry Pi Pico。第二步将umqttsimple.py库文件保存到开发板 中。第三步运行程序并打开MQTTX 连接相同的服务器然后订阅主题为开发板的发布主题发布主题为开发板的订阅主题。下面有图第四步在MQTTX发送消息观察回环测试效果。注意因为MicroPython的print函数是启用了stdout缓冲的所以有时候并不会第一时间打印出内容。mqttsimple.py代码#mqtt.py file import network import time try: from machine import Pin, WIZNET_PIO_SPI except ImportError: WIZNET_PIO_SPI None Pin None _DEFAULTS { w5100s-evb-pico: {}, w5500-evb-pico: {}, w6100-evb-pico: {}, w5100s-evb-pico2: {}, w5500-evb-pico2: {}, w6100-evb-pico2: {}, w55rp20-evb-pico: {baudrate: 31250000, sck: 21, cs: 20, mosi: 23, miso: 22, reset: 25}, w6300-evb-pico: {baudrate: 31250000, sck: 17, cs: 16, io0: 18, io1: 19, io2: 20, io3: 21, reset: 22}, w6300-evb-pico2: {baudrate: 31250000, sck: 17, cs: 16, io0: 18, io1: 19, io2: 20, io3: 21, reset: 22}, } _AUTO { w5100s-evb-pico, w5500-evb-pico, w6100-evb-pico, w5100s-evb-pico2,w5500-evb-pico2,w6100-evb-pico2, } _SINGLE {w55rp20-evb-pico} _QSPI {w6300-evb-pico, w6300-evb-pico2} def _pin(x): return x if isinstance(x, Pin) else Pin(x) def wiznet(board, *, dhcpTrue, spiNone, csNone, resetNone, **kw): board board.strip().lower() if board not in _DEFAULTS: raise ValueError(Unsupported board: {}.format(board)) cfg _DEFAULTS[board].copy() cfg.update(kw) if spi is not None: if cs is None or reset is None: raise ValueError(When passing custom spi, also pass cs and reset) nic network.WIZNET6K(spi, cs, reset) else: if board in _AUTO: nic network.WIZNET6K() elif board in _SINGLE: if WIZNET_PIO_SPI is None or Pin is None: raise RuntimeError(WIZNET_PIO_SPI/Pin not available on this port) required [sck, cs, mosi, miso, reset] missing [k for k in required if k not in cfg] if missing: raise ValueError(Missing pins for W55RP20 single-SPI: , .join(missing)) spi WIZNET_PIO_SPI( baudratecfg.get(baudrate, 31250000), sck_pin(cfg[sck]), cs_pin(cfg[cs]), mosi_pin(cfg[mosi]), miso_pin(cfg[miso]), ) nic network.WIZNET6K(spi, _pin(cfg[cs]), _pin(cfg[reset])) elif board in _QSPI: if WIZNET_PIO_SPI is None or Pin is None: raise RuntimeError(WIZNET_PIO_SPI/Pin not available on this port) for k in [sck,cs,io0,io1,io2,io3]: if k not in cfg: raise ValueError(Missing pin {} for W6300 QSPI.format(k)) spi WIZNET_PIO_SPI( baudratecfg.get(baudrate, 31250000), sck_pin(cfg[sck]), cs_pin(cfg[cs]), io0_pin(cfg[io0]), io1_pin(cfg[io1]), io2_pin(cfg[io2]), io3_pin(cfg[io3]), ) nic network.WIZNET6K(spi, _pin(cfg[cs]), _pin(cfg.get(reset, cfg[cs]))) else: raise ValueError(Unexpected board mapping) try: nic.active(True) except AttributeError: pass if dhcp: try: nic.ifconfig(dhcp) except Exception: pass else: ip cfg.get(ip); sn cfg.get(sn); gw cfg.get(gw); dns cfg.get(dns, gw or 8.8.8.8) if not (ip and sn and gw): raise ValueError(Static mode requires ip/sn/gw) nic.ifconfig((ip, sn, gw, dns)) while not nic.isconnected(): print(Waiting for the network to connect...) time.sleep(1) print(MAC Address:, :.join(%02x % b for b in nic.config(mac))) print(IP Address:, nic.ifconfig()[0]) return nic from umqttsimple import MQTTClient from machine import Timer import machine #mqtt config mqtt_params {} mqtt_params[url] broker.emqx.io mqtt_params[port] 1883 mqtt_params[clientid] W55RP20 mqtt_params[pubtopic] /W55RP20/pub mqtt_params[subtopic] /W55RP20/sub mqtt_params[pubqos] 0 mqtt_params[subqos] 0 timer_1s_count 0 tim Timer() client None def w5x00_init(): nic wiznet(w55rp20-evb-pico, dhcpTrue) print(IP :,nic.ifconfig()[0]) print(Subnet Mask:,nic.ifconfig()[1]) print(Gateway :,nic.ifconfig()[2]) print(DNS :,nic.ifconfig()[3],\r\n) return nic def sub_cb(topic, msg): topic topic.decode(utf-8) msg msg.decode(utf-8) if topic mqtt_params[subtopic]: global client print(\r\ntopic:,topic,\r\nrecv:, msg) client.publish(mqtt_params[pubtopic],msg,qosmqtt_params[pubqos]) print(\r\ntopic:,mqtt_params[pubtopic],\r\nsend:,msg) def mqtt_connect(): global client # 每次重连换不同ID防止服务器拒绝 from time import time new_id W55RP20_ str(time()) client MQTTClient(new_id, mqtt_params[url], mqtt_params[port], keepalive60) client.connect() print(Connected to %s MQTT Broker%(mqtt_params[url])) return client def reconnect(): print(MQTT 连接断开正在重新连接...) time.sleep(3) def tick(timer): global timer_1s_count global client timer_1s_count 1 if timer_1s_count 30: timer_1s_count 0 try: client.ping() except: pass def subscribe(client): client.set_callback(sub_cb) client.subscribe(mqtt_params[subtopic],mqtt_params[subqos]) print(subscribed to %s%mqtt_params[subtopic]) def main(): global client print(W55RP20 chip MQTT example) # 网络只初始化一次 w5x00_init() # 核心无限循环 出错自动重连 while True: try: client mqtt_connect() subscribe(client) tim.init(freq1, callbacktick) # 正常运行 while True: client.wait_msg() except Exception as e: print(检测到错误即将重连, e) try: client.disconnect() except: pass time.sleep(2) if __name__ __main__: main()补充umqttsimple.py库代码复制保存至W55RP20开发板与mqtt.py同级#umqttsimple.py file import usocket as socket import ustruct as struct from ubinascii import hexlify class MQTTException(Exception): pass class MQTTClient: def __init__( self, client_id, server, port0, userNone, passwordNone, keepalive0, sslFalse, ssl_params{}, ): if port 0: port 8883 if ssl else 1883 self.client_id client_id self.sock None self.server server self.port port self.ssl ssl self.ssl_params ssl_params self.pid 0 self.cb None self.user user self.pswd password self.keepalive keepalive self.lw_topic None self.lw_msg None self.lw_qos 0 self.lw_retain False def _send_str(self, s): self.sock.write(struct.pack(!H, len(s))) self.sock.write(s) def _recv_len(self): n 0 sh 0 while 1: b self.sock.read(1)[0] n | (b 0x7F) sh if not b 0x80: return n sh 7 def set_callback(self, f): self.cb f def set_last_will(self, topic, msg, retainFalse, qos0): assert 0 qos 2 assert topic self.lw_topic topic self.lw_msg msg self.lw_qos qos self.lw_retain retain def connect(self, clean_sessionTrue): self.sock socket.socket() addr socket.getaddrinfo(self.server, self.port)[0][-1] self.sock.connect(addr) if self.ssl: import ussl self.sock ussl.wrap_socket(self.sock, **self.ssl_params) premsg bytearray(b\x10\0\0\0\0\0) msg bytearray(b\x04MQTT\x04\x02\0\0) sz 10 2 len(self.client_id) msg[6] clean_session 1 if self.user is not None: sz 2 len(self.user) 2 len(self.pswd) msg[6] | 0xC0 if self.keepalive: assert self.keepalive 65536 msg[7] | self.keepalive 8 msg[8] | self.keepalive 0x00FF if self.lw_topic: sz 2 len(self.lw_topic) 2 len(self.lw_msg) msg[6] | 0x4 | (self.lw_qos 0x1) 3 | (self.lw_qos 0x2) 3 msg[6] | self.lw_retain 5 i 1 while sz 0x7F: premsg[i] (sz 0x7F) | 0x80 sz 7 i 1 premsg[i] sz self.sock.write(premsg, i 2) self.sock.write(msg) # print(hex(len(msg)), hexlify(msg, :)) self._send_str(self.client_id) if self.lw_topic: self._send_str(self.lw_topic) self._send_str(self.lw_msg) if self.user is not None: self._send_str(self.user) self._send_str(self.pswd) resp self.sock.read(4) assert resp[0] 0x20 and resp[1] 0x02 if resp[3] ! 0: raise MQTTException(resp[3]) return resp[2] 1 def disconnect(self): self.sock.write(b\xe0\0) self.sock.close() def ping(self): self.sock.write(b\xc0\0) def publish(self, topic, msg, retainFalse, qos0): pkt bytearray(b\x30\0\0\0) pkt[0] | qos 1 | retain sz 2 len(topic) len(msg) if qos 0: sz 2 assert sz 2097152 i 1 while sz 0x7F: pkt[i] (sz 0x7F) | 0x80 sz 7 i 1 pkt[i] sz # print(hex(len(pkt)), hexlify(pkt, :)) self.sock.write(pkt, i 1) self._send_str(topic) if qos 0: self.pid 1 pid self.pid struct.pack_into(!H, pkt, 0, pid) self.sock.write(pkt, 2) self.sock.write(msg) if qos 1: while 1: op self.wait_msg() if op 0x40: sz self.sock.read(1) assert sz b\x02 rcv_pid self.sock.read(2) rcv_pid rcv_pid[0] 8 | rcv_pid[1] if pid rcv_pid: return elif qos 2: assert 0 def subscribe(self, topic, qos0): assert self.cb is not None, Subscribe callback is not set pkt bytearray(b\x82\0\0\0) self.pid 1 struct.pack_into(!BH, pkt, 1, 2 2 len(topic) 1, self.pid) # print(hex(len(pkt)), hexlify(pkt, :)) self.sock.write(pkt) self._send_str(topic) self.sock.write(qos.to_bytes(1, little)) while 1: op self.wait_msg() if op 0x90: resp self.sock.read(4) # print(resp) assert resp[1] pkt[2] and resp[2] pkt[3] if resp[3] 0x80: raise MQTTException(resp[3]) return # Wait for a single incoming MQTT message and process it. # Subscribed messages are delivered to a callback previously # set by .set_callback() method. Other (internal) MQTT # messages processed internally. def wait_msg(self): res self.sock.read(1) # self.sock.setblocking(True) if res is None: return None if res b: raise OSError(-1) if res b\xd0: # PINGRESP sz self.sock.read(1)[0] assert sz 0 return None op res[0] if op 0xF0 ! 0x30: return op sz self._recv_len() topic_len self.sock.read(2) topic_len (topic_len[0] 8) | topic_len[1] topic self.sock.read(topic_len) sz - topic_len 2 if op 6: pid self.sock.read(2) pid pid[0] 8 | pid[1] sz - 2 msg self.sock.read(sz) self.cb(topic, msg) if op 6 2: pkt bytearray(b\x40\x02\0\0) struct.pack_into(!H, pkt, 2, pid) self.sock.write(pkt) elif op 6 4: assert 0 return op # Checks whether a pending message from server is available. # If not, returns immediately with None. Otherwise, does # the same processing as wait_msg. def check_msg(self): # self.sock.setblocking(False) return self.wait_msg()补充wiznet_init.py库代码复制保存至W55RP20开发板与mqtt.py同级import network import time try: from machine import Pin, WIZNET_PIO_SPI except ImportError: WIZNET_PIO_SPI None Pin None _DEFAULTS { # Auto-construct boards (no explicit PIO SPI) w5100s-evb-pico: {}, w5500-evb-pico: {}, w6100-evb-pico: {}, w5100s-evb-pico2: {}, w5500-evb-pico2: {}, w6100-evb-pico2: {}, # W55RP20 — single SPI (PIO SPI) w55rp20-evb-pico: {baudrate: 31250000, sck: 21, cs: 20, mosi: 23, miso: 22, reset: 25}, # W6300 — QSPI QUAD(io0..io3) w6300-evb-pico: {baudrate: 31250000, sck: 17, cs: 16, io0: 18, io1: 19, io2: 20, io3: 21, reset: 22}, w6300-evb-pico2: {baudrate: 31250000, sck: 17, cs: 16, io0: 18, io1: 19, io2: 20, io3: 21, reset: 22}, } _AUTO { w5100s-evb-pico, w5500-evb-pico, w6100-evb-pico, w5100s-evb-pico2,w5500-evb-pico2,w6100-evb-pico2, } _SINGLE {w55rp20-evb-pico} # PIO single-SPI _QSPI {w6300-evb-pico, w6300-evb-pico2} def _pin(x): return x if isinstance(x, Pin) else Pin(x) def wiznet(board, *, dhcpTrue, spiNone, csNone, resetNone, **kw): board board.strip().lower() if board not in _DEFAULTS: raise ValueError(Unsupported board: {}.format(board)) cfg _DEFAULTS[board].copy() cfg.update(kw) # Manual override path: if spi is provided, use it directly if spi is not None: if cs is None or reset is None: raise ValueError(When passing custom spi, also pass cs and reset) nic network.WIZNET6K(spi, cs, reset) else: if board in _AUTO: nic network.WIZNET6K() elif board in _SINGLE: if WIZNET_PIO_SPI is None or Pin is None: raise RuntimeError(WIZNET_PIO_SPI/Pin not available on this port) required [sck, cs, mosi, miso, reset] missing [k for k in required if k not in cfg] if missing: raise ValueError(Missing pins for W55RP20 single-SPI: , .join(missing)) spi WIZNET_PIO_SPI( baudratecfg.get(baudrate, 31250000), sck_pin(cfg[sck]), cs_pin(cfg[cs]), mosi_pin(cfg[mosi]), miso_pin(cfg[miso]), ) nic network.WIZNET6K(spi, _pin(cfg[cs]), _pin(cfg[reset])) elif board in _QSPI: if WIZNET_PIO_SPI is None or Pin is None: raise RuntimeError(WIZNET_PIO_SPI/Pin not available on this port) for k in [sck,cs,io0,io1,io2,io3]: if k not in cfg: raise ValueError(Missing pin {} for W6300 QSPI.format(k)) spi WIZNET_PIO_SPI( baudratecfg.get(baudrate, 31250000), sck_pin(cfg[sck]), cs_pin(cfg[cs]), io0_pin(cfg[io0]), io1_pin(cfg[io1]), io2_pin(cfg[io2]), io3_pin(cfg[io3]), ) nic network.WIZNET6K(spi, _pin(cfg[cs]), _pin(cfg.get(reset, cfg[cs]))) else: raise ValueError(Unexpected board mapping) # Bring up (if supported) try: nic.active(True) except AttributeError: pass if dhcp: try: nic.ifconfig(dhcp) except Exception: pass else: ip cfg.get(ip); sn cfg.get(sn); gw cfg.get(gw); dns cfg.get(dns, gw or 8.8.8.8) if not (ip and sn and gw): raise ValueError(Static mode requires ip/sn/gw) nic.ifconfig((ip, sn, gw, dns)) while not nic.isconnected(): print(Waiting for the network to connect...) time.sleep(1) print(MAC Address:, :.join(%02x % b for b in nic.config(mac))) print(IP Address:, nic.ifconfig()) return nic6. 运行结果与MQTT通信验证6.1 串口输出结果在Thonny中点击运行按钮或按F5键Shell窗口会输出以下内容说明网络初始化和MQTT连接成功说明若打印“Waiting for the network to connect.....”属于正常现象等待1-3秒即可获取IP若一直无法获取IP参考“常见问题排查”部分。MPY: soft reboot W55RP20 chip MQTT example Waiting for the network to connect... Waiting for the network to connect... Waiting for the network to connect... Waiting for the network to connect... MAC Address: 02:90:86:88:4d:56 IP Address: 192.168.1.118 IP : 192.168.1.118 Subnet Mask: 255.255.255.0 Gateway : 192.168.1.1 DNS : 202.96.134.33 Connected to broker.emqx.io MQTT Broker subscribed to /W55RP20/sub6.2 MQTT回环通信验证使用MQTTX工具测试步骤如下不会的照着下面的图来1. 打开MQTTX点击“”号创建新连接- 名称自定义如W55RP20测试- 服务器broker-cn.emqx.io- 端口1883- 其他参数默认点击“连接”2. 订阅W55RP20的发布主题/W55RP20/pub可以自己定义3. 向W55RP20的订阅主题/W55RP20/sub 发送消息如“W55RP20 MQTT测试”可以自己定义4. 观察两个地方的反馈- Thonny Shell窗口打印收到的消息以及“消息回环成功”提示- MQTTX工具收到W55RP20回环发布的消息与发送的消息一致。至此W55RP20的MQTT通信实战完成mqttx网址MQTTX全功能 MQTT 客户端工具7. 常见问题一站式排查问题现象排查步骤1. Thonny 无法识别 W55RP20 端口1. 更换支持数据传输的 USB 数据线2. 重新插拔开发板、关闭其他占用串口的软件如串口助手2. 无法获取 IP 地址一直打印 “等待网络连接中...”1. 检查网线是否插紧2. 路由器是否开启 DHCP3. 网线是否连接路由器 LAN 口不是 WAN 口重启路由器和开发板3. MQTT 连接失败触发重连确认网络已连接、MQTT 服务器地址和端口正确本文使用的公共服务器无需注册、umqttsimple.py 库已正确保存。4. 收到消息但无法回环发布1. 确认代码中发布主题和订阅主题配置正确2. MQTT 连接未断开3. 重启开发板重新运行代码5. 串口无打印或打印乱码1. 确认 Thonny 解释器和端口配置正确2. 固件为 W55RP20 专属固件3. 重新烧录固件8. W55RP20 核心优势对比为了让你更直观地了解 W55RP20 的价值我们对比了目前主流的三种嵌入式以太网方案对比维度W55RP20 集成方案外接 PHY 芯片 方案外接 串口转以太网模块 方案BOM 成本低单芯片中高MCU 模块 外围器件高PCB 面积小仅需网口电路大需预留芯片和布线空间高开发难度低一行代码联网中高调试协议栈、编写驱动低网络稳定性极高WIZnet 专注硬件 TCP/IP 协议栈 25 年不定对于研发人员要求高熟悉协议栈与网络开发才能调试稳定不定视研发公司能力水平CPU 资源占用0%协议栈网络处理完全由硬件完成50% 以上协议栈完全运行在 MCU 上占用相关资源0%硬件 Socket 数量8 个独立硬件 Socket视 MCU 能力而定理论支持多路拓展一般为单路透传网络吞吐量最高 15Mbps视 MCU 能力而定约 3-5Mbps接口易用性单芯片集成要 MCU 带有 MII/RMII 等接口TTL 接口部署难度低MicroPython 成熟固件应用层协议绝大部分均有库文件可灵活添加部署高应用层协议需要手动移植开源库适配视模块集成情况无集成的功能需要自我封包拆包9. 典型应用场景W55RP20 芯片集成以太网功能结合其工业级稳定性非常适合以下应用场景1.工业数据采集网关简化现场部署实现传感器数据的稳定上传2.远程监控终端用于工厂、机房、变电站等环境的设备状态远程监控3.串口转网口设备将传统 RS232/RS485 串口设备快速升级为以太网设备4.智能楼宇节点用于照明、空调、门禁等楼宇设备的网络控制5.工业 PLC 扩展模块为 PLC 增加以太网通信能力实现远程编程和数据采集10. 系列预告与资源获取10.1系列预告下一篇教程我们将讲解 W55RP20 MicroPython开发下的 MQTT阿里云实现上传数据带你了解连接建立、心跳包维护、异常重连等关键机制掌握嵌入式以太网通信的基础能力。10.2 资源获取本文完整代码WIZnet 官方 Gitee 仓库W55RP20 芯片手册WIZnet 官方资料网址如果本文对你有帮助欢迎点赞、收藏、关注你的支持是我们持续更新的动力如有任何问题欢迎在评论区留言我们会第一时间回复。
第 12 篇:W55RP20-EVB-Pico MicroPython 实战:MQTT 协议基础通信验证
本文为 WIZnet W55RP20 芯片 MicroPython教程第 12 篇基于官方最新固件编写代码均经过实际验证可直接烧录运行。版权声明本文为 WIZnet 官方原创技术文章转载请注明出处。前言上一篇实战教程我们已经完成了W55RP20芯片HTTP协议与OneNET平台数据上云。本篇内容我们聚焦物联网核心通信协议-MQTT.MQTT是物联网最主流的轻量级通信协议采用发布/订阅(Pub/Sub)模式带宽占用小、功耗低、支持断线重连非常适合嵌入式以太网设备上云与设备间通信。W55RP20集成硬件 TCP/IP 协议栈搭配MicroPython的umqtt库几行代码即可实现稳定MQTT通信无需关心底层Socket细节。学完本文你将掌握:MQTT协议核心原理与发布/订阅机制W55RP20连接本地/公共MQTT服务器实现消息发布、订阅、回调处理心跳保活、断线重连等工业级稳定机制本地MQTT测试环境搭建传统嵌入式MQTT开发的3大痛点W55RP20一键解决1. 协议栈调试复杂软件TCP/IP协议栈需手动移植、调试占用MCU资源易出现卡顿、内存溢出W55RP20内置硬件TCP/IP协议栈网络层处理完全由硬件完成不占用CPU资源无需调试协议栈。2. 硬件接线繁琐外接网络模块需手动焊接、调试接口时序新手易出错W55RP20板载以太网接口无需额外外接模块即插即用大幅简化接线操作。3. 入门门槛高MQTT客户端连接、心跳保活等细节需手动编写调试周期长本文提供可直接运行的完整代码注释清晰配合10分钟分步操作新手也能快速出效果依托MicroPython免编译特性代码修改即时生效、调试实时反馈快速获得成就感。本节课核心目标· 完成W55RP20与电脑的硬件连接、开发环境配置· 理解MQTT协议核心原理与极简工作流程· 烧录MQTT客户端代码实现与公共MQTT服务器的连接· 完成主题订阅、消息发布与回环通信测试· 掌握常见故障的快速排查方法系列教程学习路径本专栏共 16 篇循序渐进覆盖 W55RP20-EVB-Pico 模块 C语言开发全流程第 1 篇静态 IP 配置与网络基础第 2 篇DHCP 自动联网与网络诊断第 3 篇TCP Client 客户端通信第 4 篇TCP Server 服务端通信第 5 篇UDP 单播数据通信第 6 篇UDP 组播/广播数据通信第 7 篇DNS 域名解析第 8 篇NTP 从网络获取时间第 9 篇HTTP Client 客户端请求第 10 篇HTTP Server 服务端搭建第 11 篇HTTP 协议与 OneNET 平台数据上云第 12 篇MQTT 协议基础通信验证本文第 13 篇MQTT 协议与阿里云平台对接第 14 篇MQTT 协议与 OneNET 平台对接第 15 篇MQTT 协议与 ThingSpeak 平台对接第 16 篇Modbus 工业协议通信建议收藏本专栏跟随教程逐步学习所有代码均会同步更新至官方 Gitee 仓库目录1. 准备工作1.1 软件准备1.2 硬件准备2. 烧录 W55RP20-EVB-Pico 模块专属 MicroPython 固件3. 硬件连接与开发环境配置3.1 硬件连接3.1.1 基础连接供电调试3.1.2 以太网连接3.1.3 模块与开发板接线3.2 Thonny 开发环境配置4. MQTT协议极简解析5. 核心代码复制与烧录5.1 依赖库说明5.2 完整代码以及烧录历程6. 运行结果与MQTT通信验证6.1 串口输出结果6.2 MQTT回环通信验证7. 常见问题一站式排查8. W55RP20 核心优势对比9. 典型应用场景10. 系列预告与资源获取10.1系列预告10.2 资源获取1. 准备工作1.1 软件准备所需软件均为免费版本按要求下载安装即可无需额外付费。软件名称版本要求下载地址说明Thonny4.0 及以上Thonny 官方下载轻量级 MicroPython IDE支持代码编辑、烧录与串口调试新手友好W55RP20-EVB-Pico 模块 MicroPython 固件最新稳定版WIZnet 官方固件下载专为 W55RP20-EVB-Pico 模块 编写已集成 WIZnet 硬件驱动与协议栈1.2 硬件准备W55RP20-EVB-Pico × 1Micro USB 数据线必须支持数据传输不能使用纯充电线× 1标准网线 × 1开启 DHCP 功能的路由器 / 交换机 × 1用于获取网络参数实现 DNS 解析W55RP20-EVB-Pico 模块已集成以太网相关器件无需额外焊接飞线配合 RP2040 开发板可快速搭建开发环境大幅降低接线错误和硬件故障概率。2. 烧录 W55RP20-EVB-Pico 模块专属 MicroPython 固件W55RP20-EVB-Pico 模块 完全兼容树莓派 Pico 的 UF2 固件烧录方式操作简单无需额外烧录器新手可快速上手按住 RP2040 开发板上的 BOOTSEL 按键不放使用 Micro USB 数据线连接开发板与电脑待电脑识别出名为 RPI-RP2 的 U 盘后松开 BOOTSEL 按键将下载好的 W5500_RP2040_firmware.uf2 固件文件拖拽到 U 盘中开发板会自动重启固件烧录完成。注意如果电脑没有识别出 RPI-RP2 U 盘请尝试更换 USB 数据线、重新插拔开发板或更换电脑 USB 接口优先使用 USB 2.0 接口。3. 硬件连接与开发环境配置3.1 硬件连接W55RP20-EVB-Pico 模块连接分为两步分别实现供电/调试和以太网连接操作简单无需复杂接线3.1.1 基础连接供电调试使用 Micro USB 数据线连接 RP2040 开发板与电脑用于开发板供电、代码烧录和串口调试。3.1.2 以太网连接使用网线连接 W55RP20-EVB-Pico 模块的以太网接口与路由器的 LAN 口或直接连接电脑网口需手动配置电脑 IP 与开发板同网段。3.1.3 模块与开发板接线若使用分离式模块与开发板需按以下引脚对应连接SPI 通信【硬件预留】此处插入硬件连接示意图3.2 Thonny 开发环境配置打开 Thonny 软件按以下步骤配置开发环境确保代码能正常烧录和运行点击顶部菜单栏「运行」→「配置解释器」切换到「解释器」选项卡在「解释器」下拉列表中选择 MicroPython (通用)在「端口」下拉列表中选择开发板对应的串口通常显示为 Board CDC COMx勾选「运行代码前先重启解释器」和「同步设备的实时时钟」点击「确定」完成配置。如果端口列表中没有出现开发板请尝试重新插拔 USB 数据线更换支持数据传输的 USB 数据线关闭其他占用串口的软件如串口助手、Arduino IDE 等重新烧录 MicroPython 固件安装树莓派 Pico USB 驱动。4. MQTT协议极简解析MQTT是一种轻量级物联网消息传输协议基于发布/订阅模式适用于硬件资源有限的嵌入式设备和带宽有限的网络环境广泛应用于物联网、智能硬件、工业控制等场景核心优势是轻量、高效、可靠、易实现最小控制消息仅需两个数据字节消息头精简可优化网络带宽。极简工作流程4步1. 客户端W55RP20连接到MQTT服务器本文使用免费公共服务器无需注册2. 客户端订阅指定主题如/W55RP20/sub用于接收消息3. 客户端或其他设备向指定主题发布消息4. 所有订阅该主题的客户端均可接收并处理消息。本文实战使用免费公共MQTT服务器broker-cn.emqx.io端口1883无需注册直接连接即可使用进一步节省时间助力10分钟完成实战。5. 核心代码复制与烧录5.1 依赖库说明核心依赖 umqttsimple.py 库用于实现MQTT客户端的连接、订阅、发布等基础功能该库是MicroPython生态中常用的轻量级MQTT库体积小、适配性强无需额外修改直接保存至W55RP20即可使用可通过MicroPython官方仓库下载或直接复制本文提供的完整代码。5.2 完整代码以及烧录历程所需的开发环境Thonny如果你必须编译MicroPython则必须使用Linux或Unix环境。第一步将程序复制到Thonny中然后选择环境为Raspberry Pi Pico。第二步将umqttsimple.py库文件保存到开发板 中。第三步运行程序并打开MQTTX 连接相同的服务器然后订阅主题为开发板的发布主题发布主题为开发板的订阅主题。下面有图第四步在MQTTX发送消息观察回环测试效果。注意因为MicroPython的print函数是启用了stdout缓冲的所以有时候并不会第一时间打印出内容。mqttsimple.py代码#mqtt.py file import network import time try: from machine import Pin, WIZNET_PIO_SPI except ImportError: WIZNET_PIO_SPI None Pin None _DEFAULTS { w5100s-evb-pico: {}, w5500-evb-pico: {}, w6100-evb-pico: {}, w5100s-evb-pico2: {}, w5500-evb-pico2: {}, w6100-evb-pico2: {}, w55rp20-evb-pico: {baudrate: 31250000, sck: 21, cs: 20, mosi: 23, miso: 22, reset: 25}, w6300-evb-pico: {baudrate: 31250000, sck: 17, cs: 16, io0: 18, io1: 19, io2: 20, io3: 21, reset: 22}, w6300-evb-pico2: {baudrate: 31250000, sck: 17, cs: 16, io0: 18, io1: 19, io2: 20, io3: 21, reset: 22}, } _AUTO { w5100s-evb-pico, w5500-evb-pico, w6100-evb-pico, w5100s-evb-pico2,w5500-evb-pico2,w6100-evb-pico2, } _SINGLE {w55rp20-evb-pico} _QSPI {w6300-evb-pico, w6300-evb-pico2} def _pin(x): return x if isinstance(x, Pin) else Pin(x) def wiznet(board, *, dhcpTrue, spiNone, csNone, resetNone, **kw): board board.strip().lower() if board not in _DEFAULTS: raise ValueError(Unsupported board: {}.format(board)) cfg _DEFAULTS[board].copy() cfg.update(kw) if spi is not None: if cs is None or reset is None: raise ValueError(When passing custom spi, also pass cs and reset) nic network.WIZNET6K(spi, cs, reset) else: if board in _AUTO: nic network.WIZNET6K() elif board in _SINGLE: if WIZNET_PIO_SPI is None or Pin is None: raise RuntimeError(WIZNET_PIO_SPI/Pin not available on this port) required [sck, cs, mosi, miso, reset] missing [k for k in required if k not in cfg] if missing: raise ValueError(Missing pins for W55RP20 single-SPI: , .join(missing)) spi WIZNET_PIO_SPI( baudratecfg.get(baudrate, 31250000), sck_pin(cfg[sck]), cs_pin(cfg[cs]), mosi_pin(cfg[mosi]), miso_pin(cfg[miso]), ) nic network.WIZNET6K(spi, _pin(cfg[cs]), _pin(cfg[reset])) elif board in _QSPI: if WIZNET_PIO_SPI is None or Pin is None: raise RuntimeError(WIZNET_PIO_SPI/Pin not available on this port) for k in [sck,cs,io0,io1,io2,io3]: if k not in cfg: raise ValueError(Missing pin {} for W6300 QSPI.format(k)) spi WIZNET_PIO_SPI( baudratecfg.get(baudrate, 31250000), sck_pin(cfg[sck]), cs_pin(cfg[cs]), io0_pin(cfg[io0]), io1_pin(cfg[io1]), io2_pin(cfg[io2]), io3_pin(cfg[io3]), ) nic network.WIZNET6K(spi, _pin(cfg[cs]), _pin(cfg.get(reset, cfg[cs]))) else: raise ValueError(Unexpected board mapping) try: nic.active(True) except AttributeError: pass if dhcp: try: nic.ifconfig(dhcp) except Exception: pass else: ip cfg.get(ip); sn cfg.get(sn); gw cfg.get(gw); dns cfg.get(dns, gw or 8.8.8.8) if not (ip and sn and gw): raise ValueError(Static mode requires ip/sn/gw) nic.ifconfig((ip, sn, gw, dns)) while not nic.isconnected(): print(Waiting for the network to connect...) time.sleep(1) print(MAC Address:, :.join(%02x % b for b in nic.config(mac))) print(IP Address:, nic.ifconfig()[0]) return nic from umqttsimple import MQTTClient from machine import Timer import machine #mqtt config mqtt_params {} mqtt_params[url] broker.emqx.io mqtt_params[port] 1883 mqtt_params[clientid] W55RP20 mqtt_params[pubtopic] /W55RP20/pub mqtt_params[subtopic] /W55RP20/sub mqtt_params[pubqos] 0 mqtt_params[subqos] 0 timer_1s_count 0 tim Timer() client None def w5x00_init(): nic wiznet(w55rp20-evb-pico, dhcpTrue) print(IP :,nic.ifconfig()[0]) print(Subnet Mask:,nic.ifconfig()[1]) print(Gateway :,nic.ifconfig()[2]) print(DNS :,nic.ifconfig()[3],\r\n) return nic def sub_cb(topic, msg): topic topic.decode(utf-8) msg msg.decode(utf-8) if topic mqtt_params[subtopic]: global client print(\r\ntopic:,topic,\r\nrecv:, msg) client.publish(mqtt_params[pubtopic],msg,qosmqtt_params[pubqos]) print(\r\ntopic:,mqtt_params[pubtopic],\r\nsend:,msg) def mqtt_connect(): global client # 每次重连换不同ID防止服务器拒绝 from time import time new_id W55RP20_ str(time()) client MQTTClient(new_id, mqtt_params[url], mqtt_params[port], keepalive60) client.connect() print(Connected to %s MQTT Broker%(mqtt_params[url])) return client def reconnect(): print(MQTT 连接断开正在重新连接...) time.sleep(3) def tick(timer): global timer_1s_count global client timer_1s_count 1 if timer_1s_count 30: timer_1s_count 0 try: client.ping() except: pass def subscribe(client): client.set_callback(sub_cb) client.subscribe(mqtt_params[subtopic],mqtt_params[subqos]) print(subscribed to %s%mqtt_params[subtopic]) def main(): global client print(W55RP20 chip MQTT example) # 网络只初始化一次 w5x00_init() # 核心无限循环 出错自动重连 while True: try: client mqtt_connect() subscribe(client) tim.init(freq1, callbacktick) # 正常运行 while True: client.wait_msg() except Exception as e: print(检测到错误即将重连, e) try: client.disconnect() except: pass time.sleep(2) if __name__ __main__: main()补充umqttsimple.py库代码复制保存至W55RP20开发板与mqtt.py同级#umqttsimple.py file import usocket as socket import ustruct as struct from ubinascii import hexlify class MQTTException(Exception): pass class MQTTClient: def __init__( self, client_id, server, port0, userNone, passwordNone, keepalive0, sslFalse, ssl_params{}, ): if port 0: port 8883 if ssl else 1883 self.client_id client_id self.sock None self.server server self.port port self.ssl ssl self.ssl_params ssl_params self.pid 0 self.cb None self.user user self.pswd password self.keepalive keepalive self.lw_topic None self.lw_msg None self.lw_qos 0 self.lw_retain False def _send_str(self, s): self.sock.write(struct.pack(!H, len(s))) self.sock.write(s) def _recv_len(self): n 0 sh 0 while 1: b self.sock.read(1)[0] n | (b 0x7F) sh if not b 0x80: return n sh 7 def set_callback(self, f): self.cb f def set_last_will(self, topic, msg, retainFalse, qos0): assert 0 qos 2 assert topic self.lw_topic topic self.lw_msg msg self.lw_qos qos self.lw_retain retain def connect(self, clean_sessionTrue): self.sock socket.socket() addr socket.getaddrinfo(self.server, self.port)[0][-1] self.sock.connect(addr) if self.ssl: import ussl self.sock ussl.wrap_socket(self.sock, **self.ssl_params) premsg bytearray(b\x10\0\0\0\0\0) msg bytearray(b\x04MQTT\x04\x02\0\0) sz 10 2 len(self.client_id) msg[6] clean_session 1 if self.user is not None: sz 2 len(self.user) 2 len(self.pswd) msg[6] | 0xC0 if self.keepalive: assert self.keepalive 65536 msg[7] | self.keepalive 8 msg[8] | self.keepalive 0x00FF if self.lw_topic: sz 2 len(self.lw_topic) 2 len(self.lw_msg) msg[6] | 0x4 | (self.lw_qos 0x1) 3 | (self.lw_qos 0x2) 3 msg[6] | self.lw_retain 5 i 1 while sz 0x7F: premsg[i] (sz 0x7F) | 0x80 sz 7 i 1 premsg[i] sz self.sock.write(premsg, i 2) self.sock.write(msg) # print(hex(len(msg)), hexlify(msg, :)) self._send_str(self.client_id) if self.lw_topic: self._send_str(self.lw_topic) self._send_str(self.lw_msg) if self.user is not None: self._send_str(self.user) self._send_str(self.pswd) resp self.sock.read(4) assert resp[0] 0x20 and resp[1] 0x02 if resp[3] ! 0: raise MQTTException(resp[3]) return resp[2] 1 def disconnect(self): self.sock.write(b\xe0\0) self.sock.close() def ping(self): self.sock.write(b\xc0\0) def publish(self, topic, msg, retainFalse, qos0): pkt bytearray(b\x30\0\0\0) pkt[0] | qos 1 | retain sz 2 len(topic) len(msg) if qos 0: sz 2 assert sz 2097152 i 1 while sz 0x7F: pkt[i] (sz 0x7F) | 0x80 sz 7 i 1 pkt[i] sz # print(hex(len(pkt)), hexlify(pkt, :)) self.sock.write(pkt, i 1) self._send_str(topic) if qos 0: self.pid 1 pid self.pid struct.pack_into(!H, pkt, 0, pid) self.sock.write(pkt, 2) self.sock.write(msg) if qos 1: while 1: op self.wait_msg() if op 0x40: sz self.sock.read(1) assert sz b\x02 rcv_pid self.sock.read(2) rcv_pid rcv_pid[0] 8 | rcv_pid[1] if pid rcv_pid: return elif qos 2: assert 0 def subscribe(self, topic, qos0): assert self.cb is not None, Subscribe callback is not set pkt bytearray(b\x82\0\0\0) self.pid 1 struct.pack_into(!BH, pkt, 1, 2 2 len(topic) 1, self.pid) # print(hex(len(pkt)), hexlify(pkt, :)) self.sock.write(pkt) self._send_str(topic) self.sock.write(qos.to_bytes(1, little)) while 1: op self.wait_msg() if op 0x90: resp self.sock.read(4) # print(resp) assert resp[1] pkt[2] and resp[2] pkt[3] if resp[3] 0x80: raise MQTTException(resp[3]) return # Wait for a single incoming MQTT message and process it. # Subscribed messages are delivered to a callback previously # set by .set_callback() method. Other (internal) MQTT # messages processed internally. def wait_msg(self): res self.sock.read(1) # self.sock.setblocking(True) if res is None: return None if res b: raise OSError(-1) if res b\xd0: # PINGRESP sz self.sock.read(1)[0] assert sz 0 return None op res[0] if op 0xF0 ! 0x30: return op sz self._recv_len() topic_len self.sock.read(2) topic_len (topic_len[0] 8) | topic_len[1] topic self.sock.read(topic_len) sz - topic_len 2 if op 6: pid self.sock.read(2) pid pid[0] 8 | pid[1] sz - 2 msg self.sock.read(sz) self.cb(topic, msg) if op 6 2: pkt bytearray(b\x40\x02\0\0) struct.pack_into(!H, pkt, 2, pid) self.sock.write(pkt) elif op 6 4: assert 0 return op # Checks whether a pending message from server is available. # If not, returns immediately with None. Otherwise, does # the same processing as wait_msg. def check_msg(self): # self.sock.setblocking(False) return self.wait_msg()补充wiznet_init.py库代码复制保存至W55RP20开发板与mqtt.py同级import network import time try: from machine import Pin, WIZNET_PIO_SPI except ImportError: WIZNET_PIO_SPI None Pin None _DEFAULTS { # Auto-construct boards (no explicit PIO SPI) w5100s-evb-pico: {}, w5500-evb-pico: {}, w6100-evb-pico: {}, w5100s-evb-pico2: {}, w5500-evb-pico2: {}, w6100-evb-pico2: {}, # W55RP20 — single SPI (PIO SPI) w55rp20-evb-pico: {baudrate: 31250000, sck: 21, cs: 20, mosi: 23, miso: 22, reset: 25}, # W6300 — QSPI QUAD(io0..io3) w6300-evb-pico: {baudrate: 31250000, sck: 17, cs: 16, io0: 18, io1: 19, io2: 20, io3: 21, reset: 22}, w6300-evb-pico2: {baudrate: 31250000, sck: 17, cs: 16, io0: 18, io1: 19, io2: 20, io3: 21, reset: 22}, } _AUTO { w5100s-evb-pico, w5500-evb-pico, w6100-evb-pico, w5100s-evb-pico2,w5500-evb-pico2,w6100-evb-pico2, } _SINGLE {w55rp20-evb-pico} # PIO single-SPI _QSPI {w6300-evb-pico, w6300-evb-pico2} def _pin(x): return x if isinstance(x, Pin) else Pin(x) def wiznet(board, *, dhcpTrue, spiNone, csNone, resetNone, **kw): board board.strip().lower() if board not in _DEFAULTS: raise ValueError(Unsupported board: {}.format(board)) cfg _DEFAULTS[board].copy() cfg.update(kw) # Manual override path: if spi is provided, use it directly if spi is not None: if cs is None or reset is None: raise ValueError(When passing custom spi, also pass cs and reset) nic network.WIZNET6K(spi, cs, reset) else: if board in _AUTO: nic network.WIZNET6K() elif board in _SINGLE: if WIZNET_PIO_SPI is None or Pin is None: raise RuntimeError(WIZNET_PIO_SPI/Pin not available on this port) required [sck, cs, mosi, miso, reset] missing [k for k in required if k not in cfg] if missing: raise ValueError(Missing pins for W55RP20 single-SPI: , .join(missing)) spi WIZNET_PIO_SPI( baudratecfg.get(baudrate, 31250000), sck_pin(cfg[sck]), cs_pin(cfg[cs]), mosi_pin(cfg[mosi]), miso_pin(cfg[miso]), ) nic network.WIZNET6K(spi, _pin(cfg[cs]), _pin(cfg[reset])) elif board in _QSPI: if WIZNET_PIO_SPI is None or Pin is None: raise RuntimeError(WIZNET_PIO_SPI/Pin not available on this port) for k in [sck,cs,io0,io1,io2,io3]: if k not in cfg: raise ValueError(Missing pin {} for W6300 QSPI.format(k)) spi WIZNET_PIO_SPI( baudratecfg.get(baudrate, 31250000), sck_pin(cfg[sck]), cs_pin(cfg[cs]), io0_pin(cfg[io0]), io1_pin(cfg[io1]), io2_pin(cfg[io2]), io3_pin(cfg[io3]), ) nic network.WIZNET6K(spi, _pin(cfg[cs]), _pin(cfg.get(reset, cfg[cs]))) else: raise ValueError(Unexpected board mapping) # Bring up (if supported) try: nic.active(True) except AttributeError: pass if dhcp: try: nic.ifconfig(dhcp) except Exception: pass else: ip cfg.get(ip); sn cfg.get(sn); gw cfg.get(gw); dns cfg.get(dns, gw or 8.8.8.8) if not (ip and sn and gw): raise ValueError(Static mode requires ip/sn/gw) nic.ifconfig((ip, sn, gw, dns)) while not nic.isconnected(): print(Waiting for the network to connect...) time.sleep(1) print(MAC Address:, :.join(%02x % b for b in nic.config(mac))) print(IP Address:, nic.ifconfig()) return nic6. 运行结果与MQTT通信验证6.1 串口输出结果在Thonny中点击运行按钮或按F5键Shell窗口会输出以下内容说明网络初始化和MQTT连接成功说明若打印“Waiting for the network to connect.....”属于正常现象等待1-3秒即可获取IP若一直无法获取IP参考“常见问题排查”部分。MPY: soft reboot W55RP20 chip MQTT example Waiting for the network to connect... Waiting for the network to connect... Waiting for the network to connect... Waiting for the network to connect... MAC Address: 02:90:86:88:4d:56 IP Address: 192.168.1.118 IP : 192.168.1.118 Subnet Mask: 255.255.255.0 Gateway : 192.168.1.1 DNS : 202.96.134.33 Connected to broker.emqx.io MQTT Broker subscribed to /W55RP20/sub6.2 MQTT回环通信验证使用MQTTX工具测试步骤如下不会的照着下面的图来1. 打开MQTTX点击“”号创建新连接- 名称自定义如W55RP20测试- 服务器broker-cn.emqx.io- 端口1883- 其他参数默认点击“连接”2. 订阅W55RP20的发布主题/W55RP20/pub可以自己定义3. 向W55RP20的订阅主题/W55RP20/sub 发送消息如“W55RP20 MQTT测试”可以自己定义4. 观察两个地方的反馈- Thonny Shell窗口打印收到的消息以及“消息回环成功”提示- MQTTX工具收到W55RP20回环发布的消息与发送的消息一致。至此W55RP20的MQTT通信实战完成mqttx网址MQTTX全功能 MQTT 客户端工具7. 常见问题一站式排查问题现象排查步骤1. Thonny 无法识别 W55RP20 端口1. 更换支持数据传输的 USB 数据线2. 重新插拔开发板、关闭其他占用串口的软件如串口助手2. 无法获取 IP 地址一直打印 “等待网络连接中...”1. 检查网线是否插紧2. 路由器是否开启 DHCP3. 网线是否连接路由器 LAN 口不是 WAN 口重启路由器和开发板3. MQTT 连接失败触发重连确认网络已连接、MQTT 服务器地址和端口正确本文使用的公共服务器无需注册、umqttsimple.py 库已正确保存。4. 收到消息但无法回环发布1. 确认代码中发布主题和订阅主题配置正确2. MQTT 连接未断开3. 重启开发板重新运行代码5. 串口无打印或打印乱码1. 确认 Thonny 解释器和端口配置正确2. 固件为 W55RP20 专属固件3. 重新烧录固件8. W55RP20 核心优势对比为了让你更直观地了解 W55RP20 的价值我们对比了目前主流的三种嵌入式以太网方案对比维度W55RP20 集成方案外接 PHY 芯片 方案外接 串口转以太网模块 方案BOM 成本低单芯片中高MCU 模块 外围器件高PCB 面积小仅需网口电路大需预留芯片和布线空间高开发难度低一行代码联网中高调试协议栈、编写驱动低网络稳定性极高WIZnet 专注硬件 TCP/IP 协议栈 25 年不定对于研发人员要求高熟悉协议栈与网络开发才能调试稳定不定视研发公司能力水平CPU 资源占用0%协议栈网络处理完全由硬件完成50% 以上协议栈完全运行在 MCU 上占用相关资源0%硬件 Socket 数量8 个独立硬件 Socket视 MCU 能力而定理论支持多路拓展一般为单路透传网络吞吐量最高 15Mbps视 MCU 能力而定约 3-5Mbps接口易用性单芯片集成要 MCU 带有 MII/RMII 等接口TTL 接口部署难度低MicroPython 成熟固件应用层协议绝大部分均有库文件可灵活添加部署高应用层协议需要手动移植开源库适配视模块集成情况无集成的功能需要自我封包拆包9. 典型应用场景W55RP20 芯片集成以太网功能结合其工业级稳定性非常适合以下应用场景1.工业数据采集网关简化现场部署实现传感器数据的稳定上传2.远程监控终端用于工厂、机房、变电站等环境的设备状态远程监控3.串口转网口设备将传统 RS232/RS485 串口设备快速升级为以太网设备4.智能楼宇节点用于照明、空调、门禁等楼宇设备的网络控制5.工业 PLC 扩展模块为 PLC 增加以太网通信能力实现远程编程和数据采集10. 系列预告与资源获取10.1系列预告下一篇教程我们将讲解 W55RP20 MicroPython开发下的 MQTT阿里云实现上传数据带你了解连接建立、心跳包维护、异常重连等关键机制掌握嵌入式以太网通信的基础能力。10.2 资源获取本文完整代码WIZnet 官方 Gitee 仓库W55RP20 芯片手册WIZnet 官方资料网址如果本文对你有帮助欢迎点赞、收藏、关注你的支持是我们持续更新的动力如有任何问题欢迎在评论区留言我们会第一时间回复。