深入理解 Go 语言 Goroutine 的工作原理

一、设计思路

1、设计描述
  • 启动服务之时先初始化一个 Goroutine Pool 池,这个 Pool 维护了一个类似栈的 LIFO 队列,里面存放负责处理任务的 Worker
  • 然后在 client 端提交 task 到 Pool 中之后,在 Pool 内部,接收 task 之后的核心操作是
    • 检查当前 Worker 队列中是否有可用的 Worker,如果有,取出执行当前的 task
    • 没有可用的 Worker,判断当前在运行的 Worker 是否已超过该 Pool 的容量
    • 每个 Worker 执行完任务之后,放回 Pool 的队列中等待

2、Pool struct 
type sig struct{}

type f func() error

// pool从客户端获取任务,它限制 goroutines 总数,并且回收再使用
type Pool struct {
	capacity int32                   // 协程池容量
	running int32                    // 正在运行的goroutine数量
	expiryDuration time.Duration     // 为每个worker设置一个过期时间
	workers []*Worker                // 存放空闲 worker,请求进入 Pool先检查workers若有则取出绑定任务执行
	release chan sig                 // 当关闭该 Pool 支持通知所有 worker 退出运行以防 goroutine 泄露
	lock sync.Mutex                  // 同步操作锁
	once sync.Once                   // 确保 Pool 关闭操作只会执行一次
}
3、初始化 Pool 并启动定期清理过期 worker 任务
//新建一个线程池实例
func NewPool(size int) (*Pool,error) {
    return NewTimingPool(size,DefaultCleanIntervalTime)
}

//产生一个带有自定义定时器的线程池实例
func NewTimingPool(size, expiry int) (*Pool,error) {
    if size <= 0{
        return nil,ErrInvalidPoolSize
    }
    if expiry <= {
        return nil,ErrInvalidPoolExpiry
    }

   p := &Pool{
		capacity:       int32(size),
		freeSignal:     make(chan sig, math.MaxInt32),
		release:        make(chan sig, 1),
		expiryDuration: time.Duration(expiry) * time.Second,
	}
	// 启动定期清理过期worker任务,独立goroutine运行,进一步节省系统资源
	p.monitorAndClear()
	return p, nil
}
4、提交任务到 Pool
  • 第一个 if 判断当前 Pool 是否已被关闭,若是则不再接受新任务,否则获取一个 Pool 中可用的 worker,绑定该 task 执行
func (p *Pool) Submit(task f) error {
	if len(p.release) > 0 {
		return ErrPoolClosed
	}
	w := p.getWorker()
	w.task <- task
	return nil
}
5、获取可用 worker(核心)
  • p.getWorker()源码
//返回一个可用的 worker 来运行这些任务
func (p *Pool) getWorker() *Worker {
    var w *Worker
    waiting :=false    //标志变量,判断当前正在运行的 worker 数量是否已到达 Pool 的容量上限
    p.lock。Lock()    //加锁,检测队列中是否有可用 worker,并进行相应操作
 idleWorkers := p.workers
	n := len(idleWorkers) - 1
	if n < 0 {	 // 当前队列中无可用worker
		// 判断运行worker数目已达到该Pool的容量上限,置等待标志
		waiting = p.Running() >= p.Cap()
	} else {		// 当前队列有可用worker,从队列尾部取出一个使用
		w = idleWorkers[n]
		idleWorkers[n] = nil
		p.workers = idleWorkers[:n]
	}

	p.lock.Unlock()	  // 检测完成,解锁
	if waiting {	 // Pool容量已满,新请求等待
		for {		 // 利用锁阻塞等待直到有空闲worker
			p.lock.Lock()
			idleWorkers = p.workers
			l := len(idleWorkers) - 1
			if l < 0 {
				p.lock.Unlock()
				continue
			}
			w = idleWorkers[l]
			idleWorkers[l] = nil
			p.workers = idleWorkers[:l]
			p.lock.Unlock()
			break
		}
		// 当前无空闲worker但是Pool还没有满,则可以直接新开一个worker执行任务
	} else if w == nil {
		w = &Worker{
			pool: p,
			task: make(chan f, 1),
		}
		w.run()
		// 运行worker数加一
		p.incRunning()
	}
	return w
}
6、执行任务
  • 结合前面的 p.Submit(task f) 和 p.getWorker() ,提交任务到 Pool 之后,获取一个可用 worker
  • 每新建一个 worker 实例之时都需要调用 w.run() 启动一个 goroutine 监听 worker 的任务列表 task ,一有任务提交进来就执行
  • 所以,当调用 worker 的 sendTask(task f) 方法提交任务到 worker 的任务队列之后,马上就可以被接收并执行
  • 当任务执行完之后,会调用 w.pool.putWorker(w *Worker) 方法将这个已经执行完任务的 worker 从当前任务解绑放回 Pool 中,以供下个任务可以使用
  • 至此,一个任务从提交到完成的过程就此结束,Pool 调度将进入下一个循环。
