Go 连接池的设计与实现

为什么需要连接池

如果不用连接池,而是每次请求都创建一个连接是比较昂贵的,因此需要完成3次tcp握手

同时在高并发场景下,由于没有连接池的最大连接数限制,可以创建无数个连接,耗尽文件描述符

连接池就是为了复用这些创建好的连接

连接池设计

基本上连接池都会设计以下几个参数:

  • 初始连接数:在初始化连接池时就会预先创建好的连接数量,如果设置得:

    • 过大:可能造成浪费
    • 过小:请求到来时需要新建连接
  • 最大空闲连接数maxIdle:池中最大缓存的连接个数,如果设置得:

    • 过大:造成浪费,自己不用还把持着连接。因为数据库整体的连接数是有限的,当前进程占用多了,其他进程能获取的就少了
    • 过小:无法应对突发流量
  • 最大连接数maxCap

    • 如果已经用了maxCap个连接,要申请第maxCap+1个连接时,一般会阻塞在那里,直到超时或者别人归还一个连接
  • 最大空闲时间idleTimeout:当发现某连接空闲超过这个时间时,会将其关闭,重新去获取连接

    • 避免连接长时间没用,自动失效的问题

连接池对外提供两个方法,Get:获取一个连接,Put:归还一个连接

大部分连接池的实现大同小异,基本流程如下:

Get

在这里插入图片描述

需要注意:

  • 当有空闲连接时,需要进一步判断连接是否有过期(超过最大空闲时间idleTimeout)

    • 这些连接有可能很久没用过了,在数据库层面已经过期。如果贸然使用可能出现错误,因此最好检查下是否超时
  • 当陷入阻塞时,最好设置超时时间,避免一直没等到有人归还连接而一直阻塞

Put

在这里插入图片描述

归还连接时:

  • 先看有没有阻塞的获取连接的请求,如果有转交连接,并唤醒阻塞请求
  • 否则看能否放回去空闲队列,如果不能直接关闭请求

总结

根据上面总结的流程,连接池还需要维护另外两个结构:

  • 空闲队列
  • 阻塞请求的队列

在这里插入图片描述

开源实现

接下来看几个开源连接池的实现,都大体符合上面介绍的流程

silenceper/pool

代码地址:https://github.com/silenceper/pool

数据结构:

// channelPool 存放连接信息
type channelPool struct {
   mu                       sync.RWMutex
   // 空闲连接
   conns                    chan *idleConn
   // 产生新连接的方法
   factory                  func() (interface{}, error)
   // 关闭连接的方法
   close                    func(interface{}) error
   ping                     func(interface{}) error
   // 最大空闲时间,最大阻塞等待时间(实际没用到)
   idleTimeout, waitTimeOut time.Duration
   // 最大连接数
   maxActive                int
   openingConns             int
   // 阻塞的请求
   connReqs                 []chan connReq
}

可以看出,silenceper/pool

  • 用channel实现了空闲连接队列conns
  • 为每个阻塞的请求创建一个channel,加入connReqs中。这样请求会阻塞在自己的channel上

Get:

func (c *channelPool) Get() (interface{}, error) {
   conns := c.getConns()
   if conns == nil {
      return nil, ErrClosed
   }
   for {
      select {
      // 如果有空闲连接
      case wrapConn := <-conns:
         if wrapConn == nil {
            return nil, ErrClosed
         }
         //判断是否超时,超时则丢弃
         if timeout := c.idleTimeout; timeout > 0 {
            if wrapConn.t.Add(timeout).Before(time.Now()) {
               //丢弃并关闭该连接
               c.Close(wrapConn.conn)
               continue
            }
         }
         //判断是否失效,失效则丢弃,如果用户没有设定 ping 方法,就不检查
         if c.ping != nil {
            if err := c.Ping(wrapConn.conn); err != nil {
               c.Close(wrapConn.conn)
               continue
            }
         }
         return wrapConn.conn, nil
      // 没有空闲连接
      default:
         c.mu.Lock()
         log.Debugf("openConn %v %v", c.openingConns, c.maxActive)
         if c.openingConns >= c.maxActive {
            // 连接数已经达到上线,不能再创建连接
            req := make(chan connReq, 1)
            c.connReqs = append(c.connReqs, req)
            c.mu.Unlock()
            // 将自己阻塞在channel上
            ret, ok := <-req
            if !ok {
               return nil, ErrMaxActiveConnReached
            }
            // 再检查一次是否超时
            if timeout := c.idleTimeout; timeout > 0 {
               if ret.idleConn.t.Add(timeout).Before(time.Now()) {
                  //丢弃并关闭该连接
                  c.Close(ret.idleConn.conn)
                  continue
               }
            }
            return ret.idleConn.conn, nil
         }
         
         // 没有超过最大连接数,创建一个新的连接
         if c.factory == nil {
            c.mu.Unlock()
            return nil, ErrClosed
         }
         conn, err := c.factory()
         if err != nil {
            c.mu.Unlock()
            return nil, err
         }
         c.openingConns++
         c.mu.Unlock()
         return conn, nil
      }
   }
}

