Java Netty 通信编程
解决办法:为消息设置边界。
为消息设置边界通常有三种方法:
- 消息定长:规定每个消息的长度,接收方按照固定长度读取数据。
- 消息分隔符:在消息中添加分隔符,接收方根据分隔符拆分消息。
- 使用特定协议:定义协议头,包含消息的长度等信息。
Netty也支持这三种处理方式,并提供了具体的类来进行支持
解决方式 | 解码 | 编码 |
---|---|---|
消息定长 | FixedLengthFrameDecoder |
不内置,几乎没人用 |
消息分隔符 | DelimiterBasedFrameDecoder |
不内置,太简单 |
协议头中存储消息长度 | LengthFieldBasedFrameDecoder |
LengthFieldPrepender |
本次代码示例中使用的就是第三种方式,“在协议头中存储消息长度”,这也是业界比较推荐的处理方式。
看一下代码实现:
/**
* 功能: 解决TCP传输的粘包,半包问题
* <p>
* 各参数说明(按参数顺序)
* maxFrameLength:指定解码时,帧的最大长度。如果超过这个长度,LengthFieldBasedFrameDecoder 将抛出 TooLongFrameException 异常。
* lengthFieldOffset:指定长度字段的偏移量,即长度字段在帧中的起始位置的偏移量。例如,如果长度字段位于帧的起始位置,则 lengthFieldOffset 为 0。
* lengthFieldLength:指定长度字段的字节长度。常见的取值为 1、2、4 字节。
* lengthAdjustment:长度字段的值表示的是整个帧的长度,如果帧包含其他固定长度的字段,需要进行长度的调整。lengthAdjustment 就是用来进行这种调整的。
* initialBytesToStrip:指定从解码帧中跳过的字节数。在某些协议中,头部信息可能包含一些与长度字段无关的额外信息,这些信息需要跳过。
* failFast:如果设置为 true,则表示在帧长度超过 maxFrameLength 时立即抛出异常。如果设置为 false,则表示只有在实际解码时发现帧长度超过 maxFrameLength 时才抛出异常。
* </p>
* 时间: 2024/2/26 18:38
*/
public class TcpPacketDecoder extends LengthFieldBasedFrameDecoder {
public TcpPacketDecoder() {
super(Integer.MAX_VALUE, 0, 2, 0, 2, true);
}
}
/**
* 功能: 解决TCP传输的粘包,半包问题
* 各参数说明(按参数顺序)
* lengthFieldLength: 表示长度字段的字节数。仅支持 1,2,3,4,8
* lengthAdjustment: 表示长度字段值的调整值。长度字段的值表示的是整个帧的长度,如果帧包含其他固定长度的字段,需要进行长度的调整。lengthAdjustment 就是用来进行这种调整的。
* lengthIncludesLengthFieldLength: 表示长度字段的值是否包含长度字段的长度。如果设置为 true,则表示长度字段的值包含长度字段自身的长度;如果设置为 false,则表示长度字段的值仅包含有效数据的长度,不包含长度字段本身的长度。
* 时间: 2024/2/27 23:59
*/
public class TcpPacketEncoder extends LengthFieldPrepender {
public TcpPacketEncoder() {
super(2, 0, false);
}
}
ProtocolDecoder,ProtocolEncoder: 客户端与服务端进行通信的自定义协议的编解码器,用于将字节流解析成Java协议对象,或将Java协议对象编码成字节流。
本次代码通信协议使用json形式,协议对象为Message,有两个属性:请求头MessageHeader和MessageBody。请求对象RequestMessage和返回对象ResponseMessage分别继承于Message,类图如下:详情请看源码(文末有分享链接)
对应的的部分代码如下:
/**
* 功能: 将ByteBuf的内容解析为约定的协议对象
*/
public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
RequestMessage requestMessage = new RequestMessage();
requestMessage.decode(byteBuf);
out.add(requestMessage);
}
}
/**
* 功能: 将协议对象转为ByteBuf
*/
public class ProtocolEncoder extends MessageToMessageEncoder<ResponseMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, ResponseMessage responseMessage, List<Object> out) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
responseMessage.encode(buffer);
out.add(buffer);
}
}
/**
* 功能: netty传输的自定义协议的消息体
*/
@Data
public abstract class Message<T extends MessageBody> {
MessageHeader messageHeader;
T messageBody;
/**
* 将ByteBuf解析为Message对象
*
* @param byteBuf
*/
public void decode(ByteBuf byteBuf) {
// 先读取请求信息中的头部字段
int version = byteBuf.readInt();
int opCode = byteBuf.readInt();
String streamId = byteBuf.readCharSequence(32, CharsetUtil.UTF_8).toString();
this.messageHeader = new MessageHeader(version, opCode, streamId);
// 剩余的内容都是消息体了
String bodyStr = byteBuf.toString(CharsetUtil.UTF_8);
Class<T> messageBodyClass = getMessageBodyClassFromOpCode(opCode);
this.messageBody = JSONObject.parseObject(bodyStr, messageBodyClass, JSONReader.Feature.SupportClassForName);
}
public abstract Class<T> getMessageBodyClassFromOpCode(int opCode);
public void encode(ByteBuf buffer) {
// 先将返回信息中的头部信息写入buffer
buffer.writeInt(this.messageHeader.getVersion());
buffer.writeInt(this.messageHeader.getOpCode());
buffer.writeCharSequence(this.messageHeader.getStreamId(), CharsetUtil.UTF_8);
// 再将body体转为json写入buffer
buffer.writeCharSequence(JSON.toJSONString(this.messageBody), CharsetUtil.UTF_8);
}
}
public class RequestMessage extends Message<Operation> {
public RequestMessage(String streamId, Operation operation) {
MessageHeader messageHeader = new MessageHeader();
messageHeader.setStreamId(streamId);
messageHeader.setOpCode(OperationEnum.fromOperationClazz(operation.getClass()).getOpCode());
this.messageHeader = messageHeader;
this.messageBody = operation;
}
@Override
public Class getMessageBodyClassFromOpCode(int opCode) {
return OperationEnum.fromOpCode(opCode).getOperationClazz();
}
}
public class ResponseMessage extends Message<OperationResult> {
@Override
public Class getMessageBodyClassFromOpCode(int opCode) {
return OperationEnum.fromOpCode(opCode).getOperationResultClazz();
}
}
这里提一下上面的两层编解码的关系
关于ByteToMessageDecoder与MessageToMessageDecoder
一次解码器:ByteToMessageDecoder
io.netty.buffer.ByteBuf (原始数据流)-> io.netty.buffer.ByteBuf (用户数据)
解决tcp的粘包、半包问题就属于一次解码。
二次解码器:MessageToMessageDecoder<I>
io.netty.buffer.ByteBuf (用户数据)-> Java Object
二次解码后ChannelPipeline中的流转的就是解析好的Java协议对象了。
Netty的设计中将两次解码分层处理了,这样每层的任务比较清晰,也能达到解耦的效果。
常见的“二次”编解码方式
- Java 序列化
- Marshaling
- XML
- JSON
- MessagePack
- Protobuf
- 其他
继续回到Handler的介绍上
ServiceProcessHandler:进行具体业务处理的Handler
本次代码中服务端接收的请求分为两类:探活请求与业务请求,具体的处理逻辑封装在了各自的Operation对象中。
public class ServiceProcessHandler extends SimpleChannelInboundHandler<RequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestMessage requestMessage) throws Exception {
log.info("消息到达ServiceProcessHandler,开始执行请求处理的具体逻辑,{}", JSON.toJSONString(requestMessage));
OperationResult operationResult = requestMessage.getMessageBody().execute();
ResponseMessage responseMessage = new ResponseMessage();
responseMessage.setMessageHeader(requestMessage.getMessageHeader());
responseMessage.setMessageBody(operationResult);
if (ctx.channel().isActive() && ctx.channel().isWritable()) {
ctx.writeAndFlush(responseMessage);
} else {
log.error("channel当前不可写,丢弃此消息:" + JSON.toJSONString(responseMessage));
}
}
}
public class BusinessOperation extends Operation {
private long userId;
@Override
public OperationResult execute() {
log.info("收到BusinessOperation消息");
BusinessOperationResult operationResult = new BusinessOperationResult(userId, 18, "张三");
log.info("BusinessOperation消息处理完成");
return operationResult;
}
}
public class KeepaliveOperation extends Operation {
private long time;
public KeepaliveOperation() {
this.time = System.nanoTime();
}
@Override
public OperationResult execute() {
return new KeepaliveOperationResult(time);
}
}
ServiceProcessHandler处理完请求后就封装好ResponseMessage,经过两层编码后返回给客户端了。
客户端代码
客户端启动类
public class NettyClient {
//此客户端对象拥有的channel
private Channel channel;
// 定义一个静态的EVENT_LOOP_GROUP,客户端所有channel共用这一个EVENT_LOOP_GROUP
public static final EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup();
// 请求等待中心,存放等待请求结果返回的Future对象
public static final RequestPendingCenter REQUEST_PENDING_CENTER = new RequestPendingCenter();
public synchronized void initClient() {
try {
// 双重检查,防止并发访问时多次初始化
if (this.channel != null) {
return;
}
log.info("开始初始化客户端并建立连接");
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(EVENT_LOOP_GROUP)
.option(ChannelOption.TCP_NODELAY, true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
// 空闲检测
.addLast("idleCheck", new ClientIdleCheckHandler())
// netty日志
.addLast("logHandler", new LoggingHandler())
// tcp粘包、半包处理
.addLast("tcpPacketDecoder", new TcpPacketDecoder())
.addLast("tcpPacketEncoder", new TcpPacketEncoder())
// 将ByteBuf解析为双端约定好的通信协议对象
.addLast("protocolDecoder", new ProtocolDecoder())
.addLast("protocolEncoder", new ProtocolEncoder())
// 空闲检测具体处理逻辑
.addLast("keepaliveHandler", new KeepaliveHandler())
// 返回结果分发处理
.addLast("responseDispatcherHandler", new ResponseDispatcherHandler(REQUEST_PENDING_CENTER));
}
});
//连接服务端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9001).sync();
//将channel赋给实例属性,方便对channel进行复用
this.channel = channelFuture.channel();
log.info("客户端初始化成功");
//对通道关闭进行监听,这里不能一直等待,否则会阻塞线程
// channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
// 假设 EVENT_LOOP_GROUP 被用于了多个channel, 此channel关闭时,不能将 EVENT_LOOP_GROUP 关掉,确实需要关闭时通过 closeAll 方法关闭
// EVENT_LOOP_GROUP.shutdownGracefully();
}
}
/**
* 发送消息
*/
public OperationResult send(Operation operation) throws ExecutionException, InterruptedException {
if (this.channel == null) {
initClient();
}
String streamId = IdUtil.fastSimpleUUID();
// 创建一个Future对象,并放入"请求等待中心"中,等待结果返回
OperationRequestFuture requestFuture = new OperationRequestFuture();
REQUEST_PENDING_CENTER.add(streamId, requestFuture);
// 发送消息给对端
RequestMessage requestMessage = new RequestMessage(streamId, operation);
this.channel.writeAndFlush(requestMessage);
// 阻塞等待返回结果,这是Future的特性
// 当ResponseDispatcherHandler将返回结果set进Future对象后,requestFuture.get()就能拿到结果并结束阻塞了
OperationResult operationResult = requestFuture.get();
return operationResult;
}
/**
* 关闭eventLoop线程池
*/
public static void closeAll() {
EVENT_LOOP_GROUP.shutdownGracefully();
}
/**
* 通过main方法启动客户端
*/
public static void main(String[] args) throws InterruptedException, ExecutionException {
try {
NettyClient nettyClient = new NettyClient();
for (int i = 1; i <= 10; i++) {
BusinessOperation businessOperation = new BusinessOperation();
businessOperation.setUserId(i);
OperationResult result = nettyClient.send(businessOperation);
log.info("future服务端返回结果:{}", JSON.toJSONString(result));
ThreadUtil.sleep(10, TimeUnit.SECONDS);
}
log.info("调用结束");
} finally {
NettyClient.closeAll();
}
}
}
客户端与服务端的“两次”编解码方式基本相同,主要的不同在于空闲检测的处理与响应结果的处理上。
ClientIdleCheckHandler,KeepaliveHandler:空闲检测与相应处理。
本次代码中,客户端检测5秒内是否有写事件发生,如果没有的话,就向服务端发送一个探活包,以保活它与服务端之间的连接。(毕竟我们这个服务端比较傲娇,10秒内没收到客户端消息的话就主动断开连接了)
public class ClientIdleCheckHandler extends IdleStateHandler {
public ClientIdleCheckHandler() {
super(0, 5, 0, TimeUnit.SECONDS);
}
}
public class KeepaliveHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
log.info("检测到写空闲,即将发送探活包");
KeepaliveOperation keepaliveOperation = new KeepaliveOperation();
String streamId = IdUtil.fastSimpleUUID();
RequestMessage requestMessage = new RequestMessage(streamId, keepaliveOperation);
ctx.writeAndFlush(requestMessage);
log.info("探活包发送完成");
}
super.userEventTriggered(ctx, evt);
}
}
ResponseDispatcherHandler:接收到服务端的返回结果,并赋值给对应Future。
客户端发消息的设计里涉及到对Future的使用,因为客户端也是使用的Netty的nio模式,所以请求发出后,代码并不会阻塞等待结果返回,那怎么实现阻塞等待的效果呢?那就使用Future。这里我使用的Future是Netty中内置的DefaultPromise,这个Future支持我们手动给Future赋值以取消阻塞。google guava包中的SettableFuture也能实现类似的效果。
使用Future进行阻塞等待的流程如下图:
代码如下:
public class OperationRequestFuture extends DefaultPromise<OperationResult> {
}
/**
* 功能: 请求等待中心,存放等待返回结果的Future
*/
@Slf4j
public class RequestPendingCenter {
private Map<String, OperationRequestFuture> requestFutureMap = new ConcurrentHashMap<>();
public void add(String streamId, OperationRequestFuture operationRequestFuture) {
requestFutureMap.put(streamId, operationRequestFuture);
}
public void set(String streamId, OperationResult operationResult) {
if (requestFutureMap.containsKey(streamId)) {
requestFutureMap.get(streamId).setSuccess(operationResult);
requestFutureMap.remove(streamId);
return;
}
log.error("streamId:{} 在请求等待集合中不存在", streamId);
}
}
public class ResponseDispatcherHandler extends SimpleChannelInboundHandler<ResponseMessage> {
private RequestPendingCenter requestPendingCenter;
public ResponseDispatcherHandler(RequestPendingCenter requestPendingCenter) {
this.requestPendingCenter = requestPendingCenter;
}
/**
* 这里处理服务端返回结果,并放入Future中
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage responseMessage) throws Exception {
String streamId = responseMessage.getMessageHeader().getStreamId();
OperationResult operationResult = responseMessage.getMessageBody();
log.info("收到服务端回复:{}", JSON.toJSONString(operationResult));
// 通过"请求等待中心",将返回结果赋值给指定Future
requestPendingCenter.set(streamId, operationResult);
}
}
至此,客户端与服务端就编写完成了,后面我会继续基于这个代码实现一个spring+netty的rpc调用工程,敬请期待!
源码分享
为了方便调试,我将客户端与服务端分了两个项目
https://gitee.com/huo-ming-lu/RpcDemoServer
https://gitee.com/huo-ming-lu/RpcDemoClient
本篇文章涉及到的类均在 com.example.rpcdemo.nio.netty 包下,敬请查阅。
推荐阅读
-
Java 智能电表对接 智能电表如何通信
-
华为 OD 机测试 - 跳格 3 - 动态编程(Java 2024 C 卷 200 分) - III.
-
如何用 Java 语言编程,如何输入 char 类型的字符
-
Java 网络编程之 TCP(五):分析服务器端注册 OP_WRITE 写入数据的各种情况(二)
-
Java 编程思想(第 5 版)--23 个枚举
-
Java 编程进阶 09]Java 单例模式深度解析:从懒汉到枚举的进化之旅
-
致命的 GMSSL 通信-Java/Netty 系列(III)
-
贝塔分布采样的 Java 编程实现或采样示例代码
-
编写单元测试案例流程的 Java 编程技能
-
JAVA 编程蜘蛛接龙_介绍程序员玩接龙游戏