欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

实战讲解:Netty在TCP协议中的应用实例

最编程 2024-07-30 14:14:26
...
package com.example.demo.tcp.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TCPServer {
public static void bind(int port) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
//服务端要建立两个group,一个负责接收客户端的连接,一个负责处理数据传输
//连接处理group
EventLoopGroup boss = new NioEventLoopGroup();
//事件处理group
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
// 绑定处理group
bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
//保持连接的数目
.option(ChannelOption.SO_BACKLOG, 1024)
//有数据立即发送
.option(ChannelOption.TCP_NODELAY, true)
//保持连接
.childOption(ChannelOption.SO_KEEPALIVE, true)
//处理新连接
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 增加任务处理
ChannelPipeline p = sc.pipeline();
p.addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
new LengthFieldPrepender(4),
new StringDecoder(CharsetUtil.UTF_8),
new StringEncoder(CharsetUtil.UTF_8),
//心跳检测,读超时,写超时,读写超时
//new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS),
//自定义的处理器
new TCPServerHandler()
);
}
});

//绑定端口,同步等待成功
ChannelFuture future;
try {
future = bootstrap.bind(port).sync();
if (future.isSuccess()) {
log.info("协议==> TCP服务端启动成功(端口:{})", port);
} else {
log.info("协议==> TCP服务端启动失败(端口:{})", port);
}

//等待服务监听端口关闭,就是由于这里会将线程阻塞,导致无法发送信息,所以我这里开了线程
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
//优雅地退出,释放线程池资源
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
});
thread.start();
}

public static void sendMsg(ChannelHandlerContext ctx, String message) {
ByteBuf byteBuf = Unpooled.copiedBuffer(message, CharsetUtil.UTF_8);

ChannelFuture future = ctx.writeAndFlush(byteBuf);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
log.info("协议==> TCP服务端发送成功。");
} else {
log.error("协议==> TCP服务端发送失败。内容:{}", message);
}
}
});
}

// public static void main(String[] args) {
// Server server = new Server(61000);
// }
}

推荐阅读