OkHttp源码分析:分发器任务调配,拦截器责任链设计,连接池socket复用

目录

一,分发器和拦截器

二,分发器处理异步请求

1.分发器处理入口

2.分发器工作流程

3.分发器中的线程池设计

三,分发器处理同步请求

四,拦截器处理请求

1.责任链设计模式

 2.拦截器工作原理

3.OkHttp五大拦截器


一,分发器和拦截器

        OkHttp在内部维护了这几个重要对象:分发器dispatcher,连接池connectionPool,拦截器Interceptor;

//拦截器
@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher

//连接池
@get:JvmName("connectionPool") val connectionPool: ConnectionPool = builder.connectionPool
  
//拦截器
@get:JvmName("interceptors") val interceptors: List<Interceptor> =
      builder.interceptors.toImmutableList()

@get:JvmName("networkInterceptors") val networkInterceptors: List<Interceptor> =
      builder.networkInterceptors.toImmutableList()

他们的作用分别为:

  • 分发器Dispatcher:调配请求任务,内部维护队列线程池  
  • 拦截器:处理请求与响应,完成请求过程
  • 连接池:管理socket连接与连接复用

        从OkHttp的请求处理流程来看: 拦截器负责完成网络请求过程,同步和异步请求必须经过分发器调配后才会发给拦截器进行网络请求;

二,分发器处理异步请求

1.分发器处理入口

private void visitInternet() {
    //1.创建HttpClient对象
    OkHttpClient okHttpClient = new OkHttpClient();
    //2.获取request对象
    Request.Builder builder = new Request.Builder()
            .url("https://www.bilibili.com/");
    Request request = builder.build();
    //3.获取call对象
    Call call = okHttpClient.newCall(request);
    //4.执行网络操作
    try {
        Response response = call.execute();
        String result = response.body().string();
        showResultOnUiThread(result);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

        从OkHttp处理流程来看,每次发送请求前我们需要调用 newCall() 方法获取call对象,这里的Call是一个接口,newCall返回的是Call接口的实现类RealCall;

  /** Prepares the [request] to be executed at some point in the future. */
  override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

call对象只能使用一次 

        发起异步请求需要调用call对象的 enqueue() 方法,enqueue方法首先会将call对象中的executed字段置为true,代表这个call对象已经使用过,第二次就无法使用,想要再次使用的话需要调用call对象的 clone() 方法;

        callStart方法执行后表示请求开始,之后便会执行分发器的enqueue方法处理异步请求,这里传入的对象AsyncCall是Runnable接口的实现类,可以理解为是我们要处理的异步任务;

  override fun enqueue(responseCallback: Callback) {
    //call对象只能使用一次
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart() //请求开始
    
    //分发器处理异步请求
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

2.分发器工作流程

分发器中维护了三个队列:

  • readyAsyncCalls:等待中异步请求队列
  • runningAsyncCalls:执行中异步请求队列
  • runningSyncCalls:执行中同步请求队列

        分发器dispatcher的enqueue方法执行后,异步请求AsyncCall默认先放到readAsyncCalls中,如果是非websocket连接,则检查一下runningAsyncCalls和readAsyncCalls中是否有相同域名host的请求,如果有则复用之前的域名的计数器existingCall

        计数器之后用于判断同一主机(域名)请求连接数

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

检查完之后调用promoteAndExecute()方法,在这个方法中会检查两件事:

  • 进行中异步请求数是否 ≥ 64(runningAsyncCalls队列的size是否 ≥ 64),
  • 对同一域名(主机)的请求callsPerHost是否大于5;

若条件符合,将异步任务加入到runningAsyncCalls中

检查完可执行请求并更新状态后,将请求提交到线程池中执行

private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

          //检查可执行请求
        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    //提交到线程池中执行
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService //线程池)
    }

    return isRunning
  }

将AsyncCall提交到线程池后,AsyncCall对象的run方法便会被执行;

在run方法中,从拦截器中获取了服务器的响应,完成请求后调用dispatcher的finish方法,结束本次异步请求;

override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
            //拦截器完成请求,返回响应
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
            //调用finish方法,结束本次异步请求
          client.dispatcher.finished(this)
        }
      }
    }

