欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

Java Netty 通信编程

最编程 2024-03-06 19:28:16
...

解决办法:为消息设置边界。

为消息设置边界通常有三种方法:

  • 消息定长:规定每个消息的长度,接收方按照固定长度读取数据。
  • 消息分隔符:在消息中添加分隔符,接收方根据分隔符拆分消息。
  • 使用特定协议:定义协议头,包含消息的长度等信息。

Netty也支持这三种处理方式,并提供了具体的类来进行支持

解决方式 解码 编码
消息定长

FixedLengthFrameDecoder

不内置,几乎没人用
消息分隔符

DelimiterBasedFrameDecoder

不内置,太简单
协议头中存储消息长度

LengthFieldBasedFrameDecoder

LengthFieldPrepender

本次代码示例中使用的就是第三种方式,“在协议头中存储消息长度”,这也是业界比较推荐的处理方式。

看一下代码实现:

/**
 * 功能: 解决TCP传输的粘包,半包问题
 * <p>
 * 各参数说明(按参数顺序)
 * maxFrameLength:指定解码时,帧的最大长度。如果超过这个长度,LengthFieldBasedFrameDecoder 将抛出 TooLongFrameException 异常。
 * lengthFieldOffset:指定长度字段的偏移量,即长度字段在帧中的起始位置的偏移量。例如,如果长度字段位于帧的起始位置,则 lengthFieldOffset 为 0。
 * lengthFieldLength:指定长度字段的字节长度。常见的取值为 1、2、4 字节。
 * lengthAdjustment:长度字段的值表示的是整个帧的长度,如果帧包含其他固定长度的字段,需要进行长度的调整。lengthAdjustment 就是用来进行这种调整的。
 * initialBytesToStrip:指定从解码帧中跳过的字节数。在某些协议中,头部信息可能包含一些与长度字段无关的额外信息,这些信息需要跳过。
 * failFast:如果设置为 true,则表示在帧长度超过 maxFrameLength 时立即抛出异常。如果设置为 false,则表示只有在实际解码时发现帧长度超过 maxFrameLength 时才抛出异常。
 * </p>
 * 时间: 2024/2/26 18:38
 */
public class TcpPacketDecoder extends LengthFieldBasedFrameDecoder {

    public TcpPacketDecoder() {
        super(Integer.MAX_VALUE, 0, 2, 0, 2, true);
    }

}

/**
 * 功能: 解决TCP传输的粘包,半包问题
 * 各参数说明(按参数顺序)
 * lengthFieldLength: 表示长度字段的字节数。仅支持 1,2,3,4,8
 * lengthAdjustment: 表示长度字段值的调整值。长度字段的值表示的是整个帧的长度,如果帧包含其他固定长度的字段,需要进行长度的调整。lengthAdjustment 就是用来进行这种调整的。
 * lengthIncludesLengthFieldLength: 表示长度字段的值是否包含长度字段的长度。如果设置为 true,则表示长度字段的值包含长度字段自身的长度;如果设置为 false,则表示长度字段的值仅包含有效数据的长度,不包含长度字段本身的长度。
 * 时间: 2024/2/27 23:59
 */
public class TcpPacketEncoder extends LengthFieldPrepender {
    public TcpPacketEncoder() {
        super(2, 0, false);
    }
}

ProtocolDecoder,ProtocolEncoder: 客户端与服务端进行通信的自定义协议的编解码器,用于将字节流解析成Java协议对象,或将Java协议对象编码成字节流。

本次代码通信协议使用json形式,协议对象为Message,有两个属性:请求头MessageHeader和MessageBody。请求对象RequestMessage和返回对象ResponseMessage分别继承于Message,类图如下:详情请看源码(文末有分享链接)

对应的的部分代码如下:

/**
 * 功能: 将ByteBuf的内容解析为约定的协议对象
 */
public class ProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
        RequestMessage requestMessage = new RequestMessage();
        requestMessage.decode(byteBuf);
        out.add(requestMessage);
    }
}

/**
 * 功能: 将协议对象转为ByteBuf
 */