// Worker是运行任务的实际执行者,它启动一个接受任务并执行函数调用的goroutine
type Worker struct {
	pool *Pool   // 每个pool对应一个worker
	task chan f  // 任务是一项应该完成的工作
	recycleTime time.Time	 // 当将一个worker放回队列时,recycleTime将被更新。
}

// Run启动一个goroutine以重复执行函数调用的过程
func (w *Worker) run() {
	go func() {
		// 循环监听任务列表,一旦有任务立马取出运行
		for f := range w.task {
			if f == nil {
				// 退出goroutine,运行worker数减一
				w.pool.decRunning()
				return
			}
			f()
			// worker回收复用
			w.pool.putWorker(w)
		}
	}()
}
7、worker回收(goroutine 复用)
// putWorker将一个worker放回空闲池,回收goroutines
func (p *Pool) putWorker(worker *Worker) {
	// 写入回收时间,亦即该worker的最后一次结束运行的时间
	worker.recycleTime = time.Now()
	p.lock.Lock()
	p.workers = append(p.workers, worker)
	p.lock.Unlock()
}
8、动态扩容或者缩小池容量
// ReSize更改此池的容量
func (p *Pool) ReSize(size int) {
	if size == p.Cap() {
		return
	}
	atomic.StoreInt32(&p.capacity, int32(size))
	diff := p.Running() - size
	if diff > 0 {
		for i := 0; i < diff; i++ {
			p.getWorker().task <- nil
		}
	}
}
9、定期清理过期 Worker
  • 定期检查空闲 worker 队列中是否有已过期的 worker 并清理
  • 因为采用了 LIFO 后进先出队列存放空闲 worker,所以该队列默认已经是按照 worker 的最后运行时间由远及近排序
  • 可以方便地按顺序取出空闲队列中的每个 worker 并判断它们的最后运行时间与当前时间之差是否超过设置的过期时长
  • 若是,则清理掉该 goroutine,释放该 worker,并且将剩下的未过期 worker 重新分配到当前 Pool 的空闲 worker 队列中,进一步节省系统资源
//  定期清理过期 Worker
func (p *Pool) periodicallyPurge() {
	heartbeat := time.NewTicker(p.expiryDuration)
	for range heartbeat.C {
		currentTime := time.Now()
		p.lock.Lock()
		idleWorkers := p.workers
		if len(idleWorkers) == 0 && p.Running() == 0 && len(p.release) > 0 {
			p.lock.Unlock()
			return
		}
		n := 0
		for i, w := range idleWorkers {
			if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
				break
			}
			n = i
			w.task <- nil
			idleWorkers[i] = nil
		}
		n++
		if n >= len(idleWorkers) {
			p.workers = idleWorkers[:0]
		} else {
			p.workers = idleWorkers[n:]
		}
		p.lock.Unlock()
	}
}

二、pool 使用

1、公共池
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/panjf2000/ants/v2"
)

func demoFunc() {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("Hello World!")
}