在完成一次请求后,runningAsyncCalls队列会空出位置

所以在finish方法中,会重新调用检查异步任务方法promoteAndExecute(),也就是在结束一次请求后,会去检查readyAsyncCalls队列中符合条件的异步任务,并去执行他们

idleCallback.run() 用于在所有请求完成后执行特定操作,操作内容自定义

internal fun finished(call: AsyncCall) {
    call.callsPerHost.decrementAndGet()
    finished(runningAsyncCalls, call)
  }

  /** Used by [Call.execute] to signal completion. */
  internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
  }

  private fun <T> finished(calls: Deque<T>, call: T) {
    val idleCallback: Runnable?
    synchronized(this) {
      if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
      idleCallback = this.idleCallback
    }

    //重新调用promoteAndExecute,检查可执行异步请求
    val isRunning = promoteAndExecute()

    if (!isRunning && idleCallback != null) {
        //用于在所有请求完成后执行特定操作,操作内容自定义
      idleCallback.run()
    }
  }

3.分发器中的线程池设计

分发器中的线程池:

  • 核心线程数:0
  • 最大线程数:Int.MAX_VALUE
  • 空闲时间:60s
  • 工作队列:SynchronousQueue()
@get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(
            0, //核心线程数
            Int.MAX_VALUE, //最大线程数
            60, //空闲时间
            TimeUnit.SECONDS, //空闲时间单位(秒)
            SynchronousQueue(), //工作队列
            threadFactory("$okHttpName Dispatcher", false)
        )
      }
      return executorServiceOrNull!!
    }

线程池工作原理:

  1. 工作中线程 < 核心线程数 创建新线程
  2. 工作中线程 > 核心线程数且工作队列未满,加入工作队列
  3. 工作队列已满,工作中线程数若 < 最大线程数, 创建新线程
  4. 工作队列已满,工作中线程数 > 最大线程数, 执行拒绝策略(默认为抛出异常,可自定义)

在okhttp的分发器中,线程池使用SynchronousQueue()作为工作队列,这种容器没有容量,也就无法添加任务,所以当工作中线程 > 核心线程数,会直接创建新线程

三,分发器处理同步请求

对于同步请求,分发器只记录请求(放入RunningSyncCalls中)

  override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

//dispatcher.executed()
  @Synchronized internal fun executed(call: RealCall) {
       //分发器只记录同步请求
    runningSyncCalls.add(call)
  }

四,拦截器处理请求

1.责任链设计模式

OkHttp中的拦截器采用责任链设计模式:

        为避免请求发送者与多个请求处理者耦合在一起,于是将所有请求处理者通过前一对象记住下一对象的引用而形成一条链,当有请求发生时,请求只需沿着链传递,直到有对象处理它

模拟责任链设计模式:

我们定义一个Handler抽象类,并让他持有下一Handler对象的引用next,并创建Handler三个子类

abstract class Handler {

    protected var next : Handler? = null;

    fun setNext(next : Handler){
        this.next = next;
    }

    fun getNext() : Handler?{
        return next;
    }

    abstract fun handle(request : String);
}

class Handler1 : Handler() {

    override fun handle(request: String) {
        if("1".equals(request)){
            Log.i("TAG", "handle1处理")
        }else{
            if(getNext() != null){
                next?.handle(request);
            }else{
                Log.i("TAG", "没有下一个handler")
            }
        }
    }
}

class Handler2 : Handler() {
    override fun handle(request: String) {
        if("2".equals(request)){
            Log.i("TAG", "handle1处理")
        }else{
            if(getNext() != null){
                next?.handle(request);
            }else{
                Log.i("TAG", "没有下一个handler")
            }
        }
    }
}

