Go协程池gopool源码解析

1、gopool简介

Repository:https://github.com/bytedance/gopkg/tree/develop/util/gopool

gopool is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines.

It is an alternative to the go keyword.

gopool的用法非常简单,将曾经我们经常使用的go func(){...}替换为gopool.Go(func(){...})即可

此时gopool将会使用默认的配置来管理你启动的协程,也可以选择针对业务场景配置池子大小以及扩容上限

old:

go func() {
	// do your job
}()

new:

gopool.Go(func(){
	// do your job
})

2、核心数据结构

1)、Pool

Pool是一个定义了协程池的接口,代码如下:

// util/gopool/pool.go
type Pool interface {
	// Name returns the corresponding pool name.
	// 协程池的名称
	Name() string
	// SetCap sets the goroutine capacity of the pool.
	// 设置协程池内goroutine的容量
	SetCap(cap int32)
	// Go executes f.
	// 执行f函数
	Go(f func())
	// CtxGo executes f and accepts the context.
	// 带ctx,执行f函数
	CtxGo(ctx context.Context, f func())
	// SetPanicHandler sets the panic handler.
	// 设置发生panic时调用的函数
	SetPanicHandler(f func(context.Context, interface{}))
}

gopool提供了Pool这个接口的默认实现pool,代码如下:

// util/gopool/pool.go
type pool struct {
	// The name of the pool
	// 协程池的名字
	name string

	// capacity of the pool, the maximum number of goroutines that are actually working
	// 协程池实际工作的goroutine的最大数量
	cap int32
	// Configuration information
	// 配置信息
	config *Config
	// linked list of tasks
	// task队列的元信息,每一个task代表一个待执行的函数
	taskHead  *task
	taskTail  *task
	taskLock  sync.Mutex
	taskCount int32

	// Record the number of running workers
	// 当前有多少个worker在运行中,每个worker代表一个goroutine
	workerCount int32

	// This method will be called when the worker panic
	// 协程池中的协程引发的panic会由该函数处理
	panicHandler func(context.Context, interface{})
}

pool数据结构如下图:

2)、task
// util/gopool/pool.go
type task struct {
	// 当前task的ctx
	ctx context.Context
	// 当前task需要执行的函数f
	f func()
	// 指向下一个task的指针
	next *task
}

task是一个链表结构,可以把它理解为一个待执行的任务,包含了当前task需要执行的函数f func()以及指向下一个task的指针

一个协程池pool对应了一组task,pool维护了指向链表的头尾的两个指针:taskHead和taskTail以及链表的长度taskCount和对应的锁taskLock

3)、worker
// util/gopool/worker.go
type worker struct {
	pool *pool
}

一个worker就是逻辑上的一个执行器,它对应到一个协程池pool

当一个worker被唤起,将会开启一个goroutine,不断从pool中的task链表获取任务并执行,代码如下:

// util/gopool/worker.go
func (w *worker) run() {
	go func() {
		for {
			var t *task
			// 操作pool中的task链表前,加锁保证并发安全
			w.pool.taskLock.Lock()
			if w.pool.taskHead != nil {
				// 拿到taskHead准备执行
				t = w.pool.taskHead
				// 更新链表的head以及数量
				w.pool.taskHead = w.pool.taskHead.next
				atomic.AddInt32(&w.pool.taskCount, -1)
			}
			if t == nil {
				// if there's no task to do, exit
				// 如果前一步拿到的taskHead为空,说明无任务需要执行,清理后返回(关闭goroutine)
				w.close()
				w.pool.taskLock.Unlock()
				w.Recycle()
				return
			}
			w.pool.taskLock.Unlock()
			// 执行任务,针对panic会recover,并调用配置的handler
			func() {
				defer func() {
					if r := recover(); r != nil {
						msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
						logger.CtxErrorf(t.ctx, msg)
						if w.pool.panicHandler != nil {
							w.pool.panicHandler(t.ctx, r)
						}
					}
				}()
				t.f()
			}()
			t.Recycle()
		}
	}()
}