这段代码基本符合上面介绍的Get流程,应该很好理解

需要注意:

  1. 当收到别人归还的连接狗,这里再检查了一次是否超时。但我认为这次检查是没必要的,因为别人刚用完,一般不可能超时
  2. 虽然在pool的数据结构定义中有waitTimeOut字段,但实际没有使用,即阻塞获取可能无限期阻塞,这是一个优化点

Put:

// Put 将连接放回pool中
func (c *channelPool) Put(conn interface{}) error {
   if conn == nil {
      return errors.New("connection is nil. rejecting")
   }

   c.mu.Lock()

   if c.conns == nil {
      c.mu.Unlock()
      return c.Close(conn)
   }

   // 如果有请求在阻塞获取连接
   if l := len(c.connReqs); l > 0 {
      req := c.connReqs[0]
      copy(c.connReqs, c.connReqs[1:])
      c.connReqs = c.connReqs[:l-1]
      // 将连接转交
      req <- connReq{
         idleConn: &idleConn{conn: conn, t: time.Now()},
      }
      c.mu.Unlock()
      return nil
   } else {
      // 否则尝试是否能放回空闲连接队列
      select {
      case c.conns <- &idleConn{conn: conn, t: time.Now()}:
         c.mu.Unlock()
         return nil
      default:
         c.mu.Unlock()
         //连接池已满,直接关闭该连接
         return c.Close(conn)
      }
   }
}

值得注意的是:

  • put方法唤醒阻塞请求时,从队头开始唤醒,这样先阻塞的请求先被唤醒,保证了公平性

sql.DB

Go在官方库sql中就实现了连接池,这样的好处在于:

  • 对于开发:就不用像java一样,需要自己找第三方的连接池实现
  • 对于driver的实现:只用关心怎么和数据库交互,不用考虑连接池的问题

sql.DB中和连接池相关的字段如下:

type DB struct {
   /**
   ...
   */
   
   // 空闲连接队列
   freeConn     []*driverConn
   // 阻塞请求的队列
   connRequests map[uint64]chan connRequest
   
   // 已经打开的连接
   numOpen      int    // number of opened and pending open connections
   // 最大空闲连接
   maxIdle           int                    // zero means defaultMaxIdleConns; negative means 0
   // 最大连接数
   maxOpen           int                    // <= 0 means unlimited
   // ...
}

继续看获取连接:

func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
   // 检测连接池是否被关闭
   db.mu.Lock()
   if db.closed {
      db.mu.Unlock()
      return nil, errDBClosed
   }

   select {
   default:
   // 检测ctx是否超时
   case <-ctx.Done():
      db.mu.Unlock()
      return nil, ctx.Err()
   }
   lifetime := db.maxLifetime

   
   
   db.numOpen++ // optimistically
   db.mu.Unlock()
   ci, err := db.connector.Connect(ctx)
   if err != nil {
      db.mu.Lock()
      db.numOpen-- // correct for earlier optimism
      db.maybeOpenNewConnections()
      db.mu.Unlock()
      return nil, err
   }
   db.mu.Lock()
   dc := &driverConn{
      db:        db,
      createdAt: nowFunc(),
      ci:        ci,
      inUse:     true,
   }
   db.addDepLocked(dc, dc)
   db.mu.Unlock()
   return dc, nil
}

