golang线程池ants-实现架构

1、总体架构

ants协程池,在使用上有多种方式(使用方式参考这篇文章:golang线程池ants-四种使用方法),但是在实现的核心就一个,如下架构图:

总的来说,就是三个数据结构: Pool、WorkerStack、goWorker以及这三个结构实现的方法,了解了这些,基本上对ants的实现原理就了如指掌了。

2、详细实现

2.1 worker的设计实现

worker结构如下:

type goWorker struct {
	// pool who owns this worker.
	pool *Pool

	// task is a job should be done.
	task chan func()

	// lastUsed will be updated when putting a worker back into queue.
	lastUsed time.Time
}

该结构设计非常简单,三个成员:归属的线程池、执行函数、该worker最后一次运行时间,goWorker结构实现如下接口:

type worker interface {
	run()
	finish()
	lastUsedTime() time.Time
	inputFunc(func())
	inputParam(interface{})
}

核心函数run,该函数从管道task里获取到任务函数,并执行,执行完成后,将此worker放回协程池(此时worker阻塞等待任务到来,调用函数:w.pool.revertWorker(w)放回池子中),以便复用:

func (w *goWorker) run() {
	w.pool.addRunning(1)
	go func() {
		defer func() {
			if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() {
				w.pool.once.Do(func() {
					close(w.pool.allDone)
				})
			}
			w.pool.workerCache.Put(w)
			if p := recover(); p != nil {
				if ph := w.pool.options.PanicHandler; ph != nil {
					ph(p)
				} else {
					w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack())
				}
			}
			// Call Signal() here in case there are goroutines waiting for available workers.
			w.pool.cond.Signal()
		}()

		for f := range w.task {
			if f == nil {
				return
			}
			f()
			if ok := w.pool.revertWorker(w); !ok {
				return
			}
		}
	}()
}

finish函数,调用该函数,代表此worker的生命周期结束:

func (w *goWorker) finish() {
	w.task <- nil
}

这个时候run函数从遍历task管道中结束,进入defer函数,worker放入workerCache,备用。

inputFunc很容易理解,将任务放入管道,让worker去执行:

func (w *goWorker) inputFunc(fn func()) {
	w.task <- fn
}

2.2 workerStack结构

type workerStack struct {
	items  []worker
	expiry []worker
}

该结构就两个成员,都为worker的切片,items切片用于存储正常执行的worker,expiry存放过期的worker,workStack结构实现了如下接口:

type workerQueue interface {
	len() int
	isEmpty() bool
	insert(worker) error
	detach() worker
	refresh(duration time.Duration) []worker // clean up the stale workers and return them
	reset()
}

len函数:返回正在运行worker的长度

isEmpty函数:判断是否有正在运行的worker

insert函数:将worker插入切片。

detach函数:获取一个worker。

refresh:更新所有worker,淘汰过期worker。

reset:清除所有worker。

重点看refresh函数:

func (wq *workerStack) refresh(duration time.Duration) []worker {
	n := wq.len()
	if n == 0 {
		return nil
	}

	expiryTime := time.Now().Add(-duration)
	index := wq.binarySearch(0, n-1, expiryTime)

	wq.expiry = wq.expiry[:0]
	if index != -1 {
		wq.expiry = append(wq.expiry, wq.items[:index+1]...)
		m := copy(wq.items, wq.items[index+1:])
		for i := m; i < n; i++ {
			wq.items[i] = nil
		}
		wq.items = wq.items[:m]
	}
	return wq.expiry
}

 这个函数用于根据给定的时间间隔duration来刷新工作队列中的过期项。主要执行以下步骤:

  1. 获取队列长度:首先,通过调用wq.len()获取工作队列wq中当前元素的数量n。如果队列为空(即n == 0),则直接返回nil,表示没有过期项。

  2. 计算过期时间:通过time.Now().Add(-duration)计算出一个时间点,这个时间点是duration时间之前的时间,即认为是“过期”的时间点。

  3. 二分查找:使用wq.binarySearch(0, n-1, expiryTime)在队列中查找第一个过期项的位置(即第一个最后使用时间早于expiryTime的项)。这个函数返回一个索引,如果找到这样的项,则返回该项的索引;如果没有找到,则返回-1

  4. 清理过期项

    • 首先,清空wq.expiry切片,用它来存储所有过期的项。
    • 如果找到了过期项(即index != -1),则将wq.items中从0index(包含index)的所有项(即所有过期项)追加到wq.expiry中。
    • 然后,使用copy函数将wq.items中从index+1n-1的所有项向前移动,覆盖掉前面的过期项。这里mcopy函数返回的值,表示实际复制的元素数量,即队列中剩余的非过期项的数量。
    • 接下来,遍历wq.items中从mn-1的所有位置,将它们设置为nil。
    • 最后,通过wq.items = wq.items[:m]更新wq.items的长度,去除所有过期的项。
  5. 返回过期项:函数返回wq.expiry,这是一个包含所有被移除的过期项的切片。