public class ProtocolEncoder extends MessageToMessageEncoder<ResponseMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, ResponseMessage responseMessage, List<Object> out) throws Exception {
        ByteBuf buffer = ctx.alloc().buffer();
        responseMessage.encode(buffer);
        out.add(buffer);
    }
}

/**
 * 功能: netty传输的自定义协议的消息体
 */
@Data
public abstract class Message<T extends MessageBody> {

    MessageHeader messageHeader;

    T messageBody;


    /**
     * 将ByteBuf解析为Message对象
     *
     * @param byteBuf
     */
    public void decode(ByteBuf byteBuf) {
        // 先读取请求信息中的头部字段
        int version = byteBuf.readInt();
        int opCode = byteBuf.readInt();
        String streamId = byteBuf.readCharSequence(32, CharsetUtil.UTF_8).toString();
        this.messageHeader = new MessageHeader(version, opCode, streamId);

        // 剩余的内容都是消息体了
        String bodyStr = byteBuf.toString(CharsetUtil.UTF_8);
        Class<T> messageBodyClass = getMessageBodyClassFromOpCode(opCode);
        this.messageBody = JSONObject.parseObject(bodyStr, messageBodyClass, JSONReader.Feature.SupportClassForName);
    }

    public abstract Class<T> getMessageBodyClassFromOpCode(int opCode);

    public void encode(ByteBuf buffer) {
        // 先将返回信息中的头部信息写入buffer
        buffer.writeInt(this.messageHeader.getVersion());
        buffer.writeInt(this.messageHeader.getOpCode());
        buffer.writeCharSequence(this.messageHeader.getStreamId(), CharsetUtil.UTF_8);

        // 再将body体转为json写入buffer
        buffer.writeCharSequence(JSON.toJSONString(this.messageBody), CharsetUtil.UTF_8);
    }
}

public class RequestMessage extends Message<Operation> {

    public RequestMessage(String streamId, Operation operation) {
        MessageHeader messageHeader = new MessageHeader();
        messageHeader.setStreamId(streamId);
        messageHeader.setOpCode(OperationEnum.fromOperationClazz(operation.getClass()).getOpCode());
        this.messageHeader = messageHeader;
        this.messageBody = operation;
    }

    @Override
    public Class getMessageBodyClassFromOpCode(int opCode) {
        return OperationEnum.fromOpCode(opCode).getOperationClazz();
    }
}

public class ResponseMessage extends Message<OperationResult> {

    @Override
    public Class getMessageBodyClassFromOpCode(int opCode) {
        return OperationEnum.fromOpCode(opCode).getOperationResultClazz();
    }

}

这里提一下上面的两层编解码的关系

关于ByteToMessageDecoder与MessageToMessageDecoder

一次解码器:ByteToMessageDecoder

io.netty.buffer.ByteBuf (原始数据流)-> io.netty.buffer.ByteBuf (用户数据)

解决tcp的粘包、半包问题就属于一次解码。

二次解码器:MessageToMessageDecoder<I>

io.netty.buffer.ByteBuf (用户数据)-> Java Object

二次解码后ChannelPipeline中的流转的就是解析好的Java协议对象了。

Netty的设计中将两次解码分层处理了,这样每层的任务比较清晰,也能达到解耦的效果。

常见的“二次”编解码方式

  • Java 序列化
  • Marshaling
  • XML
  • JSON
  • MessagePack
  • Protobuf
  • 其他

继续回到Handler的介绍上

ServiceProcessHandler:进行具体业务处理的Handler

本次代码中服务端接收的请求分为两类:探活请求与业务请求,具体的处理逻辑封装在了各自的Operation对象中。

public class ServiceProcessHandler extends SimpleChannelInboundHandler<RequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RequestMessage requestMessage) throws Exception {
        log.info("消息到达ServiceProcessHandler,开始执行请求处理的具体逻辑,{}", JSON.toJSONString(requestMessage));
        OperationResult operationResult = requestMessage.getMessageBody().execute();

        ResponseMessage responseMessage = new ResponseMessage();
        responseMessage.setMessageHeader(requestMessage.getMessageHeader());
        responseMessage.setMessageBody(operationResult);