接下来检测是否有空闲连接:

  
   numFree := len(db.freeConn)
   // 如果有空闲连接
   if strategy == cachedOrNewConn && numFree > 0 {
      // 从队头取一个
      conn := db.freeConn[0]
      copy(db.freeConn, db.freeConn[1:])
      db.freeConn = db.freeConn[:numFree-1]
      conn.inUse = true
      db.mu.Unlock()
      if conn.expired(lifetime) {
         conn.Close()
         return nil, driver.ErrBadConn
      }

      // Reset the session if required.
      if err := conn.resetSession(ctx); err == driver.ErrBadConn {
         conn.Close()
         return nil, driver.ErrBadConn
      }

      return conn, nil
   }

以上代码是1.14版本,但是到了1.18以后,获取空闲连接的方式发生了变化:


last := len(db.freeConn) - 1
if strategy == cachedOrNewConn && last >= 0 {
   // 从最后一个位置获取连接
   conn := db.freeConn[last]
   db.freeConn = db.freeConn[:last]
   conn.inUse = true
   if conn.expired(lifetime) {
      db.maxLifetimeClosed++
      db.mu.Unlock()
      conn.Close()
      return nil, driver.ErrBadConn
   }

可以看出,1.14版本从队首获取,1.18改成从队尾获取连接

为啥从队尾拿连接?

  • 因为队尾的连接是才放进去的,该连接过期概率比队首连接

继续看:

   // 如果已经达到最大连接数
   if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
      req := make(chan connRequest, 1)
      reqKey := db.nextRequestKeyLocked()
      db.connRequests[reqKey] = req
      db.waitCount++
      db.mu.Unlock()

      waitStart := time.Now()
      // 阻塞当前请求,要么ctx超时,要么别人归还了连接
      select {
      case <-ctx.Done():
         db.mu.Lock()
         // 把自己从阻塞队列中删除
         delete(db.connRequests, reqKey)
         db.mu.Unlock()

         atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

         select {
         default:
         case ret, ok := <-req:
            if ok && ret.conn != nil {
               db.putConn(ret.conn, ret.err, false)
            }
         }
         return nil, ctx.Err()
      case ret, ok := <-req:
         // 别人归还连接
         atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

         if !ok {
            return nil, errDBClosed
         }
         if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
            ret.conn.Close()
            return nil, driver.ErrBadConn
         }
         if ret.conn == nil {
            return nil, ret.err
         }

         return ret.conn, ret.err
      }
   }

这里需要注意,在ctx超时分支中:

  1. 首先把自己从阻塞队列中删除
  2. 再检查一下req中是否有连接,如果有,将连接放回连接池

奇怪的是为啥把自己删除后,req还可能收到连接呢?

因为put连接时,会先拿出一个阻塞连接的req,如果这里删除req在put拿出req:

  • 之前:那没问题,put不可能再放该req发送连接
  • 之后:那有可能put往该req发送了连接,因此需要再检查下req中是否有连接,如果有归还

也解释了为啥阻塞队列要用map

  • 用于快速找到自己的req,并删除

最后看看put:

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
   if db.closed {
      return false
   }
   if db.maxOpen > 0 && db.numOpen > db.maxOpen {
      return false
   }
   
   // 有阻塞的请求,转移连接
   if c := len(db.connRequests); c > 0 {
      var req chan connRequest
      var reqKey uint64
      for reqKey, req = range db.connRequests {
         break
      }
      delete(db.connRequests, reqKey) // Remove from pending requests.
      if err == nil {
         dc.inUse = true
      }
      req <- connRequest{
         conn: dc,
         err:  err,
      }
      return true
      
      
   // 判断能否放回空闲队列   
   } else if err == nil && !db.closed {
      if db.maxIdleConnsLocked() > len(db.freeConn) {
         db.freeConn = append(db.freeConn, dc)
         db.startCleanerLocked()
         return true
      }
      db.maxIdleClosed++
   }
   return false
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/8191.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