3、核心API

来看下使用gopool的核心APIGo(f func()),实现如下:

// util/gopool/gopool.go
func Go(f func()) {
	CtxGo(context.Background(), f)
}

func CtxGo(ctx context.Context, f func()) {
	defaultPool.CtxGo(ctx, f)
}
// util/gopool/pool.go
func (p *pool) CtxGo(ctx context.Context, f func()) {
	// 创建一个task对象,将ctx和待执行的函数赋值
	t := taskPool.Get().(*task)
	t.ctx = ctx
	t.f = f
	// 将task插入pool的链表的尾部,更新链表数量
	p.taskLock.Lock()
	if p.taskHead == nil {
		p.taskHead = t
		p.taskTail = t
	} else {
		p.taskTail.next = t
		p.taskTail = t
	}
	p.taskLock.Unlock()
	atomic.AddInt32(&p.taskCount, 1)
	// The following two conditions are met:
	// 1. the number of tasks is greater than the threshold.
	// 2. The current number of workers is less than the upper limit p.cap.
	// or there are currently no workers.
	// 以下任意条件满足时,创建新的worker并唤起执行
	// 1.待执行的task超过了扩容阈值(默认值为1)且当前运行的worker数量小于上限(默认值为10000)
	// 2.无worker运行
	if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
		// worker数量+1
		p.incWorkerCount()
		// 创建一个新的worker,并把当前pool赋值
		w := workerPool.Get().(*worker)
		w.pool = p
		// 唤起worker执行
		w.run()
	}
}

以下任意条件满足时,会扩容worker:

  1. 待执行的task超过了扩容阈值(默认值为1)且当前运行的worker数量小于上限(默认值为10000)
  2. 无worker运行

gopool自行维护一个defaultPool,这是一个默认的pool结构体,在引入包的时候就进行初始化。当我们直接调用gopool.Go()时,本质上是调用了defaultPool的同名方法

// util/gopool/gopool.go
var defaultPool Pool

func init() {
	defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig())
}
// util/gopool/config.go
const (
	defaultScalaThreshold = 1
)

type Config struct {
	// threshold for scale.
	// new goroutine is created if len(task chan) > ScaleThreshold.
	// defaults to defaultScalaThreshold.
	// 控制扩容的阈值,一旦待执行的task超过此值,且worker数量未达到上限,就开始启动新的worker
	ScaleThreshold int32
}

func NewConfig() *Config {
	c := &Config{
		ScaleThreshold: defaultScalaThreshold,
	}
	return c
}

defaultPool的名称为gopool.DefaultPool,池子容量一万,扩容阈值为1

当调用gopool.Go()时,gopool就会更新维护的任务链表,并且判断是否需要扩容worker:

  • 若此时已经有很多worker启动(底层一个worker对应一个goroutine),不需要扩容,就直接返回
  • 若判断需要扩容,就创建一个新的worker,并调用worker.run()方法启动,各个worker会异步地检查pool里面的任务链表是否还有待执行的任务,如果有就执行

gopool中三个角色的定位:

  • task是一个待执行的任务节点,同时还包含了指向下一个任务的指针,链表结构
  • worker是一个实际执行任务的执行器,它会异步启动一个goroutine执行协程池里面未执行的task
  • pool是一个逻辑上的协程池,对应了一个task链表,同时负责维护task状态的更新,以及在需要的时候创建新的worker

gopool核心实现原理如下图:

4、使用sync.Pool进行性能优化

gopool中多次使用了sync.Pool来池化对象的创建,复用woker和task对象

task池化:

// util/gopool/pool.go
var taskPool sync.Pool

func init() {
	taskPool.New = newTask
}

func newTask() interface{} {
	return &task{}
}

func (t *task) Recycle() {
	t.zero()
	taskPool.Put(t)
}

worker池化:

// util/gopool/worker.go
var workerPool sync.Pool

func init() {
	workerPool.New = newWorker
}

func newWorker() interface{} {
	return &worker{}
}