        if (ctx.channel().isActive() && ctx.channel().isWritable()) {
            ctx.writeAndFlush(responseMessage);
        } else {
            log.error("channel当前不可写,丢弃此消息:" + JSON.toJSONString(responseMessage));
        }

    }
}

public class BusinessOperation extends Operation {

    private long userId;

    @Override
    public OperationResult execute() {
        log.info("收到BusinessOperation消息");
        BusinessOperationResult operationResult = new BusinessOperationResult(userId, 18, "张三");
        log.info("BusinessOperation消息处理完成");
        return operationResult;
    }
}

public class KeepaliveOperation extends Operation {

    private long time;

    public KeepaliveOperation() {
        this.time = System.nanoTime();
    }

    @Override
    public OperationResult execute() {
        return new KeepaliveOperationResult(time);
    }
}

ServiceProcessHandler处理完请求后就封装好ResponseMessage,经过两层编码后返回给客户端了。

客户端代码

客户端启动类

public class NettyClient {

    //此客户端对象拥有的channel
    private Channel channel;

    // 定义一个静态的EVENT_LOOP_GROUP,客户端所有channel共用这一个EVENT_LOOP_GROUP
    public static final EventLoopGroup EVENT_LOOP_GROUP = new NioEventLoopGroup();

    // 请求等待中心,存放等待请求结果返回的Future对象
    public static final RequestPendingCenter REQUEST_PENDING_CENTER = new RequestPendingCenter();

    public synchronized void initClient() {
        try {
            // 双重检查,防止并发访问时多次初始化
            if (this.channel != null) {
                return;
            }
            log.info("开始初始化客户端并建立连接");
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(EVENT_LOOP_GROUP)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline()
                                    // 空闲检测
                                    .addLast("idleCheck", new ClientIdleCheckHandler())
                                    // netty日志
                                    .addLast("logHandler", new LoggingHandler())
                                    // tcp粘包、半包处理
                                    .addLast("tcpPacketDecoder", new TcpPacketDecoder())
                                    .addLast("tcpPacketEncoder", new TcpPacketEncoder())
                                    // 将ByteBuf解析为双端约定好的通信协议对象
                                    .addLast("protocolDecoder", new ProtocolDecoder())
                                    .addLast("protocolEncoder", new ProtocolEncoder())
                                    // 空闲检测具体处理逻辑
                                    .addLast("keepaliveHandler", new KeepaliveHandler())
                                    // 返回结果分发处理
                                    .addLast("responseDispatcherHandler", new ResponseDispatcherHandler(REQUEST_PENDING_CENTER));
                        }
                    });
            //连接服务端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9001).sync();
            //将channel赋给实例属性,方便对channel进行复用
            this.channel = channelFuture.channel();
            log.info("客户端初始化成功");
            //对通道关闭进行监听,这里不能一直等待,否则会阻塞线程