class Handler3 : Handler() {
    override fun handle(request: String) {
        if("3".equals(request)){
            Log.i("TAG", "handle1处理")
        }else{
            if(getNext() != null){
                next?.handle(request);
            }else{
                Log.i("TAG", "没有下一个handler")
            }
        }
    }
}

        我们让handler1拥有2的引用,2拥有3的引用,这样当我们调用1的handle("3")时,request对象就会一直沿着责任链执行,直到遇到能处理他的对象(handler3)

val handler1: Handler = Handler1()
val handler2: Handler = Handler2()
val handler3: Handler = Handler3()

handler1.setNext(handler2)
handler2.setNext(handler3)

handler1.handle("3")

 2.拦截器工作原理

拦截器的工作基本分为三步:

  1. 处理请求request
  2. 将请求传往下一拦截器,获取返回的请求response
  3. 处理响应response并返回

例如,我们自定义一个日志打印拦截器:

class LogInterceptor : Interceptor {
    override fun intercept(chain: Interceptor.Chain): Response {
        //1.处理请求
        val request = chain.request();

        val requestLog = StringBuilder().apply {
            append("Request:\n")
            append("URL: ${request.url}\n")
            append("Method: ${request.method}\n")
            append("Headers: ${request.headers}\n")
            request.body?.let {
                append("Body: ${it.toString()}\n")
            }
        }
        Log.d("OkHttp", requestLog.toString())

        //将请求传往下一拦截器,获取响应
        val response = chain.proceed(request)

        //处理响应并返回
        val responseLog = StringBuilder().apply {
            append("Response:\n")
            append("Code: ${response.code}\n")
            append("Headers: ${response.headers}\n")
            response.body?.let {
                append("Body: ${it.string()}\n")
            }
        }
        Log.d("OkHttp", responseLog.toString())

        return response;
    }
}

在chain的proceed方法中,程序会找到拦截器链中的下一拦截器并将请求传给他,获取返回的请求

  @Throws(IOException::class)
  override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    if (exchange != null) {
      check(exchange.finder.sameHostAndPort(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }
      check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }
    }

    // 找到拦截器链中的下一拦截器
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    //传递请求,获取响应
    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }

3.OkHttp五大拦截器

OkHttp中默认配置五个拦截器,分别为:

val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
    interceptors += client.networkInterceptors
}
nterceptors += CallServerInterceptor(forWebSocket)
  • 重试和重定向拦截器 RetryAndFollowUpInterceptor:重试拦截器在交出前(交给下一个拦截器),负责判断用户是否取消了请求。在获得了响应之后,会根据响应码判断是否需要重定向,如果满足所有条件就会重启执行所有拦截器
  • 桥接拦截器(处理请求头和响应头)BridgeInterceptor:在交出之前,负责将Http协议必备的请求头加入请求之中(如Host,Connection),并添加一些默认的行为(如RZIP压缩);获得响应后调用保存cookie接口并解析GZIP数据
  • 缓存拦截器 CacheInterceptor:交出之前读取并判断是否使用缓存;获取响应后判断是否缓存
  • 连接拦截器 ConnectInterceptor:交出之前,负责创建或找到一个连接,并获取socket流;获取响应后不进行额外处理
  • 网络请求拦截器(执行实际的网络请求)CallServerInterceptor:进行真正的与服务器通信,向服务器发送数据,解析读取的响应数据

OkHttp中添加拦截器有两种方式:addInterceptor()和 addNetworkInterceptor(),他们的主要区别如下:

  • 调用时机:Application拦截器在请求开始时调用,Network在网络连接建立后调用
  • 调用次数:Application只调用一次,Network可能调用多次(重定向)
  • 可见信息:Application只能看到最终请求/响应,Network能看到所有中间请求/响应
  • 缓存感知:Application无法感知缓存,Network可以感知缓存
  • 使用场景:Application一般用于业务处理(如:身份验证,日志记录,错误处理),Network一般用于网络层操作(如:网络监控,缓存处理,压缩处理)

OkHttp完整拦截器链如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/939689.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Nginx主要知识点总结