func main() {
	// 在retrieveWorker()中可能有一些调用者在等待,因此我们需要唤醒它们来防止那些无限阻塞的调用者
	defer ants.Release()
	var wg sync.WaitGroup
	syncCalculateSum := func() {
		demoFunc()
		wg.Done()
	}
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		_ = ants.Submit(syncCalculateSum)
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", ants.Running())
	fmt.Printf("finish all tasks.\n")
}

/*
	Hello World!
	Hello World!
	running goroutines: 1000
	finish all tasks.
 */
2、方法绑定池
package main

import (
	"fmt"
	"github.com/panjf2000/ants/v2"
	"sync"
)

func myFunc(i interface{}) {
	fmt.Printf("run with %d\n", i)
}

func main() {
	defer ants.Release()
	var wg sync.WaitGroup
	// 使用池和函数,设置goroutine pool的容量为10,超时时间为1秒。
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()
	// 逐个提交任务
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", p.Running())
}

/*
run with 976
run with 990
run with 971
running goroutines: 10
 */

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/245557.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【计算机组成体系结构】只读存储器ROM

一、ROM分类 二、计算机中重要的ROM 运行时操作系统在主存中&#xff0c;但是由于RAM断电后数据会丢失&#xff0c;所以操作系统都存储在辅存中&#xff0c;在开机时由CPU读入主存&#xff0c;而BIOS芯片就是用来存储自举装入程序的&#xff0c;它用于开机时引导把操作系统装入…

C#中简单的继承和多态

今天我们来聊一聊继承&#xff0c;说实话今天也是我第一次接触。 继承的概念是什么呢&#xff1f;就是一个类可以继承另一个类的属性和方法&#xff08;成员&#xff09; 继承是面向对象编程中的一个非常重要的特性。 好了&#xff0c;废话不多说&#xff0c;下面切入正题&a…

ArrayList与顺序表(带完整实例)

【本节目标】 1. 线性表 2. 顺序表 3. ArrayList的简介 4. ArrayList使用 5. ArrayList的扩容机制 6. 扑克牌 1.线性表 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使用的数据结构&#xff0c;常见的线性表…

输入框内容部分不可修改el-input部分动态内容不可修改完整代码 省市区联动,详细地址中省市区选择后不可更改

<tr><e-td :required"!read" label><span>地区&#xff1a;</span></e-td><td><divclass"table-cell-flex"style"width:450px"v-if"!read && this.data.nationCode 148"><el-f…

【ECharts】从零实现echarts地图完整代码(纯前端,包含地图资源)

最终效果 标题环境搭建 这里忽略创建vue项目的操作过程&#xff0c;请自行搭建 vue2 项目、less 环境 安装下载 echarts 这里我们选择npm下载 npm install echarts安装成功后&#xff0c;在 main.js 中把echarts配置到this上 // 引入 echarts import * as Echarts from ech…

【每日一题】用邮票贴满网格图

文章目录 Tag题目来源题目解读解题思路方法一&#xff1a;二维前缀和二维差分 写在最后 Tag 【二维前缀和】【二维差分】【矩阵】【2023-12-14】 题目来源 2132. 用邮票贴满网格图 题目解读 在 01 矩阵中&#xff0c;判断是否可以用给定尺寸的邮票将所有 0 位置都覆盖住&…

JDK安装测试记录

jdk安装测试记录 1.下载JDK2.安装3.适配环境变量4.测试 1.下载JDK 写这篇主要是记录以后自己安装方便&#xff0c;做个记录 Oracle官网 本地下载 需要注然后才能下载: 2.安装 选择默认或者自定义的路径: 后面JRE也可重新定义到自定义的文件夹&#xff1a; 3.适配环境变量 …

Python接口测试环境搭建过程详解

环境搭建 python 安装&#xff1a;建议使用python3.7 pycharm安装 requests安装 &#xff1a;pip3 install requests requests 基本使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 usage: >>> import requests >>> r requests.get…

Python爬虫之Cookie 与 Session 的区别