需要注意的是,wq.items是一个切片,用于存储工作项;wq.expiry也是一个切片,用于临时存储过期的项。

 2.3 Pool结构

pool结构的定义源码稍作改了一下,之前poolCommon的结构就是Pool的结构,目前最新版本做了一个简单的封装。

type Pool struct {
	poolCommon
}
type poolCommon struct {
	// capacity of the pool, a negative value means that the capacity of pool is limitless, an infinite pool is used to
	// avoid potential issue of endless blocking caused by nested usage of a pool: submitting a task to pool
	// which submits a new task to the same pool.
	capacity int32

	// running is the number of the currently running goroutines.
	running int32

	// lock for protecting the worker queue.
	lock sync.Locker

	// workers is a slice that store the available workers.
	workers workerQueue

	// state is used to notice the pool to closed itself.
	state int32

	// cond for waiting to get an idle worker.
	cond *sync.Cond

	// done is used to indicate that all workers are done.
	allDone chan struct{}
	// once is used to make sure the pool is closed just once.
	once *sync.Once

	// workerCache speeds up the obtainment of a usable worker in function:retrieveWorker.
	workerCache sync.Pool

	// waiting is the number of goroutines already been blocked on pool.Submit(), protected by pool.lock
	waiting int32

	purgeDone int32
	purgeCtx  context.Context
	stopPurge context.CancelFunc

	ticktockDone int32
	ticktockCtx  context.Context
	stopTicktock context.CancelFunc

	now atomic.Value

	options *Options
}

创建一个线程池:

// NewPool instantiates a Pool with customized options.
func NewPool(size int, options ...Option) (*Pool, error) {
	if size <= 0 {
		size = -1
	}

	opts := loadOptions(options...)

	if !opts.DisablePurge {
		if expiry := opts.ExpiryDuration; expiry < 0 {
			return nil, ErrInvalidPoolExpiry
		} else if expiry == 0 {
			opts.ExpiryDuration = DefaultCleanIntervalTime
		}
	}

	if opts.Logger == nil {
		opts.Logger = defaultLogger
	}

	p := &Pool{poolCommon: poolCommon{
		capacity: int32(size),
		allDone:  make(chan struct{}),
		lock:     syncx.NewSpinLock(),
		once:     &sync.Once{},
		options:  opts,
	}}
	p.workerCache.New = func() interface{} {
		return &goWorker{
			pool: p,
			task: make(chan func(), workerChanCap),
		}
	}
	if p.options.PreAlloc {
		if size == -1 {
			return nil, ErrInvalidPreAllocSize
		}
		p.workers = newWorkerQueue(queueTypeLoopQueue, size)
	} else {
		p.workers = newWorkerQueue(queueTypeStack, 0)
	}

	p.cond = sync.NewCond(p.lock)

	p.goPurge()
	p.goTicktock()

	return p, nil
}

看如下几行代码:

	p.workerCache.New = func() interface{} {
		return &goWorker{
			pool: p,
			task: make(chan func(), workerChanCap),
		}
	}