高效的实现金蝶云星空ERP与自研MES系统数据集成

一、项目背景 随着企业数字化转型的不断深入&#xff0c;数据集成变得愈发重要。金蝶云星空ERP与自研MES系统之间的数据集成是企业提高管理效率、降低运营成本的关键。为了实现这一目标&#xff0c;企业选择了轻易云数据集成平台进行数据集成。 二、项目实施过程 低耦合、高内…

二叉树的前序遍历(力扣144)

目录 题目描述&#xff1a; 解法一&#xff1a;递归法 解法二&#xff1a;迭代法 解法三&#xff1a;Morris 遍历 二叉树的前序遍历 题目描述&#xff1a; 给你二叉树的根节点 root &#xff0c;返回它节点值的 前序 遍历。 示例 1&#xff1a; 输入&#xff1a;root […

Unity反编译:AssetStudio资源浏览器及代码查看器

前言 假如你手上有Unity发布出来的exe文件、apk文件或者webGL文件&#xff0c;但就是没有工程源文件&#xff0c;那么&#xff0c;如何从这些文件里面一窥究竟呢&#xff1f;这就需要资源提取工具以及代码反编译工具&#xff01; 本文所涉软件【文中附有下载链接】&#xff1…

【接口测试工具】Eolink Apikit 快速入门教程

Eolink Apikit 下载安装【官方版】&#xff1a;https://www.eolink.com/apikit 发起 API 测试 进入 API 文档详情页&#xff0c;点击上方 测试 标签&#xff0c;进入 API 测试页&#xff0c;系统会根据 API 文档自动生成测试界面并且填充测试数据。 填写请求参数 首先填写好请…

【创作赢红包】python学习——【第七弹】

前言 上一篇文章 python学习——【第六弹】中介绍了 python中的字典操作&#xff0c;这篇文章接着学习python中的可变序列 集合 集合 1&#xff1a; 集合是python语言提供的内置数据结构&#xff0c;具有无序性&#xff08;集合中的元素无法通过索引下标访问&#xff0c;并且…

UDP协议详解

