参考:逐步学习Go-协程goroutine – FOF编程网
什么是线程?
简单来说线程就是现代操作系统使用CPU的基本单元。线程基本包括了线程ID,程序计数器,寄存器和线程栈。线程共享进程的代码区,数据区和操作系统的资源。
线程为什么很“重”
因为线程会有很多上下文,操作系统需要调度线程执行不能让一个线程执行完才执行另一个线程(那其他线程就“饿死”了)。
线程调度就涉及线程切换:停止当前正在运行的线程,保存线程的状态(上下文),选择另一个线程并加载这个上下文并执行这个线程。线程切换比较耗时因为内核或者操作系统级别的线程有很多上下文,主要涉及的切换有:
- 程序计数器
- 寄存器
- CPU缓存
- CPU调度
- 线程状态管理
- …
所以线程切换比较耗时。
什么是协程?
协程并不是一个新概念了,协程已经被很多语言使用了,比如:Java 19的VirtualThread,Python的asyncio, JavaScript ES6中的async/await, C#, Kotlin,…
协程就是轻量级线程,协程和操作系统的重量级线程的关系是M:N,一般M\<N。减少调度和切换开销。
协程还有什么优势?
- 内存占用小,据Go说,Go创建一个协程只需要2KB内存
- 切换成本低,线程切换只涉及用户程序的调度,不涉及线程哪些切换的内容,所以很快
- 创建销毁快,用户程序创建和销毁,所以很快
协程和线程的映射关系
我们可以把线程是协程的CPU,协程需要执行需要调度到某个线程上执行。
协程最终还是使用线程来执行,所以协程需要对应一个线程来执行自己的代码,那么这个映射关系是什么?
- 一对一
- 一对多
- 多对一
- 多对多
一对一
如何来理解一对一关系?我觉得这是在某一时刻,一个协程都由一个线程来管理和执行。
一对多
如果理解一对多关系?我觉得这是在一个时间段内,一个协程可能会被调度到多个线程上执行,但是在某一个时间点一个协程不会被调度到两个或者更多线程执行。
多对一
如何理解多对一关系?我觉得是多个协程在一个时间段内会被调度到同一个线程执行。
多对多
协程运行时是M:N模型,就是M个协程映射到N个线程上。
Go中的协程
进入正题,Go中提供了协程模型和API,没有可以直接操作的线程模型和API。
Go的协程特性
Go的协程遵守我们上面提到的协程特性:
- 轻量级
- 并发执行
- 异步执行
- 复用:这个复用指的是复用操作系统线程
- 协程之间通过Channel通信和同步
- 非抢占式调度:Go的协程调度器使用的是非抢占式调度模型,这就表示协程在运行期间是不可中断的,只有协程自己让出CPU,比如:协程休眠,I/O之类的操作协程才会让出CPU
- 高效上下文切换
- 优雅关闭
- 不阻塞主线程,主线程退出,协程也会退出
环境
我们使用go testing和testify来编写测试用例进行协程特性演示。
testify直接使用go get安装就可以了。
go get github.com/stretchr/testify
COPY
这是import的模块:
import (
"fmt"
"runtime"
"testing"
"github.com/stretchr/testify/assert"
)
COPY
创建协程
go中创建协程不需要写接口,不需要写struct,只需要一个go
关键字+执行函数就可以了。
- go+标准函数
- go+闭包/匿名函数
- go+方法(struct)
- interface{}+反射
- 如有其他方式,请留言告知
go+标准函数创建协程
我们先来创建一个Go函数,参数传入一个channel方便我们对channel进行同步控制:
// 标准Go函数
func standardFunc(ch chan bool) {
println("Hello, Standard Function Go Routine")
ch <- true
}
COPY
我们来创建一个Go协程来执行这个标准函数:
// 标准函数创建协程
func TestRoutine_ShouldSuccess_WhenCreateWithStandardFunction(t *testing.T) {
ch := make(chan bool)
// func为标准函数
go standardFunc(ch)
ret := <-ch
assert.True(t, ret)
}
COPY
执行截图:
go+闭包/匿名函数创建协程
这种方式比较方便:
// 闭包/匿名函数创建协程
func TestRoutine_ShouldSuccess_WhenCreateWithAnonymousFunction(t *testing.T) {
ch := make(chan bool)
// func为闭包/匿名函数
go func() {
println("Hello, Anonymous Function Go Routine")
ch <- true
}()
ret := <-ch
assert.True(t, ret)
}
COPY
执行截图:
go+方法(struct)创建协程
我们先定义一个struct,struct有一个channel方便我们进行等待协程执行完成:
type s struct {
ch chan bool
}
COPY
我们定义两个方法:run和wait,run来执行业务,wait等待run执行完成:
type s struct {
ch chan bool
}
func (s *s) run() {
println("Hello, Struct Method Go Routine")
s.ch <- true
}
func (s *s) wait() {
<-s.ch
}
COPY
我们来创建一个协程执行我们的run方法和wait方法:
func TestRoutine_ShouldSuccess_whenCreateWithStructMethod(t *testing.T) {
// 定义struct变量
s := &s{
ch: make(chan bool),
}
// 创建协程
go s.run()
// 等待执行完成
s.wait()
}
}
COPY
运行截图:
interface{}+反射创建协程
我觉得这种方式超级复杂,但是实际业务场景中也特别有用。相当于你可以开发一个调度器,别人提交任务和任务的参数给你,你来控制怎么来调度。
看代码:
我们先定义一个调度函数,参数f是函数,args是f的参数。
func scheduleFunc(wg *sync.WaitGroup, f interface{}, args ...interface{}) {
// 通过反射获取函数的定义
funcVal := reflect.ValueOf(f)
// 然后获取函数的参数
// 使用循环把参数加入到slice中
in := make([]reflect.Value, len(args))
for k, param := range args {
in[k] = reflect.ValueOf(param)
}
wg.Add(1)
// 创建调用函数
// 我们这儿用匿名函数包装了一下
go func() {
defer wg.Done()
funcVal.Call(in)
}()
}
COPY
然后我们定义两个任务函数, task1和task2:
func task1(a string) {
fmt.Printf("Hello: %s\n", a)
}
func task2(a, b string) {
fmt.Printf("Hello: %s-%s\n", a, b)
}
COPY
最后我们来测试一下:
func TestRoutine_ShouldSuccess_whenCreateWithReflect(t *testing.T) {
var wg sync.WaitGroup // 创建一个 WaitGroup
scheduleFunc(&wg, task1, "Hello, goroutine!")
scheduleFunc(&wg, task2, "Hello", "goroutine!")
wg.Wait() // 等待所有 goroutine 结束
}
COPY
运行截图:
package main
import (
"fmt"
"reflect"
)
func worker(data []interface{}) {
funcName := data[0].(string)
funcArgs := data[1:] // Function or method arguments
funcValue := reflect.ValueOf(funcMap[funcName])
funcArgsValues := make([]reflect.Value, len(funcArgs))
for i, arg := range funcArgs {
funcArgsValues[i] = reflect.ValueOf(arg)
}
go funcValue.Call(funcArgsValues)
}
var funcMap = map[string]interface{}{
"printFunc": printFunc,
"printSum": printSum,
}
func printFunc(s string) {
fmt.Println(s)
}
func printSum(a, b int) {
fmt.Println(a + b)
}
func main() {
worker([]interface{}{"printFunc", "Hello, World!"})
worker([]interface{}{"printSum", 1, 2})
// Sleep to wait for goroutines to finish
for {}
}
COPY
设置线程和协程的数量对应关系
默认线程数量:
// 获取Go协程使用的线程数量
func TestGPROC_ShouldReturnDefaultNumer_WhenNotSetProcNumber(t *testing.T) {
// 如果GOMAXPROCS()的参数为0则是获取线程数量,大于0就是设置线程数量
procnum := runtime.GOMAXPROCS(0)
fmt.Printf("default proc number: %d\n", procnum)
}
COPY
设置线程数量
使用代码设置线程数需要使用runtime.GOMAXPROCS设置线程数量:
// 获取Go协程使用的线程数量
func TestGPROC_ShouldReturnSpecificNumer_WhenSetProcNumber(t *testing.T) {
specnum := 4
// 设置线程数量为4,
// 如果GOMAXPROCS()的参数为0则是获取线程数量
runtime.GOMAXPROCS(specnum)
fmt.Printf("set proc number: %d\n", specnum)
assert.Equal(t, specnum, runtime.GOMAXPROCS(0))
}
COPY
环境变量设置
在程序启动前设置环境变量GOMAXPROCS就可以了。
export GOMAXPROCS=4
COPY
关闭协程
- 自行结束
- 手动取消
自行结束
这个和线程类似,协程执行完了就退出了,我们上面的例子都是协程执行完了自动退出。
手动取消
手动取消就需要增加控制机制了,我们来列两个手动取消的例子:
- context传递取消信号
- channel发送取消信号
我们先来定义一个后台任务,这个后台任务每个一秒钟打印一条:“Hello background task”
// 不用太关注api和语法,只需要知道每个一秒钟打印"Hello background task"
func backgroundTask(ctx context.Context) {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done(): // 接收到取消信号,结束 goroutine
return
case <-ticker.C: // 每次 ticker 到时,打印一条消息
println("Hello background task")
}
}
}
COPY
context传递取消信号
直接上代码:
func TestRoutine_ShouldStop_whenSendCancelWithContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go backgroundTask(ctx)
// 让 协程 运行一段时间
time.Sleep(time.Second * 5)
// 发送取消信号
cancel()
// 给协程留一点时间处理信号
time.Sleep(time.Second * 2)
}
COPY
运行截图:
channel发送取消信号
直接上代码:
func signaltask(ch chan bool) {
for {
select {
// 接收到取消信号,结束协程
case <-ch:
return
// 没有接收到取消信号,打印一条消息
default:
println("Hello signal task")
time.Sleep(time.Second * 1)
}
}
}
func TestRoutine_ShouldStop_WhenSendCancelSignal(t *testing.T) {
ch := make(chan bool)
go signaltask(ch)
// 让协程运行5秒钟
time.Sleep(time.Second * 5)
// 发送取消信号
ch <- true
// 给协程留一点时间处理信号
time.Sleep(time.Second * 2)
}
COPY
运行截图: