上一篇【第52篇】Netty秒杀系统——高并发网络接入层实战下一篇【第54篇】Netty在Elasticsearch中的应用——分布式搜索引擎的网络通信一、Dubbo网络通信架构------------------ ------------------ | Consumer | | Provider | | (NettyClient) | | (NettyServer) | ------------------ ------------------ | Transport层 | | Transport层 | | Exchange层 | | Exchange层 | | Protocol层 | | Protocol层 | ------------------ ------------------ | | ---------------------- | Dubbo协议基于Netty二、Dubbo协议格式Dubbo RPC协议帧 ------------------------------------------------ | 魔数(0xdabb,2B) | 标志位(1B) | 状态(1B) | ID(8B) | ------------------------------------------------ | 数据长度(4B) | ------------------------------------------------ | 序列化数据(body, variable) | ------------------------------------------------标志位含义bit 0-2序列化类型(0Hessian, 2JSON, 6Kryo, 8Protobuf)bit 3-4事件类型(0Request, 1Response, 2心跳)bit 5是否双向通信bit 6-7保留三、NettyServer初始化// Dubbo 3.x中的NettyServerpublicclassNettyServerextendsAbstractServer{protectedvoiddoOpen(){bootstrapnewServerBootstrap();bootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childOption(ChannelOption.TCP_NODELAY,true).childOption(ChannelOption.SO_KEEPALIVE,true).childHandler(newChannelInitializerSocketChannel(){protectedvoidinitChannel(SocketChannelch){// Dubbo编解码器ch.pipeline().addLast(decoder,newDubboCountCodec());ch.pipeline().addLast(encoder,newDubboCountCodec());// Dubbo业务处理器ch.pipeline().addLast(handler,newNettyServerHandler());}});ChannelFuturefbootstrap.bind(getBindAddress());}}四、Dubbo编解码器// DubboCodec 长度字段编解码 Dubbo协议publicclassDubboCodecextendsExchangeCodec{protectedvoidencodeRequestData(Channelchannel,Objectoutput,ByteBufbuffer){// 序列化RPC调用数据dubbo版本、服务名、方法名、参数等RpcInvocationinv(RpcInvocation)output;ObjectOutputoutCodecSupport.getSerialization(channel.getUrl()).serialize(channel.getUrl(),buffer);out.writeUTF(inv.getAttachment(DUBBO_VERSION_KEY));out.writeUTF(inv.getAttachment(PATH_KEY));// 服务名out.writeUTF(inv.getAttachment(VERSION_KEY));out.writeUTF(inv.getMethodName());// 方法名out.writeUTF(inv.getParameterTypesDesc());// 参数类型// 写入参数值for(Objectarg:inv.getArguments()){out.writeObject(arg);}}}五、Dubbo心跳机制// 客户端发送心跳publicclassHeartbeatTimerTaskimplementsRunnable{publicvoidrun(){if(System.currentTimeMillis()-lastReadTimeheartbeat){// 发送心跳请求RequestreqnewRequest();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setEvent(Request.HEARTBEAT_EVENT);channel.send(req);}}}// 服务端响应心跳if(request.isHeartbeat()){ResponseresponsenewResponse();response.setEvent(Response.HEARTBEAT_EVENT);channel.send(response);return;}六、总结组件作用NettyServer/NettyClientTransport层实现DubboCodecDubbo协议编解码ExchangeHandler请求-响应模型心跳机制60秒空闲检测上一篇【第52篇】Netty秒杀系统——高并发网络接入层实战下一篇【第54篇】Netty在Elasticsearch中的应用——分布式搜索引擎的网络通信
【Netty源码解读和权威指南】第53篇:Netty在Dubbo中的应用——Dubbo网络通信层深度解析
上一篇【第52篇】Netty秒杀系统——高并发网络接入层实战下一篇【第54篇】Netty在Elasticsearch中的应用——分布式搜索引擎的网络通信一、Dubbo网络通信架构------------------ ------------------ | Consumer | | Provider | | (NettyClient) | | (NettyServer) | ------------------ ------------------ | Transport层 | | Transport层 | | Exchange层 | | Exchange层 | | Protocol层 | | Protocol层 | ------------------ ------------------ | | ---------------------- | Dubbo协议基于Netty二、Dubbo协议格式Dubbo RPC协议帧 ------------------------------------------------ | 魔数(0xdabb,2B) | 标志位(1B) | 状态(1B) | ID(8B) | ------------------------------------------------ | 数据长度(4B) | ------------------------------------------------ | 序列化数据(body, variable) | ------------------------------------------------标志位含义bit 0-2序列化类型(0Hessian, 2JSON, 6Kryo, 8Protobuf)bit 3-4事件类型(0Request, 1Response, 2心跳)bit 5是否双向通信bit 6-7保留三、NettyServer初始化// Dubbo 3.x中的NettyServerpublicclassNettyServerextendsAbstractServer{protectedvoiddoOpen(){bootstrapnewServerBootstrap();bootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childOption(ChannelOption.TCP_NODELAY,true).childOption(ChannelOption.SO_KEEPALIVE,true).childHandler(newChannelInitializerSocketChannel(){protectedvoidinitChannel(SocketChannelch){// Dubbo编解码器ch.pipeline().addLast(decoder,newDubboCountCodec());ch.pipeline().addLast(encoder,newDubboCountCodec());// Dubbo业务处理器ch.pipeline().addLast(handler,newNettyServerHandler());}});ChannelFuturefbootstrap.bind(getBindAddress());}}四、Dubbo编解码器// DubboCodec 长度字段编解码 Dubbo协议publicclassDubboCodecextendsExchangeCodec{protectedvoidencodeRequestData(Channelchannel,Objectoutput,ByteBufbuffer){// 序列化RPC调用数据dubbo版本、服务名、方法名、参数等RpcInvocationinv(RpcInvocation)output;ObjectOutputoutCodecSupport.getSerialization(channel.getUrl()).serialize(channel.getUrl(),buffer);out.writeUTF(inv.getAttachment(DUBBO_VERSION_KEY));out.writeUTF(inv.getAttachment(PATH_KEY));// 服务名out.writeUTF(inv.getAttachment(VERSION_KEY));out.writeUTF(inv.getMethodName());// 方法名out.writeUTF(inv.getParameterTypesDesc());// 参数类型// 写入参数值for(Objectarg:inv.getArguments()){out.writeObject(arg);}}}五、Dubbo心跳机制// 客户端发送心跳publicclassHeartbeatTimerTaskimplementsRunnable{publicvoidrun(){if(System.currentTimeMillis()-lastReadTimeheartbeat){// 发送心跳请求RequestreqnewRequest();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setEvent(Request.HEARTBEAT_EVENT);channel.send(req);}}}// 服务端响应心跳if(request.isHeartbeat()){ResponseresponsenewResponse();response.setEvent(Response.HEARTBEAT_EVENT);channel.send(response);return;}六、总结组件作用NettyServer/NettyClientTransport层实现DubboCodecDubbo协议编解码ExchangeHandler请求-响应模型心跳机制60秒空闲检测上一篇【第52篇】Netty秒杀系统——高并发网络接入层实战下一篇【第54篇】Netty在Elasticsearch中的应用——分布式搜索引擎的网络通信