workerCache为sync.Pool类型,sync.Pool是Go语言标准库中提供的一个对象池化的工具,旨在通过复用对象来减少内存分配的频率并降低垃圾回收的开销,从而提高程序的性能。其内部维护了一组可复用的对象。当你需要一个对象时,可以尝试从sync.Pool中获取。如果sync.Pool中有可用的对象,它将返回一个;否则,它会调用你提供的构造函数来创建一个新对象,sync.PoolNew字段是一个可选的函数,用于在池中无可用对象时创建新的对象。

这里这样写即为:当无可用的worker时,则通过New函数创建一个新的worker。

创建workder列表,内部其实就是创建了了一个切片,类型为workerStack,用于管理所有的worker。

p.workers = newWorkerQueue(queueTypeStack, 0)

NewPool函数执行完成后,一个协程池就创建完成了。

协程池创建完成后,需要用来处理任务,如何将任务函数传递到worker去执行呢?看如下函数:

// Submit submits a task to this pool.
//
// Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
// but what calls for special attention is that you will get blocked with the last
// Pool.Submit() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a Pool with ants.WithNonblocking(true).
func (p *Pool) Submit(task func()) error {
	if p.IsClosed() {
		return ErrPoolClosed
	}

	w, err := p.retrieveWorker()
	if w != nil {
		w.inputFunc(task)
	}
	return err
}

函数的入参为一个无返回值、无入参的函数,因此所有需要worker执行的函数都是func()类型,w, err := p.retrieveWorker(),取出一个空闲worker,取出成功后,将任务传递到worker内部:w.inputFunc(task),注意,当线程池中所有worker都忙碌时,inputFunc函数阻塞,一直到有worker空闲。

其他主要的函数,从池中获取worker的函数:

func (p *Pool) retrieveWorker() (w worker, err error) {
	p.lock.Lock()

retry:
	// First try to fetch the worker from the queue.
	if w = p.workers.detach(); w != nil {
		p.lock.Unlock()
		return
	}

	// If the worker queue is empty, and we don't run out of the pool capacity,
	// then just spawn a new worker goroutine.
	if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
		p.lock.Unlock()
		w = p.workerCache.Get().(*goWorker)
		w.run()
		return
	}

	// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
	if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
		p.lock.Unlock()
		return nil, ErrPoolOverload
	}

	// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
	p.addWaiting(1)
	p.cond.Wait() // block and wait for an available worker
	p.addWaiting(-1)

	if p.IsClosed() {
		p.lock.Unlock()
		return nil, ErrPoolClosed
	}

	goto retry
}

这个函数,获取worker有三个逻辑:

  • 当池中有空闲worker,直接获取。
  • 当池中没有空闲worker,从缓存workerCache中取出过期的worker使用,复用资源,降低开销。
  • 等待有worker执行完任务释放。(阻塞情况)

revertWorker,将worker放回池中,以执行下次的任务。 

func (p *Pool) revertWorker(worker *goWorker) bool {
	if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
		p.cond.Broadcast()
		return false
	}

	worker.lastUsed = p.nowTime()

	p.lock.Lock()
	// To avoid memory leaks, add a double check in the lock scope.
	// Issue: https://github.com/panjf2000/ants/issues/113
	if p.IsClosed() {
		p.lock.Unlock()
		return false
	}
	if err := p.workers.insert(worker); err != nil {
		p.lock.Unlock()
		return false
	}
	// Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
	p.cond.Signal()
	p.lock.Unlock()

	return true
}

以上就为ants线程池实现的主要技术细节,希望对各位热爱技术的同学们提供一些些帮助。

3、总结

ants协程池是一个高性能、易用的Go语言协程池库,它通过复用goroutines、自动调度任务、定期清理过期goroutines等方式,帮助开发者更加高效地管理并发任务。无论是处理网络请求、数据处理还是其他需要高并发性能的场景,ants协程池都是一个值得推荐的选择。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/778436.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

性能测试相关理解(一)

