物联网 基于netty控制报文结构(连接与握手)

物联网 基于netty控制报文结构(连接与握手) 物联网 基于netty控制报文结构(连接与握手)简述源码(netty-sample-04-connect)核心概念简易代码server代码client代码报文结构回顾CONNECT客户端 → 服务端CONNACK服务端 → 客户端有效载荷结构真正 MQTT 二进制编码部分源码服务端客户端物联网 基于netty控制报文结构(连接与握手)简述MQTT 协议中 CONNECT 和 CONNACK 报文的编解码展示客户端如何发起连接请求、服务端如何解析并返回连接确认源码(netty-sample-04-connect)https://gitee.com/kcnf-iot/iot-sample/tree/master/netty/netty-sample-04核心概念CONNECT客户端 → 服务端告诉服务端“我是谁ClientId心跳间隔多少是否清理会话”CONNACK服务端 → 客户端回复“连接成功返回码0” 或 失败原因1~5简易代码server代码package com.jysemel.iot; import com.jysemel.iot.handler.SimpleServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class SimpleConnectServer { public static void main(String[] args) throws Exception { EventLoopGroup boss new NioEventLoopGroup(1); EventLoopGroup worker new NioEventLoopGroup(); try { ServerBootstrap b new ServerBootstrap(); b.group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializerSocketChannel() { Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new SimpleServerHandler()); } }); b.bind(1883).sync().channel().closeFuture().sync(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }client代码package com.jysemel.iot; import com.jysemel.iot.handler.SimpleClientHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class SimpleConnectClient { public static void main(String[] args) throws Exception { EventLoopGroup group new NioEventLoopGroup(); try { Bootstrap b new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializerSocketChannel() { Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new SimpleClientHandler()); } }); Channel ch b.connect(127.0.0.1, 1883).sync().channel(); // 发送模拟的 CONNECT 字符串 ch.writeAndFlush(CONNECT:clientIdtest,keepAlive60,cleanSession1\n); ch.closeFuture().sync(); } finally { group.shutdownGracefully(); } } }报文结构回顾CONNECT客户端 → 服务端固定头可变头有效载荷类型1标志位0剩余长度变长编码协议名MQTT2字节长度4字节数据协议级别1字节MQTT 3.1.1 为 4连接标志1字节保活时间2字节客户端标识符UTF-8 字符串必须遗嘱主题可选遗嘱消息可选用户名可选密码可选CONNACK服务端 → 客户端固定头可变头类型2标志位0剩余长度2连接确认标志1字节当前仅 bit0 会话存在标志返回码1字节0成功1协议不可接受2标识符被拒绝3服务不可用4用户名/密码错误5未授权剩余长度变长编码每个字节最高位表示是否还有后续字节低 7 位表示数值最多 4 字节有效载荷结构在网络协议中有效载荷 指的是报文中 真正要传输的业务数据不包括协议本身的控制字段如固定头、可变头、长度字段等快递单 固定头 可变头包含收件人地址、包裹重量、物流编号等控制信息 包裹里的商品 有效载荷你真正想寄给别人的东西真正 MQTT 二进制编码最简 CONNECT只包含 ClientId的十六进制固定头0x10类型1标志0 剩余长度 可变头 00 04 M Q T T协议名 04协议级别 02连接标志Clean Session1 00 3CKeep Alive60秒 有效载荷00 04 t e s tClientId test部分源码服务端package com.jysemel.iot; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; public class MqttConnectServer { public static void main(String[] args) throws Exception { EventLoopGroup boss new NioEventLoopGroup(1); EventLoopGroup worker new NioEventLoopGroup(); try { ServerBootstrap b new ServerBootstrap(); b.group(boss, worker).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializerSocketChannel() { Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new ByteToMessageDecoder() { Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, ListObject out) { if (in.readableBytes() 2) return; in.markReaderIndex(); byte first in.readByte(); int type (first 4) 0x0F; if (type ! 1) { in.resetReaderIndex(); return; } // 这里需要完整解码为了方便直接调用上面的 decodeConnect 方法 // 实际应保存解码结果但本例省略 System.out.println(收到 CONNECT回复 CONNACK); ctx.writeAndFlush(Unpooled.wrappedBuffer(buildConnack((byte)0))); } }); } }); b.bind(1883).sync().channel().closeFuture().sync(); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } // 包含 buildConnack 方法 public static byte[] buildConnack(byte returnCode) { ByteBuf buf Unpooled.buffer(); buf.writeByte(0x20); // 类型2flag0 buf.writeByte(0x02); // 剩余长度2 buf.writeByte(0x00); // 会话存在标志 0 buf.writeByte(returnCode); // 0成功 byte[] result new byte[buf.readableBytes()]; buf.readBytes(result); buf.release(); return result; } }客户端package com.jysemel.iot; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.nio.charset.StandardCharsets; public class MqttConnectClient { public static void main(String[] args) throws Exception { EventLoopGroup group new NioEventLoopGroup(); try { Bootstrap b new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializerSocketChannel() { Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { Override public void channelActive(ChannelHandlerContext ctx) { byte[] connect buildMinimalConnect(testClient); ctx.writeAndFlush(Unpooled.wrappedBuffer(connect)); } Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf buf (ByteBuf) msg; byte first buf.readByte(); int type (first 4) 0x0F; if (type 2) { buf.readByte(); // 剩余长度 byte session buf.readByte(); byte code buf.readByte(); System.out.println(CONNACK received, returnCode code); } ctx.close(); } }); } }); b.connect(127.0.0.1, 1883).sync().channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } // 包含 buildMinimalConnect 方法 public static byte[] buildMinimalConnect(String clientId) { ByteBuf buf Unpooled.buffer(); // 固定头先写类型剩余长度占位 buf.writeByte(0x10); // 类型1flag0 int lenPos buf.writerIndex(); buf.writeByte(0); // 剩余长度占位 // 可变头 buf.writeShort(4); // 协议名长度 buf.writeBytes(MQTT.getBytes()); buf.writeByte(4); // 协议级别 4 buf.writeByte(0x02); // 连接标志Clean Session1其他0 buf.writeShort(60); // Keep Alive 60秒 // 有效载荷ClientId byte[] idBytes clientId.getBytes(StandardCharsets.UTF_8); buf.writeShort(idBytes.length); buf.writeBytes(idBytes); // 计算并回填剩余长度从可变头开始到结尾的字节数 int remainingLen buf.writerIndex() - (lenPos 1); buf.setByte(lenPos, remainingLen); byte[] result new byte[buf.readableBytes()]; buf.readBytes(result); buf.release(); return result; } }