欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

全程实践Netty开发,从踩坑到解冑全过程(含源代码及Netty系列教程)- 掘金年度投稿精选

最编程 2024-07-29 19:44:54
...

在工作中用到Netty进行服务端开发,当服务端接收上位机字节数超过1000字节,服务端接收数据错误。为 什么服务端接收数据有时候没有问题,有时候会接收数据混乱。在不断的测试和对Netty知识点巩固,Netty 在项目开发中有了新的思考。

在TCP通信一定会出现粘包、拆包问题?如果不一定,什么时候出现粘包拆包问题?如果服务端单次接收字节 数超过1000字节的时候,byteBuf容量小于1000字节会出现什么问题?如何解决?

备注(Netty系列)

如果想要对网络编程和Netty是个小白,建议查看列表三篇文章。
1 七层协议和TCP/IP协议、三次握手四次挥手、BIO、NIO(Netty前置)
2 一文入门Netty(Netty一)
3 项目开发中如何选择编解码器?如何解决TCP粘包问题?(Netty二)

本文解决方案代码
GitHub代码地址
gitee代码地址

一 粘包(服务端客户端之间通信)

1.1 实例不出现粘包

1.1.1 代码实现

客户端以每500毫秒间隔向服务端发送相同字节数组,服务端将接收到字节转化为字符串,打印出来。
1. 服务端的配置类

public class TcpServer {
    private final int port;

    public TcpServer(int port) {
        this.port = port;
    }

    public void init() {
        NioEventLoopGroup boss = new NioEventLoopGroup();//主线程组
        NioEventLoopGroup work = new NioEventLoopGroup();//工作线程组
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();//引导对象
            bootstrap.group(boss, work);//配置工作线程组
            bootstrap.channel(NioServerSocketChannel.class);//配置为NIO的socket通道
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel ch) throws Exception {//绑定通道参数
                    ch.pipeline().addLast("logging", new LoggingHandler("DEBUG"));//设置log监听器,并且日志级别为debug,方便观察运行流程
                    ch.pipeline().addLast("encode", new EncoderHandler());//编码器。发送消息时候用过
                    ch.pipeline().addLast("decode", new DecoderHandler());//解码器,接收消息时候用
                    ch.pipeline().addLast("handler", new TcpServerHandler());//业务处理类,最终的消息会在这个handler中进行业务处理
                }
            });
            //使用了Future来启动线程,并绑定了端口
            ChannelFuture future = bootstrap.bind(port).sync();
            //以异步的方式关闭端口
            future.channel().closeFuture().sync();
        } catch (Exception e) {
        } finally {
            work.shutdownGracefully();
            //出现异常后,关闭线程组
            boss.shutdownGracefully();
        }

    }
}

2. 服务端解码器(decode)

public class DecoderHandler extends ByteToMessageDecoder {

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("byteBuf的容量为:" + in.capacity());
        System.out.println("byteBuf的可读容量为:" + in.readableBytes());
        System.out.println("byteBuf的可写容量为:" + in.writableBytes());
        byte[] data = new byte[in.readableBytes()];
        //读取核心的数据
        in.readBytes(data);
        out.add(data);
    }
}

3. 服务端编码器(encode)

public class EncoderHandler extends MessageToByteEncoder {
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        out.writeBytes((byte[]) msg);
    }
}

4. 服务端的ChannelHandler

public class TcpServerHandler extends ChannelInboundHandlerAdapter {


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        byte[] recevierByte = (byte[]) msg;
        String recevierString = ByteTransform.bytesToHexString(recevierByte);
        System.out.println("-------------------长度为:" + recevierString.length());
        System.out.println("---tcp服务接受设备端加密数据:" + recevierString);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /**
     * 客户端断开连接触发方法
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
    }

    /**
     * 方法中报错,触发方法
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

5. 服务端启动类

public class StartServer {
    public static void main(String[] args) {
        try {
            new TcpServer(5566).init();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

6. 工具类(知道方法是干嘛的就行)

public class ByteTransformIntUtil {
    /**
     * 根据byte转化为int
     */
    public static int getIntFromBytes(byte high_h, byte high_l, byte low_h, byte low_l) {
        return (high_h & 0xff) << 24 | (high_l & 0xff) << 16 | (low_h & 0xff) << 8 | low_l & 0xff;
    }
    /**
     * 根据byte转化为int
     */
    public static int getIntFromBytes(byte low_h, byte low_l) {
        return ByteTransformIntUtil.getIntFromBytes((byte)0,(byte)0,low_h,low_l);
    }
}

7. 客户端的配置类

public class MyRPCClient {