目录 UDP协议报文结构 端口号 报文长度 校验和 生成校验和的算法 MD5的特点 UDP协议报文结构 UDP会把载荷数据(也就是通过 UDP socekt,send方法拿来的数据基础上,再前面拼装(相当于字符串拼接此处是二进制的)上几个字节的报头 UDP报头里包含了一些特定的属性,这些属性携带…

阿里云linux云服务器 安装指定版本node.js

我们在实例管理中找到自己的服务器 然后点击右侧的 远程连接 接着点击理解登录 进入命令窗口 我们在这上面输入 curl -h阿里云的服务器都还是最好会有 curl的 然后 我们输入 curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.34.0/install.sh | bash下把nvm下下…

量化注意事项和模型设计思想

量化的注意事项 1、量化检测器时&#xff0c;尽量不要对Detect Head进行量化&#xff0c;一旦进行量化可能会引起比较大的量化误差&#xff1b; 2、量化模型时&#xff0c;模型的First&Second Layer也尽可能不进行量化&#xff08;精度损失具有随机性&#xff09;&#xf…

【软件设计师06】数据结构与算法基础

数据结构与算法基础 考点&#xff1a;数组与矩阵、线性表、广义表、树与二叉树、图、排序与查找、算法基础与常见的算法 1. 数组 数组类型存储地址计算一维度数组a[n]a[i]的存储地址为ai*len二维数组a[m][n]a[i][j]的存储地址&#xff1b;按行存储&#xff1a;a(i*nj)*len&a…

Spring原理学习(二):Bean的生命周期和Bean后处理器

〇、前言 倘若是为了面试&#xff0c;请背下来下面这段&#xff1a; spring的bean的生命周期主要是创建bean的过程&#xff0c;一个bean的生命周期主要是4个步骤&#xff1a;实例化、属性注入、初始化、销毁。但是对于一些复杂的bean的创建&#xff0c;spring会在bean的生命周期…

如何搭建chatGPT4.0模型-国内如何用chatGPT4.0

国内如何用chatGPT4.0 在国内&#xff0c;目前可以通过以下途径使用 OpenAI 的 ChatGPT 4.0&#xff1a; 自己搭建模型&#xff1a;如果您具备一定的技术能力&#xff0c;可以通过下载预训练模型和相关的开发工具包&#xff0c;自行搭建 ChatGPT 4.0 模型。OpenAI提供了相关的…

旅游心得Traveling Experience

前言 加油 原文 旅游心得常用会话 ❶ Share photos of the trip with friends. 与朋友分享旅游的照片。 ❷ We’ll go to the Great Wall, if you prefer. 你如果愿意的话,我们去长城。 ❸ Would you go to the church or the synagogue or the mosque? 你会去教堂,犹太…

二结(4.11)IO流学习

FIle类只能对文件本身操作&#xff0c;不能读写文件里面存储的数据 文件保存的位置叫路径&#xff0c;而数据传输叫IO流 Java I/O流&#xff08;Input/Output stream&#xff09;在Java应用程序中用于读取和写入数据&#xff0c;可分为基本流和高级流两类 关于什么是输出流、…

CSC中加学者交换项目申报即将开始

3月31日&#xff0c;国家留学基金委&#xff08;CSC&#xff09;发布了2023-2024年度中加学者交换项目遴选通知。根据通知精神&#xff0c;选派规模&#xff1a;100人月&#xff0c;留学及资助期限&#xff1a;4-12个月&#xff0c;网上报名及申请受理时间为2023年4月11日至6月…

SpringCloud学习6(Spring Cloud Alibaba)断路器Sentinel熔断降级

文章目录服务熔断降级Sentinel高并发请求模拟&#xff08;这里我们使用contiperf来进行测试&#xff09;修改tomcat配置最大线程数引入测试依赖编写测试代码服务雪崩服务雪崩的容错方案&#xff08;隔离、超时、限流、熔断、降级&#xff09;隔离机制&#xff1a;超时机制&…

Baumer工业相机堡盟工业相机如何设置网口的IP地址(工业相机连接的网口设置IP地址步骤)

Baumer工业相机堡盟工业相机如何设置网口的IP地址&#xff08;工业相机连接的网口设置IP地址步骤&#xff09;Baumer工业相机Baumer工业相机设置网络端口IP地址匹配设置网络端口IP地址和工业相机IP地址匹配第一次打开CameraExplorer软件确认问题为IP地址不匹配问题打开网络连接…

C++ - 继承 | 菱形继承

之前的文章中我们简要的讲述了C中继承部分的知识&#xff0c;但是还没有完全的讲完&#xff0c;在本文中将会讲到菱形继承的问题。 复杂的菱形继承 单继承&#xff1a;一个子类只有一个直接父类时称这个继承关系为单继承。 多继承&#xff1a;一个子类有两个或以上直接父类时…

最新阿里、腾讯、华为、字节等大厂的薪资和职级对比,看看你差了多少...

互联网大厂新入职员工各职级薪资对应表(技术线)~ 最新阿里、腾讯、华为、字节跳动等大厂的薪资和职级对比 上面的表格不排除有很极端的收入情况&#xff0c;但至少能囊括一部分同职级的收入。这个表是“技术线”新入职员工的职级和薪资情况&#xff0c;非技术线(如产品、运营、…

Android基础四大组件之Activity的启动过程源码解析

前言 Activity是Android中一个很重要的概念&#xff0c;堪称四大组件之首&#xff0c;关于Activity有很多内容&#xff0c;比如生命周期和启动Flags&#xff0c;这二者想要说清楚&#xff0c;恐怕又要写两篇长文&#xff0c;更何况分析它们的源码呢。不过本文的侧重点不是它们…

面试官:你可以用 for of 遍历 Object 吗?

本文以 用 for of遍历 Object 为引 来聊聊 迭代器模式。 什么是迭代器模式 迭代器模式提供一种方法顺序访问一个聚合对象中的各个元素&#xff0c;而又不暴露该对象的内部表示。 ——《设计模式&#xff1a;可复用面向对象软件的基础》 可以说迭代器模式就是为了遍历存在的。提…