文章目录
- 前言
- 协程
- goroutine 调度
- 使用 goroutine
- 通道
- 无缓冲通道
- 有缓冲通道
- 单向通道
- select 多路复用
- sync
- sync.WaitGroup
- sync.Mutex
- sync.RWMutex
- sync.Once
- sync.Map
项目代码地址:05-GoroutineChannelSync
前言
Go 1.22 版本于不久前推出,更新的新特性可以参考官文。从此篇章开始,后续 go 版本更为 1.22.0 及以上,自行官网下载。
协程
常见的并发模型
- 线程与锁模型
- Actor 模型
- CSP 模型
- Fork 与 Join 模型
Go 语言天生支持并发,主要通过基于通信顺序过程(Communicating Sequential Processes, CSP)的 goroutine 和通道 channel 实现,同时也支持传统的多线程共享内存的并发方式。
goroutine 会以一个很小的栈开始其生命周期,一般只需要 2 KB。goroutine 由 Go 运行时(runtime)调度,Go 运行时会智能地将 m 个 goroutine 合理的分配给 n 个操作系统线程,实现类似 m:n 的调度机制,不再需要开发者在代码层面维护线程池。
goroutine 调度
操作系统线程的调度:操作系统线程在被内核调度时挂起当前执行的线程,并将它的寄存器内容保存到内存中,然后选出下一次要执行的线程,并从内存中恢复该线程的寄存器信息,恢复现场并执行该线程,这样就完成一次完整的线程上下文切换。
goroutine 调度:区别于操作系统线程的调度,goroutine 调度在 Go 语言运行时层面实现,完全由 Go 语言本身实现,按照一定规则将所有的 goroutine 调度到操作系统线程上执行。
goroutine 调度器采用 GPM 调度模型,如下所示:
-
G:表示 goroutine,每执行一次go f()就创建一个 G,包含要执行的函数和上下文信息。
-
全局队列(Global Queue):存放等待运行的 G。
-
P:表示 goroutine 执行所需的资源,最多有 GOMAXPROCS 个。GOMAXPROCS 默认 CPU 核心数,指定需要使用多少个操作系统线程来同时执行代码。
-
P 的本地队列:同全局队列类似,存放的也是等待运行的G,存的数量有限,不超过256个。新建 G 时,G 优先加入到 P 的本地队列,如果本地队列满了会批量移动部分 G 到全局队列。
-
M:线程想运行任务就得获取 P,从 P 的本地队列获取 G,当 P 的本地队列为空时,M 也会尝试从全局队列或其他 P 的本地队列获取 G。M 运行 G,G 执行之后,M 会从 P 获取下一个 G,不断重复下去。
-
Goroutine 调度器和操作系统调度器是通过 M 结合起来的,每个 M 都代表了1个内核线程,操作系统调度器负责把内核线程分配到 CPU 的核上执行。
参考:https://www.liwenzhou.com/posts/Go/concurrence/
使用 goroutine
启动 goroutine 只需要在函数前加 go 关键字:
func f(msg string) {
for i := 0; i < 3; i++ {
fmt.Println(msg, ":", i)
}
}
func functino01() {
go f("goroutine")
go func(msg string) {
fmt.Println(msg)
}("going")
time.Sleep(time.Second)
fmt.Println("done")
}
going
goroutine : 0
goroutine : 1
goroutine : 2
done
使用 time.Sleep
等待协程 goroutine 的运行不优雅,同时也不够精确,后续会采用 sync 包提供的常用并发原语,对协程的运行状态进行控制。
在 go 1.22.0 版本后,如下使用可正常在协程闭包函数中捕获外部的变量,而不是每个 loop 仅一份变量了。
参考:https://zhuanlan.zhihu.com/p/674158675
func function05() {
for i := 0; i < 5; i++ {
go func() {
fmt.Println(i)
}() // 正常输出 0~4 中的数字,而不是全是 4
}
time.Sleep(time.Second)
}
通道
通道 channel 是一种特殊类型,遵循先入先出(FIFO)的特性,用于 goroutine 之间的同步、通信。
声明 channel 语法如下:
chan T // 双向通道
chan <- T // 只能发送的通道
<- chan T // 只能接收的通道
channel 是一个引用类型,在被初始化前值为 nil,需要使用 make 函数进行初始化。缓冲区大小可选:
- 有缓冲通道:
make(chan T, capacity int)
- 无缓冲通道:
make(chan T)
、make(chan T, 0)
通道共有三种操作,发送、接受、关闭:
- 定义通道
ch := make(chan int)
- 发送一个值到通道中
ch <- 10
- 从通道中接收值
v := <- ch // 从 ch 接收值赋给 v
v, ok := <- ch // 多返回值,ok 表示通道是否被关闭
<- ch // 从 ch 接收值,忽略结果
- 关闭通道
close(ch)
tips
- 对一个关闭的通道发送值会导致 panic
- 对一个关闭的通道一直获取值会直到通道为空
- 重复关闭通道会 panic
- 通道值可以被垃圾回收
- 对一个关闭并且没值的通道接收值,会获取对应类型零值
无缓冲通道
又称阻塞通道,同步通道。
无缓冲通道必须至少有一个接收方才能发送成功,即发送操作会阻塞,直到另一个 goroutine 在该通道上接收。相反,接收操作先执行,也会阻塞至有 goroutine 往通道发送数据。
发送方和接收方要同步就绪,只有在两者都 ready 的情况下,数据才能在两者间传输。
等待一秒后,主程才能获取到 ch 中的数据
func function02() {
ch := make(chan int, 0)
go func() {
time.Sleep(time.Second)
ch <- 1
}()
v := <-ch
fmt.Println(v)
}
等待一秒后,协程中才能获取到 ch 中的数据
func function03() {
ch := make(chan int)
go func() {
v := <-ch
fmt.Println(v)
}()
time.Sleep(time.Second)
ch <- 1
time.Sleep(time.Second)
}
有缓冲通道
又称异步通道
有缓冲通道可以通过 cap
获取通道容量,len
获取通道内元素数量。如果通道元素数量达到上限,那么继续往通道发送数据也会被阻塞,直至有 goroutine 从通道获取数据。
通常选择使用 for range
循环从通道中接收值,当通道被关闭后,通道内所有值被接收完毕后会自动退出循环。
func function04() {
ch := make(chan int, 2)
fmt.Println(len(ch), cap(ch)) // 0 2
ch <- 1
ch <- 2
go func() {
for v := range ch {
fmt.Println(v)
}
}() // 1 2 3
ch <- 3
time.Sleep(time.Second)
}
- 多返回值模式
基本格式:value, ok := <- ch
ok
:如果为 false
表示 value 为无效值(通道关闭后的默认零值);如果为 true
表示 value 为通道中的实际数据值。
func function06() {
ch := make(chan int, 1)
ch <- 1
close(ch)
go func() {
for {
if v, ok := <-ch; ok {
fmt.Println(v)
} else {
break
}
}
}()
time.Sleep(time.Second)
}
单向通道
通常会在函数参数中限制通道只能用于接收或发送。控制通道在函数中只读或只写,提升程序的类型安全。
// Producer 生产者
func Producer() <-chan int {
ch := make(chan int, 1)
go func() {
for i := 0; i < 3; i++ {
ch <- i
}
close(ch) // 任务完成关闭通道
}()
return ch
}
// Consumer 消费者
func Consumer(ch <-chan int) int {
sum := 0
for v := range ch {
sum += v
}
return sum
}
func function07() {
ch := Producer()
sum := Consumer(ch)
fmt.Println(sum) // 3
}
在函数传参及赋值过程中,全向通道可以转为单向通道,但单向通道不可转为全向通道。
func function08() {
ch := make(chan int, 1)
go func(ch chan<- int) {
for i := 0; i < 2; i++ {
ch <- i
}
close(ch)
}(ch)
for v := range ch {
fmt.Println(v)
} // 0 1
}
Go 语言采用的并发模型是 CSP,提倡通过通信实现内存共享,而不是通过共享内存实现通信。
CSP 模型由并发执行的实体所组成,实体之间通过发送消息进行通信。
Go 通过 channel 实现 CSP 通信模型,主要用于 goroutine 之间的消息传递和事件通知。
select 多路复用
在从多个通道获取数据的场景下, 需要使用 select
选择器,使用方式类似于 switch
语句,有一系列的 case
分支和一个默认分支。
基本格式:
select {
case <- ch1:
...
case data := <- ch2:
...
case ch3 <- 3:
...
default:
...
}
select
会一直等待,直到其中某个 case
的通信操作完成,执行该 case
语句。
- 可处理一个或多个 channel 的接收和发送
- 如果多个 case 同时满足,select 随机选择一个执行
func function09() {
now := time.Now()
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "one"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "two"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
}
} // one two
fmt.Println(time.Since(now)) // 2.0003655s
}
sync
在上述示例中,使用了大量的 time.Sleep
等待 goroutine 的结束。但还有更好的方式,使用内置的 sync 包管理协程的运行状态。
sync.WaitGroup
使用 wait group
等待多个协程完成,如果 WaitGroup
计数器恢复为 0,即所有协程的工作都完成:
var (
x int64
wg sync.WaitGroup
)
func function10() {
add := func() {
defer wg.Done()
for i := 0; i < 5000; i++ {
x = x + 1
}
}
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
使用 go run -race main.go
可查看代码是否存在竞态问题,上述代码存在两个 goroutine 操作同一个资源,输出结果不定。
方法 | 作用 |
---|---|
WaitGroup.Add(delta) | 计数器值 +delta,建议在 goroutine 外部累加计数器 |
WaitGroup.Done() | 计数器值 -1 |
WaitGroup.Wait() | 阻塞代码,直到计数器值减为 0 |
注意:WaitGroup
对象不是一个引用类型,在通过函数传值的时候需要使用地址。
sync.Mutex
互斥锁是一种常用的控制共享资源访问的方法,它能够保证同一时间只有一个 goroutine 可以访问共享资源。
方法 | 作用 |
---|---|
Mutex.Lock() | 获取互斥锁 |
Mutex.Unlock() | 释放互斥锁 |
使用互斥锁对代码修改如下:
var (
x int64
wg sync.WaitGroup
mtx sync.Mutex
)
func function11() {
add := func() {
defer wg.Done()
for i := 0; i < 5000; i++ {
mtx.Lock() // 修改数据前,加锁
x = x + 1
mtx.Unlock() // 修改完数据后,释放锁
}
}
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x) // 10000
}
sync.RWMutex
读写互斥锁,某些场景中读操作较为频繁,不涉及对数据的修改时,读写锁可能是更好的选择。
方法 | 作用 |
---|---|
RWMutex.Lock() | 获取写锁 |
RWMutex.Unlock() | 释放写锁 |
RWMutex.RLock() | 获取读锁 |
RWMutex.RUnlock() | 释放读锁 |
读写锁分为两种:读锁和写锁。当一个 goroutine 获取到读锁之后,其他的 goroutine 如果是获取读锁会继续获得锁,如果是获取写锁就会等待;而当一个 goroutine 获取写锁之后,其他的 goroutine 无论是获取读锁还是写锁都会等待。
sync.Once
在高并发场景下,可以使用 sync.Once
,保证操作只执行一次。当且仅当第一次访问某个变量时,进行初始化。变量初始化过程中,所有读都被阻塞,直到初始化完成。
sync.Once
其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的,并且初始化操作也不会被执行多次。
sync.Once
仅提供了一个方法 Do
,参数 f
是对象初始化函数。
func (o *Once) Do(f func())
单例模式:
type Singleton struct{}
var (
instance *Singleton
once sync.Once
wg sync.WaitGroup
)
func GetInstance() *Singleton {
once.Do(func() {
instance = &Singleton{}
fmt.Println("Get Instance")
})
return instance
}
func function12() {
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = GetInstance()
}()
} // Get Instance
wg.Wait()
}
程序只会输出一次 Get Instance
,说明 sync.Once
是线程安全的,支持并发,仅会执行一次初始化数据的函数。
sync.Map
Go 内置的 map
不是并发安全的,下述代码多个 goroutine 对 map 操作会出现竞态问题,报错不能正常运行。
var (
mp = make(map[string]interface{})
wg sync.WaitGroup
)
func function13() {
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
key := strconv.Itoa(i)
mp[key] = i
fmt.Println(key, mp[key])
}()
}
wg.Wait()
}
sync.Map
是并发安全版 map,不过操作数据不再是直接通过 []
获取插入数据,而需要使用其提供的方法。
方法 | 作用 |
---|---|
Map.Store(key, value interface{}) | 存储 key-value 数据 |
Map.Load(key interface{}) (value interface{}, ok bool) | 查询 key 对应的 value |
Map.LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) | 查询 key 对应的 value,如果不存在则存储 key-value 数据 |
Map.LoadAndDelete(key interface{}) (value interface{}, loaded bool) | 查询并删除 key |
Map.Delete(key interface{}) | 删除 key |
Map.Range(f func(key, value interface{}) bool) | 对 map 中的每个 key-value 依次调用 f |
使用 sync.Map
修改上述代码,即可正确运行。
func function14() {
m := sync.Map{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
key := strconv.Itoa(i)
m.Store(key, i)
v, ok := m.Load(key)
if ok {
fmt.Println(key, v)
}
}()
}
wg.Wait()
}
LoadOrStore
、LoadAndDelete
示例代码:
// LoadOrStore、LoadAndDelete
func function15() {
m := sync.Map{}
//m.Store("cauchy", 19)
v, ok := m.LoadOrStore("cauchy", 20)
fmt.Println(v, ok) // 注释: 20 false;没注释: 19 true
v, ok = m.Load("cauchy")
fmt.Println(v, ok) // 注释: 20 true;没注释: 19 true
v, ok = m.LoadAndDelete("cauchy")
fmt.Println(v, ok) // 注释: 20 true;没注释: 19 true
v, ok = m.Load("cauchy")
fmt.Println(v, ok) // nil false
}
Range
示例代码:
Map.Range
可无序遍历 sync.Map
中的所有 key-value
键值对,如果返回 false
则终止迭代。
func function16() {
m := sync.Map{}
m.Store(3, 3)
m.Store(2, 2)
m.Store(1, 1)
cnt := 0
m.Range(func(key, value any) bool {
cnt++
fmt.Println(key, value)
return true
})
fmt.Println(cnt) // 3
}