func (w *worker) Recycle() {
	w.zero()
	workerPool.Put(w)
}

参考:

解析 Golang 协程池 gopool 设计与实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/519430.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数的基础概念

数的基本概念 结点 每一个圆圈就表示一个结点&#xff0c;每个结点里而会存放一些数据(点权值) 边 连接两个结点的一条线&#xff0c;我们认为上面的是父亲&#xff0c;下面的是儿子&#xff0c;边也可以存储权值(边权)。在树中&#xff0c;边的条数严格等于点…

探索未来智慧酒店网项目接口架构

在数字化时代&#xff0c;智慧酒店已成为酒店业发展的重要趋势之一。智慧酒店网项目接口架构作为支撑智慧酒店运营的核心技术之一&#xff0c;其设计和优化对于提升用户体验、提高管理效率具有重要意义。本文将深入探讨智慧酒店网项目接口架构的设计理念和关键要素。 ### 智慧…

备战蓝桥杯---线段树应用2

来几个不那么模板的题&#xff1a; 对于删除&#xff0c;我们只要给那个元素附上不可能的值即可&#xff0c;关键问题是怎么处理序号变化的问题。 事实上&#xff0c;当我们知道每一个区间有几个元素&#xff0c;我们就可以确定出它的位置&#xff0c;因此我们可以再维护一下前…

Unity 代码控制播放序列帧动画的实现

在Unity中有些应用场景的动画是通过序列帧实现的。 如下图即为一个英雄攻击的一个动画&#xff1a; 那么如何实现该序列帧动画的播放呢&#xff0c;我们可以通过代码来控制播放。 1、把以上序列帧导入编辑器中&#xff0c;并修改图片属性&#xff0c;如下图所示&#xff0c;其…

利用Idea实现Ajax登录(maven工程)

一、新建一个maven工程&#xff08;不会建的小伙伴可以参考Idea引入maven工程依赖(保姆级)-CSDN博客&#xff09;&#xff0c;工程目录如图 ​​​​​​​ js文件可以上up网盘提取 链接&#xff1a;https://pan.baidu.com/s/1yOFtiZBWGJY64fa2tM9CYg?pwd5555 提取码&…

UI自动化测试重点思考(上)--元素定位/验证码/测试框架

UI自动化测试重点思考--元素定位 Selenium定位元素selenium中如何判断元素是否存在&#xff1f;定位页面元素webdriver打开页面id定位name定位class_name定位tag_name 定位xpath定位css_selector定位link_text 定位partial_link 定位总结 selenium中元素定位的难点&#xff1f;…

K8S - Deployment 的版本回滚

当前状态 先看deployment rootk8s-master:~# kubectl get deploy -o wide --show-labels NAME READY UP-TO-DATE AVAILABLE AGE CONTAINERS IMAGES …

武汉星起航:深耕全球市场,拓展国际影响力,共赢未来跨境新篇章

在瞬息万变的跨境电商领域&#xff0c;武汉星起航凭借其敏锐的创新意识和卓越的执行力&#xff0c;为行业注入了新的活力。作为一家追求卓越、勇于创新的企业&#xff0c;武汉星起航深知创新是成功的关键。公司通过不断探索新的商业模式、引入先进技术和优化运营流程&#xff0…

【pycharm】在debug循环时,如何快速debug到指定循环次数

【pycharm】在debug循环时&#xff0c;如何快速debug到指定循环次数 【先赞后看养成习惯】求关注收藏点赞&#x1f600; 在 PyCharm 中&#xff0c;可以使用条件断点来实现在特定循环次数后停止调试。这可以通过在断点处右键单击&#xff0c;然后选择 “Add Breakpoint” -&g…

【精品教程】护网HVV实战教程资料合集(持续更新,共20节)

以下是资料目录&#xff0c;如需下载&#xff0c;请前往星球获取&#xff1a; 01-HW介绍.zip 02-HTTP&Burp课程资料.zip 03-信息收集_3.zip 04-SQL注入漏洞_2.zip 05-命令执行漏洞.zip 06-XSS漏洞.zip 07-CSRF.zip 08-中间件漏洞.zip 09-SSRF.zip 10-XXE.zip 11-Java反序列…