根据学习全栈测试博主的课程做的笔记 一、说明 若未特别说明&#xff0c;涉及术语都是jmeter来说&#xff0c;线程数&#xff0c;就是jmeter线程组中的线程数 二、软件性能是什么 1、用户关注&#xff1a;响应时间 2、业务/产品关注&#xff1a;响应时间、支持多少并发数、…

Oracle 11.2.0.1升级到11.2.0.4并做rman备份异机恢复

下载好数据库升级包&#xff0c;想去Oracle官网下载的&#xff0c;提示没有授权 只能在csdn找付费的了&#xff0c;9块1个&#xff0c;下载了前2个。 注意&#xff0c;总共有7个包&#xff0c;如果Oracle是安装在linux服务器&#xff0c;且无图形界面管理的 只需要第一&#xf…

Java面试八股之如何提高MySQL的insert性能

如何提高MySQL的insert性能 提高MySQL的INSERT性能可以通过多种策略实现&#xff0c;以下是一些常见的优化技巧&#xff1a; 批量插入&#xff1a; 而不是逐条插入&#xff0c;可以使用单个INSERT语句插入多行数据。例如&#xff1a; INSERT INTO table_name (col1, col2) V…

C++笔试强训2

文章目录 一、选择题二、编程题 一、选择题 和笔试强训1的知识点考的一样&#xff0c;因为输出的是double类型所以后缀为f,m.n对其30个字符所以m是30&#xff0c;精度是4所以n是4&#xff0c;不加符号默认是右对齐&#xff0c;左对齐的话前面加-号&#xff0c;所以答案是-30.4f…

最新扣子(Coze)实战案例:使用扩图功能,让你的图任意变换,完全免费教程

&#x1f9d9;‍♂️ 大家好&#xff0c;我是斜杠君&#xff0c;手把手教你搭建扣子AI应用。 &#x1f4dc; 本教程是《AI应用开发系列教程之扣子(Coze)实战教程》&#xff0c;完全免费学习。 &#x1f440; 微信关注公从号&#xff1a;斜杠君&#xff0c;可获取完整版教程。&a…

深入探索C语言中的结构体:定义、特性与应用

&#x1f525; 个人主页&#xff1a;大耳朵土土垚 目录 结构体的介绍结构体定义结构成员的类型结构体变量的定义和初始化结构体成员的访问结构体传参 结构体的介绍 在C语言中&#xff0c;结构体是一种用户自定义的数据类型&#xff0c;它允许开发者将不同类型的变量组合在一起…

MySQL数据库树状结构查询

一、树状结构 MySQL数据库本身并不直接支持树状结构的存储&#xff0c;但它提供了足够的灵活性&#xff0c;允许我们通过不同的方法来模拟和实现树状数据结构。具体方法看下文。 数据库表结构&#xff1a; 实现效果 查询的结果像树一样 二、使用 以Catalog数据表&#xff0c…

lua入门(1) - 基本语法

本文参考自&#xff1a; Lua 基本语法 | 菜鸟教程 (runoob.com) 需要更加详细了解的还请参看lua 上方链接 交互式编程 Lua 提供了交互式编程模式。我们可以在命令行中输入程序并立即查看效果。 Lua 交互式编程模式可以通过命令 lua -i 或 lua 来启用&#xff1a; 如下图: 按…

深度解析Ubuntu版本升级:LTS版本升级指南

深度解析Ubuntu版本升级&#xff1a;Ubuntu版本生命周期及LTS版本升级指南 Ubuntu是全球最受欢迎的Linux发行版之一&#xff0c;其版本升级与维护策略直接影响了无数用户的开发和生产环境。Canonical公司为Ubuntu制定了明确的生命周期和发布节奏&#xff0c;使得社区、企业和开…

Micron近期发布了32Gb DDR5 DRAM

Micron Technology近期发布了一项内存技术的重大突破——一款32Gb DDR5 DRAM芯片&#xff0c;这项创新不仅将存储容量翻倍&#xff0c;还显著提升了针对人工智能&#xff08;AI&#xff09;、机器学习&#xff08;ML&#xff09;、高性能计算&#xff08;HPC&#xff09;以及数…