1下载nginx 到nginx官网nginx: download下载nginx&#xff0c;然后解压压缩包 然后双击nginx.exe就可以启动nginx 2启动nginx 然后在浏览器的网址处输入localhost&#xff0c;进入如下页面说明nginx启动成功 3了解nginx的配置文件 4熟悉nginx的基本配置和常用操作 Nginx 常…

概率论得学习和整理27:关于离散的数组 随机变量数组的均值,方差的求法3种公式,思考和细节。

目录 1 例子1&#xff1a;最典型的&#xff0c;最简单的数组的均值&#xff0c;方差的求法 2 例子1的问题&#xff1a;例子1只是1个特例&#xff0c;而不是普遍情况。 2.1 例子1各种默认假设&#xff0c;导致了求均值和方差的特殊性&#xff0c;特别简单。 2.2 我觉得 加权…

初学stm32 --- 时钟配置

目录 stm32时钟系统 时钟源 &#xff08;1&#xff09; 2 个外部时钟源&#xff1a; &#xff08;2&#xff09;2 个内部时钟源&#xff1a; 锁相环 PLL PLLXTPRE&#xff1a; HSE 分频器作为 PLL 输入 (HSE divider for PLL entry) PLLSRC&#xff1a; PLL 输入时钟源 (PL…

Latex+VsCode+Win10搭建

最近在写论文&#xff0c;overleaf的免费使用次数受限&#xff0c;因此需要使用本地的形式进行编译。 安装TEXLive 下载地址&#xff1a;https://mirror-hk.koddos.net/CTAN/systems/texlive/Images/ 下载完成直接点击iso进行安装操作。 安装LATEX Workshop插件 设置VsCode文…

深度学习之目标检测篇——残差网络与FPN结合

特征金字塔多尺度融合特征金字塔的网络原理 这里是基于resnet网络与Fpn做的结合&#xff0c;主要把resnet中的特征层利用FPN的思想一起结合&#xff0c;实现resnet_fpn。增强目标检测backone的有效性。代码实现如下&#xff1a; import torch from torch import Tensor from c…

Leetcode 面试150题 399.除法求值

系列博客目录 文章目录 系列博客目录题目思路代码 题目 链接 思路 广度优先搜索 我们可以将整个问题建模成一张图&#xff1a;给定图中的一些点&#xff08;点即变量&#xff09;&#xff0c;以及某些边的权值&#xff08;权值即两个变量的比值&#xff09;&#xff0c;试…

python实现Excel转图片

目录 使用spire.xls库 使用excel2img库 使用spire.xls库 安装&#xff1a;pip install spire.xls -i https://pypi.tuna.tsinghua.edu.cn/simple 支持选择行和列截图&#xff0c;不好的一点就是商业库&#xff0c;转出来的图片有水印。 from spire.xls import Workbookdef …

hpe服务器更新阵列卡firmware

背景 操作系统&#xff1a;RHEL7.8 hpe服务器经常出现硬盘断开&#xff0c;阵列卡重启问题&#xff0c;导致系统hang住。只能手动硬重启。 I/O error&#xff0c;dev sda smartpqi 0000:5c:00:0: resettiong scsi 1:1:0:1 smartpqi 0000:5c:00:0: reset of scsi 1:1:0:1:…

excel 使用vlook up找出两列中不同的内容

当使用 VLOOKUP 函数时&#xff0c;您可以将其用于比较两列的内容。假设您要比较 A 列和 B 列的内容&#xff0c;并将结果显示在 C 列&#xff0c;您可以在 C1 单元格中输入以下公式&#xff1a; 这个公式将在 B 列中的每个单元格中查找是否存在于 A 列中。如果在 A 列中找不到…

北邮,成电计算机考研怎么选?

#总结结论&#xff1a; 基于当前提供的24考研复录数据&#xff0c;从报考性价比角度&#xff0c;建议25考研的同学优先选择北邮计算机学硕。主要原因是:相比成电&#xff0c;北邮计算机学硕的目标分数更低&#xff0c;录取率更高&#xff0c;而且北邮的地理位置优势明显。对于…

