一、select和epoll原理分析
外设设备网卡、鼠标、键盘等通过总线写到内存中,中间就有DMA拷贝,操作系统怎么知道内存中有数据了,这就需要操作系统通过中断机制确定,如果有中断信号过来,cpu会首先打断用户程序执行,响应硬件的程序的信号,然后再恢复用户程序的执行;不同外设设备对应的驱动程序不同,发送的中断信号也就不同,操作系统根据对应信号做出相应的处理
1、操作系统如何处理中断请求
内核和设备驱动是通过中断方式来处理的。所谓中断,可以理解当设备上有数据到达时,会给cpu的相关引脚上发一个电压变化,以通知cpu来处理数据;硬件产生的信号需要cpu立马处理,否则数据可能丢失
网络模块比较耗时,中断时会过度占用cpu,导致cpu无法响应其它设备,因此在Linux中段处理函数分上半部和下半部,上半部通知cpu,下半部响应
2、进程阻塞
操作系统为了支持多任务,实现进程调度功能,会把进程分为“运行”和“等待”等几种状态。运行状态是获取cpu使用权,正在执行的代码是运行状态;等待状态是阻塞状态,详情参照下图(个人理解)
3、内核接收网络数据
一旦有cpu响应中断操作就会拷贝数据到内存,经过协议层的解析到socket应用层就有了数据,就会唤醒进程A,重新进入到工作队列中
4、同时监视多个socket的简单方法
select实现思路很直接。假如程序同时监视socket1、socket2、socket3,那么调用select之后,操作系统把进程A分别加入到这三个socket的等待列表中,当任何一个socket收到数据,都会中断程序唤醒进程A,所有的socket的等待队列中的进程A都会被移除,加入到工作队列中
如上图所示,进程A只知道Socket有数据过来,并不知道哪些Socket有数据,所以就需要遍历Socket列表;并且处于遍历的性能考虑,select最大只能监视1024个Socket
5、epoll的原理
当执行epoll_create方法时,内核会创建一个eventpoll对象;当Socket收到数据后,中断程序会操作eventpoll对象,而不直接操作进程;中断程序会给rdlist引用收到的数据Socket2、Socket3,当执行epoll_wait,如果rdlist已经引用了Socket,那么epoll_wait直接返回,如果rdlist为空,阻塞进程
当Socket接收到数据,中断程序一方面修改rdlist,另一个方面唤醒 eventpoll等待队列的线程,线程A回到工作队列中去,由于rdlist引用了接收了数据的Socket,所以不用对所有的进行遍历
二、Netty基础
1、Netty的组件
- Bootstrap是Netty框架的启动类和主入口类,分为客户端类Bootstrap和服务器类ServerBootstrap两种。
- EventLoop暂时可以看成一个线程、EventLoopGroup 自然就可以看成线程组。
- Channel是Java NIO的一个基本构造。
- ChannelHandler和ChannelPipeline:每个事件都可以被分发给 ChannelHandler 类中的某个用户实现的方法,既然事件分为 入站和出站,用来处理事件的 ChannelHandler 也被分为可以处理入站事件的 Handler 和出站 事件的 Handler,当然有些 Handler 既可以处理入站也可以处理出站;这些 ChannelHandler 都放在 ChannelPipeline 中统一管理
- ChannelFuture:异步获取结果的类,类似于JDK的java.util.concurrent.Future类
2、使用示例
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId
<version>4.1.42.Final </version>
<scope>compile</scope>
</dependency>
服务端
public class EchoServer {
private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws InterruptedException {
int port = 9999;
EchoServer echoServer = new EchoServer(port);
LOG.info("服务器即将启动");
echoServer.start();
LOG.info("服务器关闭");
}
public void start() throws InterruptedException {
/*线程组*/
EventLoopGroup group = new NioEventLoopGroup();
try {
/*服务端启动必备*/
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
});
/*异步绑定到服务器,sync()会阻塞到完成*/
ChannelFuture f = b.bind().sync();
LOG.info("服务器启动完成。");
/*阻塞当前线程,直到服务器的ServerChannel被关闭*/
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
客户端
public class EchoClient {
private final int port;
private final String host;
public EchoClient(int port, String host) {
this.port = port;
this.host = host;
}
public void start() throws InterruptedException {
/*线程组*/
EventLoopGroup group = new NioEventLoopGroup();
try {
/*客户端启动必备,和服务器的不同点*/
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)/*指定使用NIO的通信模式*/
/*指定服务器的IP地址和端口,和服务器的不同点*/
.remoteAddress(new InetSocketAddress(host,port))
/*和服务器的不同点*/
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
/*异步连接到服务器,sync()会阻塞到完成,和服务器的不同点*/
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();/*阻塞当前线程,直到客户端的Channel被关闭*/
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoClient(9999,"127.0.0.1").start();
}
}
服务端的ChannelHandler
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;
System.out.println("server accept :" + in.toString(CharsetUtil.UTF_8));
ctx.writeAndFlush(in);
//ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接已建立");
super.channelActive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端的ChannelHandler
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/*读取到网络数据后进行业务处理,并关闭连接*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("client Accept"+msg.toString(CharsetUtil.UTF_8));
//关闭连接
///ctx.close();
}
/*channel活跃后,做业务处理*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer(
"Hello,Netty",CharsetUtil.UTF_8));
// ctx.pipeline().write()
// ctx.channel().write()
ctx.alloc().buffer();
}
}