//            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            // 假设 EVENT_LOOP_GROUP 被用于了多个channel, 此channel关闭时,不能将 EVENT_LOOP_GROUP 关掉,确实需要关闭时通过 closeAll 方法关闭
//            EVENT_LOOP_GROUP.shutdownGracefully();
        }
    }

    /**
     * 发送消息
     */
    public OperationResult send(Operation operation) throws ExecutionException, InterruptedException {
        if (this.channel == null) {
            initClient();
        }
        String streamId = IdUtil.fastSimpleUUID();
        // 创建一个Future对象,并放入"请求等待中心"中,等待结果返回
        OperationRequestFuture requestFuture = new OperationRequestFuture();
        REQUEST_PENDING_CENTER.add(streamId, requestFuture);

        // 发送消息给对端
        RequestMessage requestMessage = new RequestMessage(streamId, operation);
        this.channel.writeAndFlush(requestMessage);

        // 阻塞等待返回结果,这是Future的特性
        // 当ResponseDispatcherHandler将返回结果set进Future对象后,requestFuture.get()就能拿到结果并结束阻塞了
        OperationResult operationResult = requestFuture.get();
        return operationResult;
    }

    /**
     * 关闭eventLoop线程池
     */
    public static void closeAll() {
        EVENT_LOOP_GROUP.shutdownGracefully();
    }

    /**
     * 通过main方法启动客户端
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        try {
            NettyClient nettyClient = new NettyClient();
            for (int i = 1; i <= 10; i++) {
                BusinessOperation businessOperation = new BusinessOperation();
                businessOperation.setUserId(i);
                OperationResult result = nettyClient.send(businessOperation);
                log.info("future服务端返回结果:{}", JSON.toJSONString(result));
                ThreadUtil.sleep(10, TimeUnit.SECONDS);
            }
            log.info("调用结束");
        } finally {
            NettyClient.closeAll();
        }
    }

}

客户端与服务端的“两次”编解码方式基本相同,主要的不同在于空闲检测的处理与响应结果的处理上。

ClientIdleCheckHandler,KeepaliveHandler:空闲检测与相应处理。

本次代码中,客户端检测5秒内是否有写事件发生,如果没有的话,就向服务端发送一个探活包,以保活它与服务端之间的连接。(毕竟我们这个服务端比较傲娇,10秒内没收到客户端消息的话就主动断开连接了)

public class ClientIdleCheckHandler extends IdleStateHandler {
    public ClientIdleCheckHandler() {
        super(0, 5, 0, TimeUnit.SECONDS);
    }
}

public class KeepaliveHandler extends ChannelInboundHandlerAdapter {
    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
            log.info("检测到写空闲,即将发送探活包");
            KeepaliveOperation keepaliveOperation = new KeepaliveOperation();
            String streamId = IdUtil.fastSimpleUUID();
            RequestMessage requestMessage = new RequestMessage(streamId, keepaliveOperation);
            ctx.writeAndFlush(requestMessage);
            log.info("探活包发送完成");
        }

        super.userEventTriggered(ctx, evt);
    }
}

ResponseDispatcherHandler:接收到服务端的返回结果,并赋值给对应Future。

客户端发消息的设计里涉及到对Future的使用,因为客户端也是使用的Netty的nio模式,所以请求发出后,代码并不会阻塞等待结果返回,那怎么实现阻塞等待的效果呢?那就使用Future。这里我使用的Future是Netty中内置的DefaultPromise,这个Future支持我们手动给Future赋值以取消阻塞。google guava包中的SettableFuture也能实现类似的效果。

使用Future进行阻塞等待的流程如下图:

代码如下:

public class OperationRequestFuture extends DefaultPromise<OperationResult> {

}

/**
 * 功能: 请求等待中心,存放等待返回结果的Future
 */
@Slf4j
public class RequestPendingCenter {

    private Map<String, OperationRequestFuture> requestFutureMap = new ConcurrentHashMap<>();

    public void add(String streamId, OperationRequestFuture operationRequestFuture) {
        requestFutureMap.put(streamId, operationRequestFuture);
    }

    public void set(String streamId, OperationResult operationResult) {
        if (requestFutureMap.containsKey(streamId)) {
            requestFutureMap.get(streamId).setSuccess(operationResult);
            requestFutureMap.remove(streamId);
            return;
        }
        log.error("streamId:{} 在请求等待集合中不存在", streamId);
    }
    
}

public class ResponseDispatcherHandler extends SimpleChannelInboundHandler<ResponseMessage> {

    private RequestPendingCenter requestPendingCenter;

    public ResponseDispatcherHandler(RequestPendingCenter requestPendingCenter) {
        this.requestPendingCenter = requestPendingCenter;
    }

    /**
     * 这里处理服务端返回结果,并放入Future中
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage responseMessage) throws Exception {
        String streamId = responseMessage.getMessageHeader().getStreamId();
        OperationResult operationResult = responseMessage.getMessageBody();
        log.info("收到服务端回复:{}", JSON.toJSONString(operationResult));
        // 通过"请求等待中心",将返回结果赋值给指定Future
        requestPendingCenter.set(streamId, operationResult);
    }
}

至此,客户端与服务端就编写完成了,后面我会继续基于这个代码实现一个spring+netty的rpc调用工程,敬请期待!

源码分享

为了方便调试,我将客户端与服务端分了两个项目

https://gitee.com/huo-ming-lu/RpcDemoServer
https://gitee.com/huo-ming-lu/RpcDemoClient

本篇文章涉及到的类均在 com.example.rpcdemo.nio.netty 包下,敬请查阅。