/*
 * IoT: UDP support for an MQTT service built on Netty
 *
 * Overview
 *   The application starts two separate services, one over TCP and one over UDP,
 *   keeping the two protocols separate while letting messages pass between them.
 *
 * Source
 *   https://gitee.com/kcnf-iot/iot-sample/tree/master/netty/netty-sample-02
 *
 * Code
 *   NOTE(review): the extracted text had lost its assignment operators, generic type
 *   arguments, annotations, string quotes and concatenation operators. They are
 *   restored below; confirm the exact string literals against the linked repository.
 */

// ===== DualProtocolServer.java =====
package com.jysemel.iot;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;

/**
 * Entry point that runs two Netty servers in one process: an MQTT endpoint over TCP
 * on port 1883 and a plain UDP endpoint on port 8888.
 */
public class DualProtocolServer {

    /**
     * Binds both servers, then blocks until the TCP channel and afterwards the UDP
     * channel have been closed. All event loop groups are shut down on the way out,
     * including when a bind fails.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if binding fails or a wait on a channel future fails
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        EventLoopGroup udpGroup = new NioEventLoopGroup(); // UDP worker group

        try {
            // --- 1. MQTT over TCP service (port 1883) ---
            ServerBootstrap tcpBootstrap = new ServerBootstrap();
            tcpBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new MqttDecoder(64 * 1024));
                            p.addLast(MqttEncoder.INSTANCE);
                            p.addLast("mqttHandler", new MqttServerHandler()); // MQTT business logic
                        }
                    });
            ChannelFuture tcpFuture = tcpBootstrap.bind(1883).sync();
            System.out.println("[TCP] MQTT Broker 启动, 端口 1883");

            // --- 2. UDP service (port 8888) receiving custom sensor data ---
            Bootstrap udpBootstrap = new Bootstrap();
            udpBootstrap.group(udpGroup)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler(new ChannelInitializer<NioDatagramChannel>() {
                        @Override
                        protected void initChannel(NioDatagramChannel ch) {
                            ch.pipeline().addLast("udpHandler", new UdpServerHandler());
                        }
                    });
            ChannelFuture udpFuture = udpBootstrap.bind(8888).sync();
            System.out.println("[UDP] 自定义传感器服务启动, 端口 8888");

            // Keep the main thread parked until both server channels are closed.
            tcpFuture.channel().closeFuture().sync();
            udpFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            udpGroup.shutdownGracefully();
        }
    }
}

// ===== UdpServerHandler.java =====
package com.jysemel.iot;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;

import java.nio.charset.StandardCharsets;

/**
 * Inbound handler for the UDP endpoint: decodes each datagram as UTF-8 text and hands
 * it to {@code MqttServerHandler.broadcast} under the topic {@code udp/sensor}.
 */
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    /**
     * Logs the datagram payload and forwards it to the MQTT side.
     *
     * @param ctx    the channel handler context (not used here)
     * @param packet the received datagram; its content is read as UTF-8 text
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) {
        ByteBuf content = packet.content();
        String udpData = content.toString(StandardCharsets.UTF_8);
        System.out.println("[UDP] Received: " + udpData);

        // Convert the UDP data into an MQTT message and broadcast it to all MQTT clients.
        // The message is always published to the fixed topic udp/sensor.
        MqttServerHandler.broadcast("udp/sensor", udpData);
    }

    /**
     * Prints the error message only; the channel is deliberately left open here.
     *
     * @param ctx   the channel handler context (not used here)
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.out.println("UDP error: " + cause.getMessage());
    }
}
物联网 基于netty构建mqtt服务udp支持
/*
 * IoT: UDP support for an MQTT service built on Netty
 *
 * Overview
 *   The application starts two separate services, one over TCP and one over UDP,
 *   keeping the two protocols separate while letting messages pass between them.
 *
 * Source
 *   https://gitee.com/kcnf-iot/iot-sample/tree/master/netty/netty-sample-02
 *
 * Code
 *   NOTE(review): the extracted text had lost its assignment operators, generic type
 *   arguments, annotations, string quotes and concatenation operators. They are
 *   restored below; confirm the exact string literals against the linked repository.
 */

// ===== DualProtocolServer.java =====
package com.jysemel.iot;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;

/**
 * Entry point that runs two Netty servers in one process: an MQTT endpoint over TCP
 * on port 1883 and a plain UDP endpoint on port 8888.
 */
public class DualProtocolServer {

    /**
     * Binds both servers, then blocks until the TCP channel and afterwards the UDP
     * channel have been closed. All event loop groups are shut down on the way out,
     * including when a bind fails.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if binding fails or a wait on a channel future fails
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        EventLoopGroup udpGroup = new NioEventLoopGroup(); // UDP worker group

        try {
            // --- 1. MQTT over TCP service (port 1883) ---
            ServerBootstrap tcpBootstrap = new ServerBootstrap();
            tcpBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new MqttDecoder(64 * 1024));
                            p.addLast(MqttEncoder.INSTANCE);
                            p.addLast("mqttHandler", new MqttServerHandler()); // MQTT business logic
                        }
                    });
            ChannelFuture tcpFuture = tcpBootstrap.bind(1883).sync();
            System.out.println("[TCP] MQTT Broker 启动, 端口 1883");

            // --- 2. UDP service (port 8888) receiving custom sensor data ---
            Bootstrap udpBootstrap = new Bootstrap();
            udpBootstrap.group(udpGroup)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler(new ChannelInitializer<NioDatagramChannel>() {
                        @Override
                        protected void initChannel(NioDatagramChannel ch) {
                            ch.pipeline().addLast("udpHandler", new UdpServerHandler());
                        }
                    });
            ChannelFuture udpFuture = udpBootstrap.bind(8888).sync();
            System.out.println("[UDP] 自定义传感器服务启动, 端口 8888");

            // Keep the main thread parked until both server channels are closed.
            tcpFuture.channel().closeFuture().sync();
            udpFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            udpGroup.shutdownGracefully();
        }
    }
}

// ===== UdpServerHandler.java =====
package com.jysemel.iot;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;

import java.nio.charset.StandardCharsets;

/**
 * Inbound handler for the UDP endpoint: decodes each datagram as UTF-8 text and hands
 * it to {@code MqttServerHandler.broadcast} under the topic {@code udp/sensor}.
 */
public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    /**
     * Logs the datagram payload and forwards it to the MQTT side.
     *
     * @param ctx    the channel handler context (not used here)
     * @param packet the received datagram; its content is read as UTF-8 text
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) {
        ByteBuf content = packet.content();
        String udpData = content.toString(StandardCharsets.UTF_8);
        System.out.println("[UDP] Received: " + udpData);

        // Convert the UDP data into an MQTT message and broadcast it to all MQTT clients.
        // The message is always published to the fixed topic udp/sensor.
        MqttServerHandler.broadcast("udp/sensor", udpData);
    }

    /**
     * Prints the error message only; the channel is deliberately left open here.
     *
     * @param ctx   the channel handler context (not used here)
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.out.println("UDP error: " + cause.getMessage());
    }
}