OpenHarmony和OpenVela的技术创新以及两者对比

两款有名的国内开源操作系统&#xff0c;OpenHarmony&#xff0c;OpenVela都非常的优秀。本文对二者的创新进行一个简要的介绍和对比。 一、OpenHarmony OpenHarmony具有诸多有特点的技术突破和重要贡献&#xff0c;以下是一些主要方面&#xff1a; 架构设计创新 分层架构…

C语言——实现找出最高分

问题描述&#xff1a;分别有6名学生的学号、姓名、性别、年龄和考试分数&#xff0c;找出这些学生当中考试成绩最高的学生姓名。 //找出最高分#include<stdio.h>struct student {char stu_num[10]; //学号 char stu_name[10]; //姓名 char sex; //性别 int age; …

Qt Quick:CheckBox 复选框

复选框不止选中和未选中2种状态哦&#xff0c;它还有1种部分选中的状态。这3种状态都是Qt自带的&#xff0c;如果想让复选框有部分选中这个状态&#xff0c;需要将三态属性&#xff08;tristate&#xff09;设为true。 未选中的状态值为0&#xff0c;部分选中是1&#xff0c;选…

Docker常用命令总结~

1、关于镜像 获取镜像 docker pull [image name] [option:tag]AI助手//获取postgres镜像(没有设置镜像版本号则默认获取最新的&#xff0c;使用latest标记) docker pull postgres or docker pull postgres:11.14 列出本地镜像 docker imagesAI助手 指定镜像启动一个容…

贪心算法在背包问题上的运用(Python)

背包问题 有n个物品,它们有各自的体积和价值,现有给定容量的背包,如何让背包里装入的物品具有最大的价值总和? 这就是典型的背包问题(又称为0-1背包问题),也是具体的、没有经过任何延伸的背包问题模型。 背包问题的传统求解方法较为复杂,现定义有一个可以载重为8kg的背…

大屏开源项目go-view二次开发3----象形柱图控件(C#)

环境搭建参考&#xff1a; 大屏开源项目go-view二次开发1----环境搭建(C#)-CSDN博客 要做的象形柱图控件最终效果如下图&#xff1a; 其实这个控件我前面的文章也介绍过&#xff0c;不过是用wpf做的&#xff0c;链接如下&#xff1a; wpf利用Microsoft.Web.WebView2显示html…

无刷电机的概念

无换向器电机 Brushless Direct Current Motor&#xff0c;BLDC 普通电机的转子就是中间旋转的线圈&#xff0c;定子就是两边的磁铁 和普通有刷相比&#xff0c;转子和定子互换材料。四周是通电的线圈&#xff0c;中间在转的是磁铁 负载工况决定额定电压&#xff0c;没有固定…

SLAAC如何工作?

SLAAC如何工作&#xff1f; IPv6无状态地址自动配置(SLAAC)-常见问题 - 苍然满关中 - 博客园 https://support.huawei.com/enterprise/zh/doc/EDOC1100323788?sectionj00shttps://www.zhihu.com/question/6691553243/answer/57023796400 主机在启动或接口UP后&#xff0c;发…

【机器学习】【集成学习——决策树、随机森林】从零起步:掌握决策树、随机森林与GBDT的机器学习之旅

这里写目录标题 一、引言机器学习中集成学习的重要性 二、决策树 (Decision Tree)2.1 基本概念2.2 组成元素2.3 工作原理分裂准则 2.4 决策树的构建过程2.5 决策树的优缺点&#xff08;1&#xff09;决策树的优点&#xff08;2&#xff09;决策树的缺点&#xff08;3&#xff0…

ubuntu+ros新手笔记(五):初探anaconda+cuda+pytorch

深度学习三件套&#xff1a;初探anacondacudapytorch 系统ubuntu22.04 ros2 humble 1.初探anaconda 1.1 安装 安装过程参照【详细】Ubuntu 下安装 Anaconda 1.2 创建和删除环境 创建新环境 conda create -n your_env_name pythonx.x比如我创建了一个名为“py312“的环境…