2024 最新版 Proteus 8.17 安装汉化教程

前言 大家好&#xff0c;我是梁国庆。 今天给大家带来的是目前 Proteus 的最新版本——Proteus 8.17。 时间&#xff1a;2024年4月4日 获取 Proteus 安装包 我已将本篇所使用的安装包打包上传至百度云&#xff0c;扫描下方二维码关注「main工作室」&#xff0c;后台回复【…

Netty客户端发送数据给服务器的两个通道(1)

EventLoopGroup group new NioEventLoopGroup();// 设置的连接group。 Bootstrap bootstrap new Bootstrap().group(group).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) // 超时时间。 .channel(NioSocketChannel.class).handler(new ChannelInitializer() { Ov…

【stm32】I2C通信协议

【stm32】I2C通信协议 概念及原理 如果我们想要读写寄存器来控制硬件电路&#xff0c;就至少需要定义两个字节数据 一个字节是我们要读写哪个寄存器&#xff0c;也就是指定寄存器的地址 另一个字节就是这个地址下存储寄存器的内容 写入内容就是控制电路&#xff0c;读出内容就…

vue 浅解watch cli computed props ref vue slot axios nexttick devtools说明使用

Vue.js 是一个强大的前端框架&#xff0c;它提供了很多有用的功能和工具。你提到的这些特性&#xff08;watch、cli、computed、props、ref、slot、axios、nextTick、devtools&#xff09;在 Vue 中各自扮演着不同的角色。下面我会逐一解释这些特性如何在 Vue 中使用&#xff1…

蓝桥杯每日一题:约数个数(质因数)

题目描述&#xff1a; 输入 n 个整数&#xff0c;依次输出每个数的约数的个数。 输入格式 第一行包含整数 n。 第二行包含 n 个整数 ai。 输出格式 共 n 行&#xff0c;按顺序每行输出一个给定整数的约数的个数。 数据范围 1≤n≤1000, 1≤ai≤10^9 输入样例&#xff…

分布式任务调度框架XXL-JOB

1、概述 XXL-JOB是一个分布式任务调度平台&#xff0c;其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线&#xff0c;开箱即用。 官方文档&#xff1a;https://www.xuxueli.com/xxl-job/#%E4%BA%8C%E3%80%81%E5%BF%AB%E9%80%9F%E…

c# wpf Template ContentTemplate

1.概要 1.1 定义内容的外观 2.2 要点分析 2.代码 <Window x:Class"WpfApp2.Window1"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"xmlns:d"http://schem…

深入浅出 -- 系统架构之分布式系统底层的一致性

在分布式领域里&#xff0c;一致性成为了炙手可热的名词&#xff0c;缓存、数据库、消息中间件、文件系统、业务系统……&#xff0c;各类分布式场景中都有它的身影&#xff0c;因此&#xff0c;想要更好的理解分布式系统&#xff0c;必须要理解“一致性”这个概念。 其实关于…

Day83:服务攻防-开发组件安全JacksonFastJson各版本XStreamCVE环境复现

目录 J2EE-组件Jackson-本地demo&CVE 代码执行 (CVE-2020-8840) 代码执行 (CVE-2020-35728&#xff09; J2EE-组件FastJson-本地demo&CVE FastJson < 1.2.24 FastJson < 1.2.47 FastJson < 1.2.80 (利用条件比较苛刻) J2EE-组件XStream-靶场&CVE …

redis进阶入门配置与持久化

一、Redis.conf详解 容量单位 1、配置大小单位&#xff0c;开头定义了一些基本的度量单位&#xff0c;只支持bytes&#xff0c;不支持bit,不区分大小写&#xff0c;G和GB有区别 2、对 大小写 不敏感 可以使用 include 组合多个配置问题 网络配置 bind 127.0.0.1 # 绑定的i…