文章目录
- 概要
- pom依赖
- Netty的server服务端类
- Netty通道初始化
- I/O数据读写处理
- 测试发送消息 并 接收服务端回复
- 异步启动Netty
- 运行截图
概要
Netty是业界最流行的nio框架之一,它具有功能强大、性能优异、可定制性和可扩展性的优点
Netty的优点:
1.API使用简单,开发入门门槛低。
2.功能十分强大,预置多种编码解码功能,支持多种主流协议。
3.可定制、可扩展能力强,可以通过其提供的ChannelHandler进行灵活的扩展。
4.性能优异,特别在综合性能上的优异性。
5.成熟,稳定,适用范围广。
6.可用于智能GSM/GPRS模块的通讯服务端开发,使用它进行MQTT协议的开发。
好了,废话不多说了,上代码
pom依赖
<!-- netty依赖 springboot2.x自动导入版本 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
Netty的server服务端类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Netty服务器(端口自行更换,默认端口10100)
* @author wusiwee
*/
@Service
@Slf4j
public class NettyServer {
/**
* 注入Netty通道初始化处理器
*/
private final NettyChannelInboundHandlerAdapter handlerAdapter;
/**
* 通过构造函数注入依赖
* @param handlerAdapter 处理器
*/
@Autowired
public NettyServer(NettyChannelInboundHandlerAdapter handlerAdapter) {
this.handlerAdapter = handlerAdapter;
}
/**
* 启动Netty服务器
* @throws Exception 如果启动过程中发生异常
*/
public void bind() throws Exception {
// 定义bossGroup和workerGroup来处理网络事件
// 用于接受客户端连接
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 用于实际的业务处理操作
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建ServerBootstrap实例来引导绑定和启动服务器
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
// 指定使用NIO的传输Channel
.channel(NioServerSocketChannel.class)
// 设置TCP接收缓冲区大小
.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576))
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576))
// 设置自定义的Channel初始化器
.childHandler(new NettyChannelInitializer(handlerAdapter));
log.info("netty server start success!");
// 绑定端口,并同步等待成功,即启动Netty服务
ChannelFuture f = serverBootstrap.bind(10100).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("Netty server startup interrupted", e);
Thread.currentThread().interrupt();
} finally {
// 优雅关闭事件循环组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
Netty通道初始化
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Component;
/**
* 通道初始化
* @author wusiwee
*/
@Component
public class NettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {
/**
* 注入,目的是在该 HandlerAdapter 可以正确的注入业务Service
*/
private final NettyChannelInboundHandlerAdapter handlerAdapter;
public NettyChannelInitializer(NettyChannelInboundHandlerAdapter handlerAdapter) {
this.handlerAdapter = handlerAdapter;
}
@Override
protected void initChannel(Channel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 响应字符串
pipeline.addLast(new StringEncoder());
// 自定义ChannelInboundHandlerAdapter
pipeline.addLast(handlerAdapter);
}
I/O数据读写处理
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.time.LocalDateTime;
import java.util.Date;
/**
* I/O数据读写处理类
* 客户端发送的消息 以及 回复客户端消息 均在此处
* @ChannelHandler.Sharable 此注解用于在多个 Channel 中重复使用同一个 Handler 实例
* @author wusiwee
*/
@Slf4j
@ChannelHandler.Sharable
@Component
public class NettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{
/**
* 这里可以注入自己的service
*/
@Autowired
private IUserService iUserService;
/**
* 从客户端收到新的数据时,这个方法会在收到消息时被调用
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
// 确保接收的数据长度足够,minimumLength 是所有字段长度的总和
if (in.readableBytes() < MINIMUM_LENGTH) {
ctx.writeAndFlush("报文长度过低,数据不完整"+"\n");
return;
}
// 1,读取固定长度字符
byte[] frameStart = new byte[4];
in.readBytes(frameStart);
String frameStartStr = new String(frameStart, java.nio.charset.StandardCharsets.UTF_8);
log.info("1.解析:"+frameStartStr);
ctx.writeAndFlush("I got it\n");
}
/**
* 从客户端收到新的数据、读取完成时调用
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
log.info("读取完成 channelReadComplete");
ctx.flush();
}
/**
* 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.info("exceptionCaught");
cause.printStackTrace();
//抛出异常,断开与客户端的连接
ctx.close();
}
/**
* 客户端与服务端第一次建立连接时 执行
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.channel().read();
InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = inSocket.getAddress().getHostAddress();
//此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接
log.info("客户端连接 channelActive{}", clientIp+" "+ctx.name());
}
/**
* 客户端与服务端 断连时 执行
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = inSocket.getAddress().getHostAddress();
//断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
ctx.close();
log.info("channelInactive{}", clientIp);
}
/**
* 服务端当read超时, 会调用这个方法
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = inSocket.getAddress().getHostAddress();
//超时 断开连接
ctx.close();
log.info("userEventTriggered{}", clientIp);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
log.info("注册 channelRegistered");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
log.info("channelUnregistered");
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
log.info("channelWritabilityChanged");
}
}
测试发送消息 并 接收服务端回复
@Test
void contextLoads() {
try {
// 服务器地址
String host = "127.0.0.1";
// 服务器端口
int port = 10100;
// 要发送的消息
String message = "7E7E010038401010123433004D02000B22";
Socket socket = new Socket(host, port);
// 获取输出流
OutputStream outputStream = socket.getOutputStream();
// 将字符串转换为字节数组
byte[] data = message.getBytes();
// 写入数据到输出流
outputStream.write(data);
// 刷新输出流,确保数据发送
outputStream.flush();
InputStream input = socket.getInputStream();
//读取服务器返回的消息
BufferedReader br = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
String mess = br.readLine();
System.out.println("服务器回复:" + mess);
input.close();
outputStream.close();
socket.close();
}catch (Exception e){
System.out.println("出现异常");
}
}
异步启动Netty
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
/**
* 启动类
*/
@SpringBootApplication
@EnableAsync
public class NettyApplication implements ApplicationRunner{
/**
* 启动springboot
*/
public static void main( String[] args ) {
SpringApplication.run(NettyApplication.class, args);
}
/**
* 创建独立线程池
*/
private final ExecutorService executorService = new ThreadPoolExecutor(
1, 1, 30, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(2),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
/**
* 注入Netty消息处理器
*/
@Resource
private NettyChannelInboundHandlerAdapter handlerAdapter;
@Override
public void run(ApplicationArguments args) throws Exception {
// 使用线程池 异步启动Netty服务器
executorService.submit(() -> {
try {
// 启动netty,绑定端口号
new NettyServer(handlerAdapter).bind();
} catch (Exception e) {
// 异常处理
System.out.println("启动netty出现异常:"+e.getMessage());
}
});
}
}
运行截图
回复客户端消息的代码片段
测试发送
客户端收到回复,断开连接
攀峰之高险,岂有崖颠;搏海之明辉,何来彼岸?前进不止,奋斗不息。