    public void start(String host, int port) throws Exception {
        //定义工作线程组
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            //注意:client使用的是Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker)
                    .channel(NioSocketChannel.class) //注意:client使用的是NioSocketChannel
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new MyClientHandler());
                        }
                    });
            //连接到远程服务
            ChannelFuture future = bootstrap.connect(host, port).sync();
            future.channel().closeFuture().sync();

        } finally {
            worker.shutdownGracefully();
        }
    }
}

8. 客户端启动类

public class StartClient {
    public static void main(String[] args) {
        new Thread() {
            public void run() {
                try {
                    new MyRPCClient().start("127.0.0.1", 5566);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
    }
}

9. 客户端ChannelHandler

public class MyClientHandler extends SimpleChannelInboundHandler<Object> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object buf) throws Exception {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 向服务端发送数据
        Timer timer = new Timer();
        final byte[] bytes = ByteTransform.hexStringToBytes("10000000");
        timer.scheduleAtFixedRate(new TimerTask() {
            public void run() {
                byte[] msg1 = {0x00,0x00,0x00,0x00,0x01, 0x01, 0x02,0x01};
                ctx.writeAndFlush(Unpooled.copiedBuffer(msg1));
            }
        }, 5000, 50);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    public int getIntFromBytes(byte high_h, byte high_l, byte low_h, byte low_l) {
        return (high_h & 0xff) << 24 | (high_l & 0xff) << 16 | (low_h & 0xff) << 8 | low_l & 0xff;
    }
}

1.1.2 对结果进行分析

查看客户端的Handler,向服务端发送数据的方法,是以50毫秒为间隔发送数据。但是服务端没有出现粘包问题。
总结 客户端向服务端有间隔时间向服务端发送数据,并不会出现粘包问题。

1.2 出现粘包问题

1.2.1 代码实例

客户端不间断的向服务端发字节数组(16个字节)

  1. 客户端ChannelHandler(只需要将上面客户端的MyClientHandler进行替换,其他代码依旧)
public class MyClientHandler extends SimpleChannelInboundHandler<Object> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object buf) throws Exception {

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
       
        while (true){
            i = i + 1;
            byte[] msg = {0x00, 0x00, 0x00, 0x0c, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109, 98};
            ctx.writeAndFlush(Unpooled.copiedBuffer(msg));
        }
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    public int getIntFromBytes(byte high_h, byte high_l, byte low_h, byte low_l) {
        return (high_h & 0xff) << 24 | (high_l & 0xff) << 16 | (low_h & 0xff) << 8 | low_l & 0xff;
    }
}

1.2.2 对结果进行分析

客户端不间断向服务端发送字节数组(16个字节),服务将接收的字节打印出来。

现象: 每次读取的字节长度发生了改变。
总结: 如果客户端同事接收多比数据会出现粘包问题。

二 拆包

2.1 不出现拆包问题

2.1.1 代码实现

public class MyClientHandler extends SimpleChannelInboundHandler<Object> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object buf) throws Exception {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 向服务端发送数据
        Timer timer = new Timer();
        final byte[] bytes = ByteTransform.hexStringToBytes("10000000");
        timer.scheduleAtFixedRate(new TimerTask() {
            public void run() {
                byte[] msg1 = {0x00,0x00,0x00,0x00,0x01, 0x01, 0x02,0x01};
                ctx.writeAndFlush(Unpooled.copiedBuffer(msg1));
            }
        }, 5000, 50);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    public int getIntFromBytes(byte high_h, byte high_l, byte low_h, byte low_l) {
        return (high_h & 0xff) << 24 | (high_l & 0xff) << 16 | (low_h & 0xff) << 8 | low_l & 0xff;
    }
}

2.1.2 结果分析


查看客户端的Handler,向服务端发送数据的方法,是以50毫秒为间隔发送数据。但是服务端没有出现粘包问题。
总结客户端向服务端发送若字节数组(十几个字节),并不会出现拆包问题。

2.2 实例netty拆包问题(一)

public class MyClientHandler extends SimpleChannelInboundHandler<Object> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object buf) throws Exception {
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 向服务端发送数据
        Timer timer = new Timer();
        final byte[] bytes = ByteTransform.hexStringToBytes("10000000");
        timer.scheduleAtFixedRate(new TimerTask() {
            public void run() {
                byte[] msg = {0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                            0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                            0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                            0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                            0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                            0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                            0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                            0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                            0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                            0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                            0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                            0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                            0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                            0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,


                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00, 109, 109,
                        0x01, 98, 49, 50, 51, 52, 53, 54, 0x12, 0x11, 0x11, 0x00, 0x00