Wireshark网络抓包工具入门指南

目录 引言 安装抓包工具 抓包基础概念 抓包步骤 流程 抓包工具头的分析 14.3 以太网的完整帧格式 粘包与拆包现象解析及解决方案 发生原因 解决方案 14.3.1以太网头 14.3.2 IP头 14.3.3 UDP头 14.3.4 TCP头 引言 Wireshark是一款功能强大的开源网络协议分析器&am…

SpringMVC:SpringMVC执行流程

文章目录 一、介绍二、什么是MVC 一、介绍 Spring MVC 是一种基于Java的Web框架&#xff0c;它采用了MVC&#xff08;Model - View - Controller&#xff09;设计模式&#xff0c;通过吧Model、View和Controller分离&#xff0c;将Web层进行职责解耦&#xff0c;把复杂的Web应…

C++|哈希应用->布隆过滤器

目录 一、概念 二、模拟实现 三、布隆过滤器扩展应用 上一篇章学习了位图的使用&#xff0c;但它只适用于整数&#xff0c;对于要查询字符串是否在不在&#xff0c;位图并不能解决。所以针对这一问题&#xff0c;布隆过滤器可以派上用场&#xff0c;至于布隆过滤器是什么&am…

G2.【C语言】EasyX绘制颜色窗口

1.窗口 窗口&#xff1a;宽度*高度&#xff08;单位都是像素&#xff09; #include <stdio.h> #include <easyx.h> int main() {initgraph(640, 480);getchar();return 0; } 640是宽&#xff0c;480是高 2.操作窗口的三个按钮 #include <stdio.h> #incl…

推荐Bulk Image Downloader插件下载网页中图片链接很好用

推荐&#xff1a;Bulk Image Downloader chome浏览器插件下载图片链接&#xff0c;很好用。 有个网页&#xff0c;上面放了数千的gif的电路图&#xff0c;手工下载会累瘫了不可。想找一个工具分析它的静态链接并下载&#xff0c;找了很多推荐的下载工具&#xff0c;都是不能分…

Github:git提交代码到github

创建 GitHub 仓库 a. 登录到您的 GitHub 账户。 b. 点击右上角的 "" 图标&#xff0c;选择 "New repository"。 c. 填写仓库名称&#xff08;例如 "Mitemer"&#xff09;。 d. 添加项目描述&#xff08;可选&#xff09;。 e. 选择仓库为 &…

Feign-未完成

Feign Java中如何实现接口调用&#xff1f;即如何发起http请求 前三种方式比较麻烦&#xff0c;在发起请求前&#xff0c;需要将Java对象进行序列化转为json格式的数据&#xff0c;才能发送&#xff0c;然后进行响应时&#xff0c;还需要把json数据进行反序列化成java对象。 …

MySQL的count()方法慢

前言 mysql用count方法查全表数据&#xff0c;在不同的存储引擎里实现不同&#xff0c;myisam有专门字段记录全表的行数&#xff0c;直接读这个字段就好了。而innodb则需要一行行去算。 比如说&#xff0c;你有一张短信表(sms)&#xff0c;里面放了各种需要发送的短信信息。 …

C语言图书馆管理系统(管理员版)

案例&#xff1a;图书馆管理系统&#xff08;管理员版&#xff09; 背景&#xff1a; 随着信息技术的发展和普及&#xff0c;传统的图书馆管理方式已经无法满足现代图书馆高效、便捷、智能化的管理需求。传统的手工登记、纸质档案管理不仅耗时耗力&#xff0c;而且容易出现错…

拉普拉斯逆变换

https://www.bilibili.com/video/BV17i4y1475Y?p21&vd_source2e6b4ba548ec9462b2f9633ff700e9b9 CV 17 陈永平教授关于拉普拉斯逆变换的式子的推导 最关键的两步 想到取一个合适的contour L R L_R LR​部分是实部 γ \gamma γ要大于所有极点的实部,这样就可以搞一个大…