文章目录 一、 含义二、有效时长&#xff1a;三、面试中可能会遇到的问题点四、在反爬技术中的应用关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②Python实战案例③Python小游戏源…

ThinkPHP连接ORACLE数据库教程

目录 概念基本步骤详细操作问题排除参考 概念 要连接Oracle数据库&#xff0c;必须有两个东西&#xff0c;一个PHP官方写的扩展&#xff0c;一个Oracle官方写的客户端PHP是通过扩展去操作oralce客户端连接的服务端数据库&#xff0c;所以两个都不能少&#xff0c;而且版本必须…

ArcGIS导入excel中的经纬度信息,绘制矢量

1.首先整理坐标信息 2.其次转成2003格式的excel文件 3.导入arcgis&#xff0c;点击右键添加excel数据 4.显示xy数据 5.显示经度和纬度信息 6&#xff1a;点击【地理坐标系】->【World】->【WGS 1984】->【确定】 7.投影带的确定方式&#xff1a; 因为自己一直…

字节跳动面经题

字节跳动面经题 1、了解anchor-free? "Anchor-free"是一个指向一类目标检测方法的术语&#xff0c;与传统的"anchor-based"方法相对应。在传统的目标检测中&#xff0c;通常会使用一系列预定义的锚框&#xff08;anchors&#xff09;作为模型的基础。这些…

verilog基本语法-时序逻辑基础-记忆单元

概述: 组合逻辑虽然可以构造各种功能电路&#xff0c;但是他有一个缺点就是输入改变时&#xff0c;输出会立即发生改变。因此历史信息不能被保存下来。两个能够保存信息的存储单元被设计出来&#xff0c;用于保存历史信息。一个是锁存器&#xff0c;另外一个是触发器。锁存器是…

(基础篇)通过node增删改查连接mysql数据库

一定要会最基础的sql建表一定要会最基础的sql建表一定要会最基础的sql建表 首先说一下准备工作 一、准备工具 1.mysql数据库Navicat可视化工具&#xff08;数据库表单已经建好&#xff09; 我这里用的小皮工具直接开启的本地mysql 2.vscode (不用说基本上都有) 3.node.js …

仿牛客论坛的一些细节改进

私信列表的会话头像链接到个人主页 原来的不足 点击私信列表的会话头像应该要能跳转到该目标对象的个人主页。 原来的代码&#xff1a; <a href"profile.html"><img th:src"${map.target.headerUrl}" class"mr-4 rounded-circle user-he…

智能优化算法应用:基于黄金正弦算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于黄金正弦算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于黄金正弦算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.黄金正弦算法4.实验参数设定5.算法结果6.…

web前端实现LED功能、液晶显示时间、数字

MENU 效果演示html部分JavaScript部分css部分 效果演示 html部分 <div id"app"><!-- 页面 --><div class"time-box"><!-- 时 --><div class"house-box"><bit-component :num"houseTem"></bit…

【MODBUS】Modbus是什么?

Modbus协议&#xff0c;从字面理解它包括Mod和Bus两部分&#xff0c;首先它是一种bus&#xff0c;即总线协议&#xff0c;和12C、SP|类似&#xff0c;总线就意味着有主机&#xff0c;有从机&#xff0c;这些设备在同一条总线上。 Modbus支持单主机&#xff0c;多个从机&#xf…

Colorful Grid Codeforces Round 910 (Div. 2) C

Problem - C - Codeforces 题目大意&#xff1a;有一个n*m的网格&#xff0c;要求从(1,1)走到(n,m)&#xff0c;同时要求路径的长度必须为k1&#xff0c;然后给每个两点之间的路径染成红色或蓝色&#xff0c;要求任意两个相邻线段颜色不能相同&#xff0c;求涂色的方案 3<…

将程序注册为系统服务

cmd中执行命令&#xff1a; sc create Redis binpath "C:\guet_run1\Redis-x64-5.0.14.1\redis-server.exe" type own start auto displayname "Redis"注意&#xff0c;命令中所有的等号和值之间需要一个空格&#xff08;等号前不要空格&#xff0c;等号后…