我们知道Netty作为高性能通信框架,优点在于内部封装了管道的连接通信等操作,用户只需要调用封装好的接口,便可以很便捷的进行高并发通信。类似,在Http请求时,我们通过调用HttpClient,内部使用java NIO技术,通过引入连接池概念,来提高Http的并发能力,本文主要讲解该客户端内部是如何实现并发能力提高的原理。Http客户端分为同步和异步方式,以下示例展示了最基本的异步使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | RequestConfig.Builder requestConfigBuilder = RequestConfig.custom() .setConnectTimeout(5000) .setSocketTimeout(0) .setConnectionRequestTimeout(3000); HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create().setDefaultRequestConfig(requestConfigBuilder.build()) // 这些参数会用来生成PoolingNHttpClientConnectionManager,若PoolingNHttpClientConnectionManager自定义了,那么这些参数也就无效了 .setMaxConnPerRoute(10).setMaxConnTotal(30); //配置io线程 IOReactorConfig ioReactorConfig = IOReactorConfig.custom(). setIoThreadCount(Runtime.getRuntime().availableProcessors()) .setSoKeepAlive(true) .build(); DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig); ioReactor.setExceptionHandler(new IOReactorExceptionHandler() { @Override public boolean handle(IOException e) { System.out.println("dsdsdsd"); return true; } @Override public boolean handle(RuntimeException e) { System.out.println("dsssd"); return true; } }); // 设置channel连接池并发参数 PoolingNHttpClientConnectionManager poolingNHttpClientConnectionManager = new PoolingNHttpClientConnectionManager(ioReactor); poolingNHttpClientConnectionManager.setDefaultMaxPerRoute(5); poolingNHttpClientConnectionManager.setMaxTotal(80); httpClientBuilder.setConnectionManager(poolingNHttpClientConnectionManager); // 初始化Client并启动 CloseableHttpAsyncClient client = HttpAsyncClients.custom(). setConnectionManager(poolingNHttpClientConnectionManager) .build(); client.start(); final HttpGet request = new HttpGet("http://1.1.1.2:9200/indexName/_search"); // 异步查询 client.execute(request, new FutureCallback<HttpResponse>() { @Override public void completed(HttpResponse result){ try { System.out.println(EntityUtils.toString(result.getEntity())); } catch (Exception e) { e.fillInStackTrace(); } } @Override public void failed(Exception ex) { ex.fillInStackTrace(); } @Override public void cancelled() { System.out.println("cancelled"); } }); Thread.sleep(10000); client.close(); |
使用上没啥好说的,我们就直接以数据流流向为主线,看内部是如何使用连接池进行请求处理的。需要注意的是,若我们自定义了poolingNHttpClientConnectionManager对象,那么在requestConfigBuilder中设置的连接并发将不生效。
客户端内部初始化
pom文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpcore</artifactId> <version>4.4.12</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.10</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpcore-nio</artifactId> <version>4.4.12</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpasyncclient</artifactId> <version>4.1.4</version> </dependency> |
目前httpclient已经升级到了5.x,本文源码基于4.X
InternalHttpAsyncClient客户端
我们需要关注下InternalHttpAsyncClient
及其基类CloseableHttpAsyncClientBase
,这里把重要的属性都罗列出来:
1 2 3 4 5 6 | // 线程池管理者 private final NHttpClientConnectionManager connmgr; //MainClientExec 请求发送接收时的处理 private final InternalClientExec exec; // 类似netty的boss线程,负责管道建立连接 private final Thread reactorThread; |
下图是客户端初始化时创建的一些重要的对象:
PoolingNHttpClientConnectionManager:根据名称就可以看到,是连接池管理者。
CPool:连接池,存放了当前连接池的连接信息,比如全局空闲连接available、每个route独自的Pool,后面会详细介绍。
连接建立线程+请求处理线程
客户端内部会创建两类线程,类似netty的boss和worker线程,分别用来创建连接管道:AbstractMultiworkerIOReactor、以及请求发送线程:BaseIOReactor。本文中,也复用netty的称呼,分别将这两类线程称呼为boss线程和worker线程。boss线程在CloseableHttpAsyncClientBase构造函数初始化时初始化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | if (threadFactory != null && handler != null) { this.reactorThread = threadFactory.newThread(new Runnable() { @Override public void run() { try { // 比如当线程接收到数据,就跑到IOEventDispatch里面了 final IOEventDispatch ioEventDispatch = new InternalIODispatch(handler); // 将跑到PoolingNHttpClientConnectionManager.execute() connmgr.execute(ioEventDispatch); } catch (final Exception ex) { log.error("I/O reactor terminated abnormally", ex); } finally { status.set(Status.STOPPED); } } }); } else { this.reactorThread = null; } |
boss线程真正工作的地方是在AbstractMultiworkerIOReactor,我们需要注意的是selector选择器(会在AbstractMultiworkerIOReactor构造时产生),每当需要构建管道时,都会向该selector上注册OP_CONNECT事件。AbstractMultiworkerIOReactor初始化代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | public void execute(//eventDispatch=InternalIODispatch final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException { synchronized (this.statusLock) { this.status = IOReactorStatus.ACTIVE; // Start I/O dispatchers for (int i = 0; i < this.dispatchers.length; i++) { final BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing); dispatcher.setExceptionHandler(exceptionHandler); this.dispatchers[i] = dispatcher; } for (int i = 0; i < this.workerCount; i++) { final BaseIOReactor dispatcher = this.dispatchers[i]; this.workers[i] = new Worker(dispatcher, eventDispatch); // 产生的线程名称都是"I/O dispatcher 120" this.threads[i] = this.threadFactory.newThread(this.workers[i]); } } try { // I/O dispatcher开头的线程名称 for (int i = 0; i < this.workerCount; i++) { if (this.status != IOReactorStatus.ACTIVE) { return; } this.threads[i].start(); } // 无线死循环了,除非管道关闭 for (;;) { final int readyCount; try { // 默认睡眠1s readyCount = this.selector.select(this.selectTimeout); } catch (final InterruptedIOException ex) { throw ex; } catch (final IOException ex) { throw new IOReactorException("Unexpected selector failure", ex); } // 如果有需要处理的事件, 则进入processEvents流程, 实际的连接过程就在这里 if (this.status.compareTo(IOReactorStatus.ACTIVE) == 0) { // 纯粹管连接的地方 processEvents(readyCount); } // Verify I/O dispatchers for (int i = 0; i < this.workerCount; i++) { final Worker worker = this.workers[i]; final Throwable ex = worker.getThrowable(); if (ex != null) { throw new IOReactorException( "I/O dispatch worker terminated abnormally", ex); } } } } finally { doShutdown(); synchronized (this.statusLock) { this.status = IOReactorStatus.SHUT_DOWN; this.statusLock.notifyAll(); } } } |
具体做了如下事情:
1.构建n个worker线程,线程名称是I/O dispatcher n
开头的, n可以在IOReactorConfig
初始化时设置,默认为cpu的个数。
2.启动n个worker线程,每个worker线程真正工作时会跑到BaseIOReactor.execute()
中的。
3.死循环:select(selectTimeout)
,监听管道建立事件发生,并调用processEvents
进行管道建立的操作,随机选择一个woker线程,将管道及请求塞入对应的newChannels
中,后面会再次介绍。每当有新管道需要创建时,会自动调用selector.wakeup()函数。
我们再看下worker线程内初始化时构建了哪些对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | public AbstractIOReactor(final long selectTimeout, final boolean interestOpsQueueing) throws IOReactorException { super(); // 每个worker线程默认睡眠selectTimeout,然后从select(selectTimeout)醒来检查 this.selectTimeout = selectTimeout; // 该worker管理的所有IOSessionImpl this.sessions = Collections.synchronizedSet(new HashSet<IOSession>()); // 该worker接受的从boss线程建立好管道,而需要进行数据尕怂的请求体 this.newChannels = new ConcurrentLinkedQueue<ChannelEntry>(); try { // 每个worker都会拥有一个selector,用来监听读写请求。 this.selector = Selector.open(); } catch (final IOException ex) { throw new IOReactorException("Failure opening selector", ex); } this.statusMutex = new Object(); this.status = IOReactorStatus.INACTIVE; } |
我们需要知道的是:
1.每个woker线程也拥有一个selector。
2.当boss新建管道后,将管道及请求随机放入worker线程newChannels中,后续工作由worker进行。
我们再看下worker线程一直在忙哪些操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | protected void execute() throws InterruptedIOException, IOReactorException { this.status = IOReactorStatus.ACTIVE; try { for (;;) { // 这里也是无线死循环了 final int readyCount; try { readyCount = this.selector.select(this.selectTimeout); // 查询到影响,最多1s } // Process selected I/O events if (readyCount > 0) { // 处理IO 事件 processEvents(this.selector.selectedKeys()); } // Validate active channels //调用AbstractIOReactor.timeoutCheck()检查这个管道对应的请求是否超时。 //超时了会打印milliseconds timeout on connection http-outgoing-日志 validate(this.selector.keys()); // Process closed sessions processClosedSessions(); // If active process new channels if (this.status == IOReactorStatus.ACTIVE) { processNewChannels(); } } } finally { hardShutdown(); synchronized (this.statusMutex) { this.statusMutex.notifyAll(); } } } |
worker主线程做了如下事情:
1.进行select()等待,最多等待selectTimeout。
2.若selector监听到事件产生后,会调用processEvents()
进行处理,worker线程只会处理write和read事件,其余事件忽略不处理。
3.调用validate
检查管道对应的请求是否超时了,超时会打印milliseconds timeout on connection
类似的日志,相当于每个http请求增加了执行超时时间。这里的超时通过setSocketTimeout
设置,若我们不需要设置http级别的超时时间,将该参数设置为0即可。
4.调用processNewChannels
检查是否有boss线程传递过来新建立的管道,有的话,就处理,后面会介绍。
http请求发送阶段
主线程申请请求发送
我们就直接以InternalHttpAsyncClient.execute
代码开始,会首先构建new DefaultClientExchangeHandlerImpl().start()
, 我们尤其需要注意DefaultClientExchangeHandlerImpl
对象,存放着当前请求内容,当申请到管道后,会存放入管道的http.nio.exchange-handler
属性中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | public DefaultClientExchangeHandlerImpl( final Log log, final HttpAsyncRequestProducer requestProducer, final HttpAsyncResponseConsumer<T> responseConsumer,//缓存Response的 final HttpClientContext localContext, final BasicFuture<T> resultFuture, final NHttpClientConnectionManager connmgr, final ConnectionReuseStrategy connReuseStrategy, final ConnectionKeepAliveStrategy keepaliveStrategy, final InternalClientExec exec) { // 1.基类会针对每次请求,产生一个id // 2.我们需要注意localContext,可以存放请求的很多私有属性,比如 super(log, localContext, connmgr, connReuseStrategy, keepaliveStrategy); // 请求产生者 this.requestProducer = requestProducer; // response存储地方 this.responseConsumer = responseConsumer; // 响应用户请求 this.resultFuture = resultFuture; this.exec = exec; // 每次查询都包含一个state, this.state = new InternalState(getId(), requestProducer, responseConsumer, localContext);// 产生当前的state } |
最终会调用AbstractClientExchangeHandler.requestConnection()
-> PoolingNHttpClientConnectionManager.requestConnection()
-> AbstractNIOConnPool.lease()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | public Future<E> lease( final T route, final Object state, final long connectTimeout, final long leaseTimeout, final TimeUnit timeUnit, final FutureCallback<E> callback) { final BasicFuture<E> future = new BasicFuture<E>(callback); final LeaseRequest<T, C, E> leaseRequest = new LeaseRequest<T, C, E>(route, state, connectTimeout >= 0 ? timeUnit.toMillis(connectTimeout) : -1, leaseTimeout > 0 ? timeUnit.toMillis(leaseTimeout) : 0,// connectionRequestTimeout, future); // 保证一次只能有一个获取,放在pending中占位并发 this.lock.lock(); try { final boolean completed = processPendingRequest(leaseRequest); if (!leaseRequest.isDone() && !completed) { this.leasingRequests.add(leaseRequest); } if (leaseRequest.isDone()) { this.completedRequests.add(leaseRequest); } } finally { this.lock.unlock(); } fireCallbacks(); ...... } |
该函数主要目的是从是连接池中申请连接:
1.首先调用this.lock.lock()
锁住线程池。这里需要说明下,在实际使用时,若并发相对较高时,发现存在严重的锁阻塞,阻塞耗时1-3s, 在httpclint5.x版本里,已经将线程池级别锁粒度细分到单个route粒度的锁,大大降低了锁互斥的等待时间。
2.调用processPendingRequest
检查是否有空闲可用管道、可申请连接、还是请求需要pending。
- 若返回为false, 且leaseRequest不为done, 说明连接池满了,将请求放入leasingRequests挂起,等待后续再次申请。
- 若返回为true, 且leaseRequest为done, 则说明申请到可复用的连接管道,请求则放入completedRequests,等待调用
fireCallbacks()
时交给worker线程。 - 若返回为true,且leaseRequest不为done, 则说明该route的连接并发未达上限,请求已经在
processPendingRequest
内放入了DefaultConnectingIOReactor.requestQueue
,等待boss线程去创建新的管道。
在继续后面的介绍前,先给大家介绍下线程池内部结构:
- leasingRequests: 存放当前route连接并发已经达到上限的请求。
- available: 完成请求后,会将当前管道释放到入available,等待后续请求直接复用该管道。
- pending: pending中存放的是已经获取权限,需要自己构建SocketChannel的请求。直到构建管道
ManagedNHttpClientConnectionImpl
(此时已经建立了SocketChannel),才会将请求从pending转移到leased中。 - leased: 既包含直接从available获取到可用连接管道的请求,也包含创建ManagedNHttpClientConnectionImpl后,从pending转移过来的请求,直到请求完成后将管道释放到available中。
- completedRequests: 直接从连接池中拿到ManagedNHttpClientConnectionImpl,等待放入worker线程池的请求。
- service1: 远程服务器,每个service1就是一个ip, 也就是一个route。
内部请求转化流程如下: 这里对涉及的管道的包含关系如下:
我们具体看下processPendingRequest
是如何从连接池中申请管道的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 | private boolean processPendingRequest(final LeaseRequest<T, C, E> request) { final T route = request.getRoute(); final long now = System.currentTimeMillis(); // 检查获取锁是否已经超时了 if (now > deadline) { request.failed(new TimeoutException("Connection lease request time out")); return false; } // 这个route对应的连接Pool final RouteSpecificPool<T, C, E> pool = getPool(route); E entry; for (;;) { // 首先从free中获取 entry = pool.getFree(state); // 没有空闲的 if (entry == null) { break; } // 从free中获取到了 if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) { entry.close(); this.available.remove(entry); // 那么直接释放了 pool.free(entry, false); } else { break; } } // 从空闲队列申请到了CPoolEntry if (entry != null) { this.available.remove(entry); // 转移到全局申请出去的列表中 this.leased.add(entry); // 标记完成了 request.completed(entry); // 啥都不做 onReuse(entry); onLease(entry); // 直接return了 return true; } // 没有空闲可用 // New connection is needed final int maxPerRoute = getMax(route); // Shrink the pool prior to allocating a new connection final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute); // 仅仅是为了检查已经生成的的队列是否超过当前route限制,若超过了,就需要主动关闭了 if (excess > 0) { // 超过了就开始从空闲队列中关闭 for (int i = 0; i < excess; i++) { // 从空闲列表中拿个 final E lastUsed = pool.getLastUsed(); if (lastUsed == null) { break; } lastUsed.close(); this.available.remove(lastUsed); pool.remove(lastUsed); // 从本管道中关闭 } } // 该route若还没超过本routing身线程池 if (pool.getAllocatedCount() < maxPerRoute) // 总池子的使用 final int totalUsed = this.pending.size() + this.leased.size(); // 当前申请的是否已经超过了总连接池个数 final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0); // 满了 if (freeCapacity == 0) { return false; } // 此时还没超过 // 查看全局空闲是否超过限制了 final int totalAvailable = this.available.size(); // 当前空闲的+1是否超过了全局可用剩余个数 if (totalAvailable > freeCapacity - 1) { // 若超过了,那么就关闭一个 if (!this.available.isEmpty()) { final E lastUsed = this.available.removeLast();// CPoolEntry lastUsed.close(); final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute()); otherpool.remove(lastUsed); } } final SocketAddress localAddress; final SocketAddress remoteAddress; try { //会把域名映射出来,比如:qa1.l1c.data.hehe.com映射5个ips, admin.daxe1.l1c.data.hehe.com映射一个vip,但是只取第一个 remoteAddress = this.addressResolver.resolveRemoteAddress(route); localAddress = this.addressResolver.resolveLocalAddress(route); } catch (final IOException ex) { request.failed(ex); return false; } // 将请求放入了请求Queue中,并唤醒了主selector。 final SessionRequest sessionRequest = this.ioReactor.connect( remoteAddress, localAddress, route, this.sessionRequestCallback); request.attachSessionRequest(sessionRequest); final long connectTimeout = request.getConnectTimeout(); if (connectTimeout >= 0) { sessionRequest.setConnectTimeout(connectTimeout < Integer.MAX_VALUE ? (int) connectTimeout : Integer.MAX_VALUE); } // 加入到route连接池pending集合 this.pending.add(sessionRequest); // 已经获得了连接权,但是还没有建立连接的请求 pool.addPending(sessionRequest, request.getFuture()); return true; } return false; } |
向连接池申请连接主要做了如下事情:
1.检查获取lock是否超时,超时参数通过setConnectionRequestTimeout(3000)
参数设置。
2.获取该route对应的连接池:RouteSpecificPool, 检测是否有空闲可用的连接,有的话就返回CPoolEntry。
3.检查当前route连接是否超过上限,有的话,就从availabe中取出,并关闭管道。
4.若连接还没达到上限,那么就调用ioReactor.connect()
将请求放入DefaultConnectingIOReactor.requestQueue中,并唤醒主线程,等待boss线程去创建新的管道。
boss线程创建新的连接管道
前面也提到了,boss线程会在selector.select()中唤醒。唤醒后,会进入DefaultConnectingIOReactor.processEvents
判断是否有需要建立连接的请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | protected void processEvents(final int readyCount) throws IOReactorException { // 创建新的管道 processSessionRequests(); if (readyCount > 0) { final Set<SelectionKey> selectedKeys = this.selector.selectedKeys(); for (final SelectionKey key : selectedKeys) { // 发现有连接事件发生了 processEvent(key); } selectedKeys.clear(); } // 判断select是否超时(默认1s) final long currentTime = System.currentTimeMillis(); if ((currentTime - this.lastTimeoutCheck) >= this.selectTimeout) { this.lastTimeoutCheck = currentTime; final Set<SelectionKey> keys = this.selector.keys(); processTimeouts(keys); } } |
boss每次循环主要做了如下事情:
1.调用processSessionRequests
创建新的管道。
2.调用processEvent()
处理发生的连接事件。
注意DefaultConnectingIOReactor.processSessionRequests
只负责调用接口创建管道,而不用等待管道是否创建ok;而processEvent
是专门用来监听管道是否建立成功的。我们继续看下创建管道做了哪些事情:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | private void processSessionRequests() throws IOReactorException { SessionRequestImpl request; // 有权产生新的管道,但是还没有管道可用 while ((request = this.requestQueue.poll()) != null) { // 检查是否完成了 if (request.isCompleted()) { continue; } final SocketChannel socketChannel; // SocketChannelImpl try { // 建立一个socket socketChannel = SocketChannel.open(); } catch (final IOException ex) { request.failed(ex); return; } try { validateAddress(request.getLocalAddress()); validateAddress(request.getRemoteAddress()); // 设置非阻塞 socketChannel.configureBlocking(false); // 设置SocketAdaptor一些参数,比如是否复用,连接超时,写内核buffer prepareSocket(socketChannel.socket()); if (request.getLocalAddress() != null) { // 为null final Socket sock = socketChannel.socket(); sock.setReuseAddress(this.config.isSoReuseAddress()); sock.bind(request.getLocalAddress()); } final SocketAddress targetAddress = request.getRemoteAddress(); // Run this under a doPrivileged to support lib users that run under a SecurityManager this allows granting connect // permissions only to this library // 是否已经连接上 final boolean connected; try { connected = AccessController.doPrivileged( new PrivilegedExceptionAction<Boolean>() { @Override public Boolean run() throws IOException { // 连接远程目标节点 return socketChannel.connect(targetAddress); }; }); } // 如果已经建立连接 if (connected) { final ChannelEntry entry = new ChannelEntry(socketChannel, request); ;// 直接就分配对对应的work了,就没boss线程啥事了 addChannel(entry) continue; } } // 还未连接成功, 则注册到selector, 等待connect事件的触发, 再用processEvent来处理 final SessionRequestHandle requestHandle = new SessionRequestHandle(request); try { // 向这个管道注册connect事件 final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT, requestHandle); request.setKey(key); } } } |
创建管道过程也相对比较清晰:
1.循环从DefaultConnectingIOReactor.requestQueue拿需要创建管道的请求。(前面提了,主线程会将创建管道的请求放入该queue中)
2.创建SocketChannelImpl后,调用bind绑定:
- 若同步绑定成功后,将产生的ChannelEntry(socketChannel, request)顺序分配给一个worker线程(该worker的
newChannels
中) - 若还未绑定成功,则向boss的selector添加SelectionKey.OP_CONNECT事件,等待管道连接的事件发送。(只有ServerSocketChannel才会注册SelectionKey.OP_ACCEPT事件, SocketChannel只能注册SelectionKey.OP_CONNECT事件)
我们看下processEvent
如何处理OP_CONNECT事件的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | private void processEvent(final SelectionKey key) { try { // 该key是否是connect属性 if (key.isConnectable()) { final SocketChannel channel = (SocketChannel) key.channel(); // Get request handle final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment(); final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest(); // Finish connection process try { // 非阻塞模式下,确认是否连接好,若未连接好,直接返回false,方法必不可少(置位管道状态) channel.finishConnect(); } catch (final IOException ex) { sessionRequest.failed(ex); } key.cancel(); key.attach(null); if (!sessionRequest.isCompleted()) { addChannel(new ChannelEntry(channel, sessionRequest)); } else { try { channel.close(); } catch (final IOException ignore) { } } } } } |
一看就比较清晰了吧,boss线程只接受连接事件,非连接事件一律丢弃。检查到连接创建完成后,构建new ChannelEntry(channel, sessionRequest)顺序分配给一个worker线程(该worker的newChannels
中)
worker线程发送请求
接下来就看worker线程接到请求后如何处理了。前面worker也提到了,worker死循环会做如下三件事情(参考AbstractIOReactor.execute
函数);
1.调用processEvents
检查新的write、read事件。
2.调用validate
判断是否有查询超时,超时参数通过setSocketTimeout参数设置
3.调用processNewChannels
处理boss线程传递的新创建的管道及请求。
我们先看下AbstractIOReactor.processNewChannels()
如何处理新创建的管道及请求的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 | private void processNewChannels() throws IOReactorException { ChannelEntry entry; // 轮循每个新产生的请求及对应的管道 while ((entry = this.newChannels.poll()) != null) { final SocketChannel channel; final SelectionKey key; try { channel = entry.getChannel(); channel.configureBlocking(false); // SelectionKeyImpl,都注册read事件 key = channel.register(this.selector, SelectionKey.OP_READ); } final SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() { @Override public void sessionClosed(final IOSession session) { queueClosedSession(session); } }; InterestOpsCallback interestOpsCallback = null; final IOSession session; try { // IOSessionImpl与key是绑定的,因为key是重复利用的,所以IOSessionImpl也是重复利用的 session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback); int timeout = 0; try { timeout = channel.socket().getSoTimeout(); } catch (final IOException ex) { // Very unlikely to happen and is not fatal // as the protocol layer is expected to overwrite // this value anyways } // 设置http.session.attachment session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment()); // 设置超时 session.setSocketTimeout(timeout); } try { // 一个新的上下文请求 this.sessions.add(session); // 将这个IOSessionImpl放入SelectionKeyImpl中 key.attach(session); final SessionRequestImpl sessionRequest = entry.getSessionRequest(); if (sessionRequest != null) { if (!sessionRequest.isTerminated()) { //1.产生了connection,2.往AbstractNIOConnPool.leased放入CPoolEntry.3.设置可写事件 sessionRequest.completed(session); } if (!sessionRequest.isTerminated()) { // 进来设置write事件了 sessionCreated(key, session); } if (sessionRequest.isTerminated()) { throw new CancelledKeyException(); } } else { sessionCreated(key, session); } } } } |
该函数主要做了如下事情:
1.对该管道添加SelectionKey.OP_READ
事件
2.创建IOSessionImpl对象,需要注意,该对象生命周期与SocketChannelImpl绑定的。
3.调用sessionRequest.completed
:
- 产生
ManagedNHttpClientConnectionImpl
管道。 - 构建CPoolEntry。
- 向该管道对应的
IOSessionImpl.attributes
增加http.nio.exchange-handler
,将请求内容DefaultClientExchangeHandlerImpl
与该管道绑定。 - 并将该管道增加
SelectionKey.OP_WRITE
感兴趣的事件注意:
1.调用AbstractClientExchangeHandler.connectionAllocated
表示ManagedNHttpClientConnectionImpl管道已经就绪,就等待worker发送请求,该函数将在两个地方调用:1.主函数从空闲列表中申请到可用管道。2.worker线程接到boss线程创建的SocketChannleImpl后创建了ManagedNHttpClientConnectionImpl管道。
2.每个管道在数据发送前,会通过http.nio.exchange-handler
属性,与请求绑定。每个管道就是一个连接并发,每次只能发送一次请求,只有当上一个请求结束后,该管道才会分配给下个请求。
3.需要说下,为啥我们不可以直接发送请求、而再来注册SelectionKey.OP_WRITE事件呢?注册后, 系统会去检查内核写缓冲区是否写满了, 若写满了,会发送失败的情况。
4.此时管道已经注册了SelectionKey.OP_READ
和SelectionKey.OP_WRITE
事件。
我们再看下AbstractIOReactor.processEvents
如何处理事件的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | private void processEvents(final Set<SelectionKey> selectedKeys) { for (final SelectionKey key : selectedKeys) { processEvent(key); } selectedKeys.clear(); } protected void processEvent(final SelectionKey key) { // 直接通过IOsessionImpl获取元数据,复用时, final IOSessionImpl session = (IOSessionImpl) key.attachment(); try { if (key.isAcceptable()) {// accept事件 acceptable(key); // 啥都不干 } if (key.isConnectable()) { // connect事件 connectable(key); // 啥都不干 } if (key.isReadable()) { // 读事件 session.resetLastRead(); readable(key); } if (key.isWritable()) {// 里面注册了可写事件 session.resetLastWrite(); writable(key); // 真正写数据 } } catch (final CancelledKeyException ex) { queueClosedSession(session); key.attach(null); } } |
这里主要关注的是write
、read
事件,针对accept
、connect
直接丢弃,read
响应在下一章详细介绍。我们继续看下监听到write
后发生了什么事情,发送数据时会跑到DefaultNHttpClientConnection.produceOutput
这里:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | public void produceOutput(final NHttpClientEventHandler handler) {// HttpAsyncRequestExecutor try { if (this.status == ACTIVE) { if (this.contentEncoder == null && !this.outbuf.hasData()) { handler.requestReady(this); } // 编码请求,默认使用LengthDelimitedEncoder进行编码 if (this.contentEncoder != null) { handler.outputReady(this, this.contentEncoder); if (this.contentEncoder.isCompleted()) { resetOutput(); } } } if (this.outbuf.hasData()) { // 真正向管道中刷数据了 final int bytesWritten = this.outbuf.flush(this.session.channel()); if (bytesWritten > 0) { this.outTransportMetrics.incrementBytesTransferred(bytesWritten); } } if (!this.outbuf.hasData()) {// 若没有数据了 if (this.status == CLOSING) { this.session.close(); this.status = CLOSED; resetOutput(); } } } finally { // Finally set the buffered output flag this.hasBufferedOutput = this.outbuf.hasData(); } } |
主要做了两件事:
1.针对body使用LengthDelimitedEncoder
进行编码。
2.调用this.outbuf.flush()
将编码内容从SocketChannel真正发送出去。
http响应阶段
http响应阶段在AbstractIOReactor.processEvents
的key.isReadable()
处接受响应,会进入到BaseIOReactor.readable()
中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | protected void readable(final SelectionKey key) { //获取这个key绑定的IOSessionImpl(在key与管道,IOSessionImpl都是绑定一起的) final IOSession session = getSession(key); try { // Try to gently feed more data to the event dispatcher // if the session input buffer has not been fully exhausted // (the choice of 5 iterations is purely arbitrary) for (int i = 0; i < 5; i++) { // 实现类是InternalIODispatch this.eventDispatch.inputReady(session); if (!session.hasBufferedInput() || (session.getEventMask() & SelectionKey.OP_READ) == 0) { break; } } if (session.hasBufferedInput()) { this.bufferingSessions.add(session); } } } |
从管道读取时,会循环5次(一般调用一次InternalIODispatch.inputReady
就读取完数据了),直到读取完数据。读取数据会进入到DefaultNHttpClientConnection.consumeInput
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | public void consumeInput(final NHttpClientEventHandler handler) {// HttpAsyncRequestExecutor try { if (this.response == null) { int bytesRead; // 循环读取,直到读取完成 do { // 首先读8k bytesRead = this.responseParser.fillBuffer(this.session.channel()); if (bytesRead > 0) { this.inTransportMetrics.incrementBytesTransferred(bytesRead); } //BasicHttpResponse,解析了如何读取http的字节流 this.response = this.responseParser.parse(); } while (bytesRead > 0 && this.response == null); if (this.response != null) { if (this.response.getStatusLine().getStatusCode() >= 200) { // 这里才会产生一个createContentDecoder final HttpEntity entity = prepareDecoder(this.response); this.response.setEntity(entity); this.connMetrics.incrementResponseCount(); } this.hasBufferedInput = this.inbuf.hasData(); onResponseReceived(this.response);// 没用 handler.responseReceived(this);//从管道中读取完数据后,handler=HttpAsyncRequestExecutor if (this.contentDecoder == null) { resetInput(); } } if (bytesRead == -1 && !this.inbuf.hasData()) { handler.endOfInput(this); } } if (this.contentDecoder != null && (this.session.getEventMask() & SelectionKey.OP_READ) > 0) { // 1.读取body,2.会存在释放管道的行为.3.响应用户 handler.inputReady(this, this.contentDecoder); if (this.contentDecoder.isCompleted()) { // Response entity received // Ready to receive a new response resetInput(); } } } finally { // Finally set buffered input flag this.hasBufferedInput = this.inbuf.hasData(); } } |
读取过程做了如下事情:
1.首先调用responseParser.fillBuffer()
从管道中读取8KB的字节流出来,接着调用AbstractMessageParser.parse()
解析http的头部数据。
2.调用NHttpConnectionBase.prepareDecoder()
,从header的content-length解析出content的长度。并产生解析数据使用的LengthDelimitedDecoder
, 此时8k字节流buffer也放入了LengthDelimitedDecoder中。
3.调用HttpAsyncRequestExecutor.responseReceived
根据content-length来初始化接收响应使用的buffer[],默认使用HeapByteBufferAllocator.INSTANCE。
4.调用HttpAsyncRequestExecutor.inputReady()
来组装整个content(实际会进入SimpleInputBuffer.consumeContent()从channel中读取
, 解析数据使用的LengthDelimitedDecoder
;然后调用HttpAsyncRequestExecutor.processResponse()
释放管道,响应用户。
我们看下AbstractMessageParser.parse()
如何解析http头部的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 | public T parse() throws IOException, HttpException { while (this.state != COMPLETED) { if (this.lineBuf == null) {// 先读取最开头的"HTTP/1.1 200 OK" this.lineBuf = new CharArrayBuffer(64); } else { this.lineBuf.clear(); // 清空接着用 } // 若没有结束的话,每次读取一行,若读取的是\r\n,经过过滤,长度就变成了0,说明headers就读取完了 final boolean lineComplete = this.sessionBuffer.readLine(this.lineBuf, this.endOfStream);// 从sessionBuffer中读取一行 final int maxLineLen = this.constraints.getMaxLineLength(); if (maxLineLen > 0 && (this.lineBuf.length() > maxLineLen || (!lineComplete && this.sessionBuffer.length() > maxLineLen))) { throw new MessageConstraintException("Maximum line length limit exceeded"); } if (!lineComplete) { break; } switch (this.state) {//more是0 case READ_HEAD_LINE:// read_head_line try { // 算是解析HTTP/1.1 200 OK parseHeadLine(); } catch (final ParseException px) { throw new ProtocolException(px.getMessage(), px); } this.state = READ_HEADERS;// read_headers break; case READ_HEADERS:// read_headers if (this.lineBuf.length() > 0) { // 若读取长度为0,就说明读取完了 final int maxHeaderCount = this.constraints.getMaxHeaderCount(); if (maxHeaderCount > 0 && headerBufs.size() >= maxHeaderCount) { throw new MessageConstraintException("Maximum header count exceeded"); } parseHeader(); } else { this.state = COMPLETED; } break; } if (this.endOfStream && !this.sessionBuffer.hasData()) { this.state = COMPLETED; } } if (this.state == COMPLETED) { for (final CharArrayBuffer buffer : this.headerBufs) { try { // 开始解析header this.message.addHeader(lineParser.parseHeader(buffer)); } catch (final ParseException ex) { throw new ProtocolException(ex.getMessage(), ex); } } return this.message; } return null; } |
这里涉及到读取state的转变,转变过程如下:
字节流前缀如下:HTTP/1.1 200 OK\r\nheaders\r\n\r\ncontents
,可以看到,header与content之间以两个\r\n
为分隔符,AbstractMessageParser.parse()
就是解析http content之前的内容。
我们再看下SimpleInputBuffer.consumeContent()
如何组装整个content。在前面读取headers时,直接从SocketChannelImpl读取了8k字节流,此时仅仅读取了http header部分,8k中也包含了部分content内容,这里也会一起读取出来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | public int consumeContent(final ContentDecoder decoder) throws IOException { // 重新读取 setInputMode(); int totalRead = 0; int bytesRead; // 可以申请多大的DirectBuffer,就读取多少数据 while ((bytesRead = decoder.read(this.buffer)) != -1) { if (bytesRead == 0) { if (!this.buffer.hasRemaining()) { expand(); } else { break; } } else { totalRead += bytesRead; // 每次只能读取185472b左右的数据,若多了,这里while也读取不完 } } if (bytesRead == -1 || decoder.isCompleted()) { this.endOfStream = true; } return totalRead; } |
这里就比较简单了,就是循环调用decoder.read
来从管道中读取剩余的字节流了。而decoder.read
读取http content部分如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | public int read(final ByteBuffer dst) throws IOException { final int chunk = (int) Math.min((this.contentLength - this.len), Integer.MAX_VALUE); final int bytesRead; if (this.buffer.hasData()) { final int maxLen = Math.min(chunk, this.buffer.length()); bytesRead = this.buffer.read(dst, maxLen); } else { // 一次读取多少文档,取决于从DirectBufferCache中申请的DirectDuffer大小 bytesRead = readFromChannel(dst, chunk); } this.len += bytesRead; if (this.len >= this.contentLength) { setCompleted(); } return isCompleted() && bytesRead == 0 ? -1 : bytesRead; } |
该函数主要做了如下事情:
1.首先将之前8k中未读取的content放入dst中
2.再依次从管道中读取剩余所有的content放入dst中。
此时整个content部分也读取完成了。
总结
http请求发送时存在3种可能,1.连接池无可用管道,连接也没达到上限,那么将请求交给boss线程新建管道,再交给worker线程发送请求。2.连接池有可用管道,那么直接将请求交给worker发送。3.连接池无可用管道,且连接个数已达上限,那么请求阻塞等待。每个管道一次只能发送一次请求,下个请求只能等当前请求完成、管道释放后才能进行,通过管道个数来限制连接并发,导致管道利用率不高,这里也许可以进行部分优化。