💢欢迎来到张胤尘的开源技术站
💥开源如江河,汇聚众志成。代码似星辰,照亮行征程。开源精神长,传承永不忘。携手共前行,未来更辉煌💥
文章目录
- 通道
- 通道声明
- 初始化
- 缓冲机制
- 无缓冲通道
- 代码示例
- 带缓冲通道
- 代码示例
- 通道操作
- 发送数据
- 接收数据
- 单向通道
- 单向通道的用途
- 代码示例
- 多路复用
- 代码示例
- 通道复用器的实现
- 超时机制
- `time.After`
- `context`
- 关闭通道
- 检查通道是否关闭
- 获取通道长度
- 获取通道容量
- 源码解析
- 通道结构体
- 创建通道
- 函数原型
- 函数内容
- 发送数据
- 函数原型
- 函数内容
- 接收数据
- 函数原型
- 函数内容
- 关闭通道
- 函数原型
- 函数内容
通道
在传统的并发编程中,多个线程或进程之间通常通过共享内存来通信。这种模型虽然高效,但容易引发 竞争条件 和 死锁 等问题。为了避免这些问题,程序员在开发时需要使用复杂的同步机制(例如:锁、信号量等)来保护共享数据。但是 golang
采用了不同的思路:避免共享内存,通过通信来实现并发。
示意图所下所示:
通道就是 golang
中实现 Goroutine
之间通信的机制。它是一种 类型化的通道,允许多个 Goroutine
之间安全地传递数据。通道是 Golang
并发模型的核心,它解决了传统并发编程中共享内存带来的复杂性和风险。
本篇文章主要介绍的是通道,有关于协程、并发编程等相关知识点请关注后续文章《
Golang
协程》、《Golang
并发编程》。
通道声明
使用 var
关键字声明通道变量。如下所示:
var ch chan int
声明后,ch
是一个未初始化的通道,其默认值为 nil
。
初始化
使用 make
函数初始化通道,如下所示:
ch = make(chan int)
也可以使用 make
函数创建一个带缓冲区的通道,如下所示:
ch = make(chan int, 10)
缓冲机制
通道分为 无缓冲通道 和 带缓冲通道,它们在行为和使用场景上有一些关键区别。
无缓冲通道
在初始化的小结中,使用 make
函数进行初始化通道,当没有指定通道的容量,或者说通道的容量大小为 0 时,称为无缓冲通道。
无缓冲通道在发送数据和接收数据时存在如下的特点:
- 发送操作:发送数据时,发送方会阻塞,直到有接收方准备接收数据。
- 接收操作:接收方会阻塞,直到有发送方发送数据。
- 无缓冲通道的发送和接收操作是同步的,必须有发送方和接收方同时准备好,才能完成通信。
代码示例
package main
import (
"fmt"
"sync"
"time"
)
func main() {
unbufferedChan := make(chan int) // 创建无缓冲通道
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("receiver is waiting...") // receiver is waiting...
time.Sleep(time.Second * 3) // 模拟3秒的准备时间
data := <-unbufferedChan // 在这行函数执行之前,发送方处于阻塞状态
fmt.Println("received:", data)
}()
unbufferedChan <- 42 // 发送方阻塞,直到接收方准备好
close(unbufferedChan) // 关闭通道
wg.Wait()
}
带缓冲通道
当指定了通道的容量,例如 make(chan int, 10)
,则称为带缓冲通道。
带缓冲通道在发送数据和接收数据时存在如下的特点:
- 发送操作:发送数据时,如果缓冲区未满,数据会被放入缓冲区,发送方不会阻塞;如果缓冲区已满,发送方会阻塞,直到缓冲区有空间。
- 接收操作:接收方从缓冲区中取出数据,如果缓冲区为空,接收方会阻塞,直到有数据可用。
- 带缓冲通道的发送和接收操作是异步的,发送方和接收方不需要同时准备好。缓冲区的存在允许数据在发送方和接收方之间暂时存储。
代码示例
package main
import (
"fmt"
"sync"
"time"
)
func main() {
bufferedChan := make(chan int, 2) // 创建带缓冲通道,容量大小为2
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println("receiver is waiting...")
// for {
// select {
// case data, ok := <-bufferedChan:
// if ok {
// fmt.Println("received:", data)
// } else {
// fmt.Println("bufferedChan closed")
// return
// }
// }
// }
for {
time.Sleep(time.Second * 5) // 模拟接收方不从缓冲通道中接收数据
}
}()
bufferedChan <- 42
bufferedChan <- 43
bufferedChan <- 44 // 如果接收方一直未接收数据,则在此处会发送阻塞
fmt.Println("send data over ...")
close(bufferedChan) // 关闭通道
wg.Wait()
}
通道操作
发送数据
使用 <-
操作符将数据发送到通道,如下所示:
package main
func main() {
ch := make(chan int)
ch <- 10 // 向通道中发送一个 10
close(ch) // 关闭通道
}
接收数据
使用 <-
操作符从通道中读取数据,如下所示:
package main
import "fmt"
func main() {
ch := make(chan int, 1)
ch <- 10 // 向通道中发送一个 10
data := <-ch // 通道中读取一个数据
fmt.Println(data) // 10
close(ch) // 关闭通道
}
单向通道
golang
中提供了单向通道这么一种特殊的通道类型,它只能用于发送或接收数据,而不能同时进行发送和接收操作。单向通道在类型声明上与普通通道(双向通道)有所不同,主要用于限制通道的使用方式,从而提高代码的可读性和安全性。
单向通道有如下两种类型:
- 只发送通道:用于发送数据,但不能接收数据。
chan<- Type
- 只接收通道:用于接收数据,但不能发送数据。
<-chan Type
单向通道的用途
单向通道的主要用途是限制通道的使用范围,避免在函数或方法中滥用通道的发送和接收功能。例如:
- 当一个函数只需要从通道中读取数据时,使用只接收通道可以明确表示该函数的意图。
- 当一个函数只需要向通道中写入数据时,使用只发送通道可以避免意外读取通道中的数据。
代码示例
package main
import (
"fmt"
"sync"
)
func producer(ch chan<- int, wg *sync.WaitGroup) { // ch 参数:一个只发送通道
defer wg.Done()
for i := 0; i < 5; i++ {
ch <- i // 向通道发送数据
fmt.Printf("sent: %d\n", i)
}
close(ch) // 关闭通道
}
func consumer(ch <-chan int, wg *sync.WaitGroup) { // ch 参数:一个只接收通道
defer wg.Done()
for {
select {
case data, ok := <-ch: // 读取通道数据
if ok {
fmt.Printf("received: %d\n", data)
} else {
fmt.Println("chan closed ...")
return
}
}
}
}
func main() {
var wg sync.WaitGroup
ch := make(chan int) // 创建一个双向通道
wg.Add(2)
// received: 0
// sent: 0
// sent: 1
// received: 1
// received: 2
// sent: 2
// sent: 3
// received: 3
// received: 4
// sent: 4
// chan closed ...
go consumer(ch, &wg) // 接收者
go producer(ch, &wg) // 发送者
wg.Wait()
}
多路复用
通道的多路复用机制是一种用于处理多个通道操作的技术,它允许程序同时等待多个通道的读写操作。这种机制的核心是 select...case
语句,它提供了对多个通道操作的并发处理能力。
select {
case <-ch1:
// 处理 ch1 的读操作
case data := <-ch2:
// 处理 ch2 的读操作,并将读取到的数据赋值给 data
case ch3 <- value:
// 向 ch3 中发送值
default:
// 如果没有通道准备好,则执行默认逻辑
}
另外需要强调的是,如果多个通道同时准备好,select...case
会随机选择一个通道执行操作;如果没有任何一个通道准备好,则 select
会陷入阻塞,除非有 default
分支执行。
需要注意的是,上面提到的 通道准备好 是个口语,对于接收操作来说需要满足以下两个条件中的任意一个:
- 通道中有数据可读:如果通道的缓冲区中有数据,或者有发送操作正在等待发送数据到通道中,那么接收操作就准备好。
- 通道已关闭:即使通道中没有数据,如果通道已经被关闭,接收操作也会立即返回零值,并且
ok
为false
(如果使用了data, ok := <-ch
的形式)。
对发送操作来说也需要满足以下两个条件中的任意一个:
- 通道中有空间可写:如果通道是无缓冲的,并且有等待接收的协程,或者通道是缓冲的且缓冲区中有空闲位置,那么发送操作就准备好。
- 通道已关闭:如果通道已经被关闭,发送操作会触发
panic("send on closed channel")
。
代码示例
通过 select
同时处理多个通道,如下所示:
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string) // 创建通道 ch1
ch2 := make(chan string) // 创建通道 ch2
go func() { // 每间隔 1 秒,向通道 ch1 中发送数据,一共发送 10 条数据
defer close(ch1)
for i := 1; i <= 10; i++ {
ch1 <- "message from ch1"
time.Sleep(1 * time.Second)
}
}()
go func() { // 每间隔 2 秒,向通道 ch2 中发送数据,一共发送 10 条数据
defer close(ch2)
for i := 1; i <= 10; i++ {
ch2 <- "message from ch2"
time.Sleep(2 * time.Second)
}
}()
// no data reception ...
// recived: message from ch2
// recived: message from ch1
// no data reception ...
// no data reception ...
// recived: message from ch1
// no data reception ...
// recived: message from ch2
// recived: message from ch1
// no data reception ...
// recived: message from ch1
// no data reception ...
// no data reception ...
// recived: message from ch1
// recived: message from ch2
// no data reception ...
// recived: message from ch1
// no data reception ...
// recived: message from ch1
// recived: message from ch2
// no data reception ...
// recived: message from ch1
// no data reception ...
// no data reception ...
// recived: message from ch1
// recived: message from ch2
// no data reception ...
// recived: message from ch1
// no data reception ...
// recived: message from ch2
// ch1 chan closed ...
// recived: message from ch2
// recived: message from ch2
// recived: message from ch2
// recived: message from ch2
// ch2 chan closed ...
// both channels closed. exiting...
ch1_s, ch2_s := true, true
for ch1_s || ch2_s {
select {
case data, ok := <-ch1:
if ok {
fmt.Println("recived: ", data)
} else if ch1_s {
fmt.Println("ch1 chan closed ...")
ch1_s = false
}
case data, ok := <-ch2:
if ok {
fmt.Println("recived: ", data)
} else if ch2_s {
fmt.Println("ch2 chan closed ...")
ch2_s = false
}
default:
fmt.Println("no data reception ...")
time.Sleep(time.Second)
}
}
fmt.Println("both channels closed. exiting...")
}
通道复用器的实现
在某些复杂场景中,可能需要手动实现通道复用器。例如,可以将多个通道的输出合并到一个通道中,从而简化后续的处理逻辑。
下面是一个简单的通道复用器实现思路:
package main
import (
"fmt"
"sync"
"time"
)
func send(ch chan<- int) { // 模拟数据发送,每秒发送一个数字
defer close(ch)
for i := 1; i <= 10; i++ {
ch <- i
time.Sleep(time.Second)
}
}
func multiplexer(channels ...<-chan int) <-chan int { // 合并多个通道数据,函数返回最终的一个只读通道
out := make(chan int)
go func() {
defer close(out)
var wg sync.WaitGroup
for _, ch := range channels {
wg.Add(1)
go func(ch <-chan int) {
defer wg.Done()
for {
select {
case data, ok := <-ch:
if ok {
out <- data // 数据发送到最终的只读通道
} else {
return
}
}
}
}(ch)
}
wg.Wait()
}()
return out
}
func main() {
ch1 := make(chan int) // 创建通道 ch1
ch2 := make(chan int) // 创建通道 ch2
ch3 := make(chan int) // 创建通道 ch3
go send(ch1)
go send(ch2)
go send(ch3)
ret_ch := multiplexer(ch1, ch2, ch3) // 接收返回的通道
for data := range ret_ch { // 从只读通道中获取合并后的数据
fmt.Printf("ret received: %d\n", data)
}
fmt.Println("ret chan closed ...")
}
超时机制
在 golang
中,通道的超时机制可以通过 time.After
或 context
两种方式来实现。
time.After
time.After
是一个返回通道的函数,会在指定的超时时间后向通道发送一个时间值。结合 select...case
语句,可以实现超时逻辑。如下所示:
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string) // 创建通道
go func() {
time.Sleep(3 * time.Second) // 模拟耗时操作
ch <- "任务完成"
}()
select {
case res := <-ch:
fmt.Println(res)
case <-time.After(2 * time.Second): // 设置超时为2秒
fmt.Println("超时退出") // 最终打印 超时退出
}
}
time.After
是一种非常简洁且易于理解的超时机制,特别适合简单的超时场景。但是需要注意的是,time.After
会在后台启动一个定时器,即使 select
提前退出,定时器也不会立刻回收,可能导致轻微的资源泄漏。
context
context
是 golang
中用于传递可取消信号、超时时间等的工具。通过 context.WithTimeout
创建一个带有超时功能的上下文,其 Done()
方法返回一个通道,用于超时控制,如下所示:
package main
import (
"context"
"fmt"
"time"
)
func main() {
ch := make(chan string) // 创建通道
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() // cancel() 会释放与上下文相关的资源,避免内存泄漏
go func() {
time.Sleep(3 * time.Second) // 模拟耗时操作
ch <- "任务完成"
}()
select {
case res := <-ch:
fmt.Println(res)
case <-ctx.Done(): // 监听超时信号
fmt.Println("超时退出", ctx.Err())
}
}
在上面的这个代码中:
context.WithTimeout
创建了一个带有超时的上下文,并设置超时时间为 2 秒。defer cancel()
确保在函数返回时调用cancel()
,释放资源。- 如果任务在超时前完成,
cancel()
会被调用,终止所有监听ctx.Done()
的协程。
总的来说,context
更适用于复杂的并发场景,例如多个任务的超时控制、任务取消等;time.After
更适用于简单的超时控制,例如单个任务的超时。
关闭通道
使用 close
函数关闭通道,如下所示:
package main
func main() {
ch := make(chan int, 5)
close(ch) // 关闭通道
}
需要注意的是,当关闭通道后,不能再向通道发送数据,但可以继续从通道中接收数据。
检查通道是否关闭
接收数据时会有两个返回值,一个是数据另一个是布尔值,用于判断通道是否关闭,如下所示:
package main
import (
"fmt"
"sync"
)
func main() {
ch := make(chan int, 5)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case data, ok := <-ch: // data 表示数据,ok 表示通道是否关闭
if ok {
// received: 11
// received: 10
// received: 15
fmt.Println("received:", data)
} else {
fmt.Println("ch closed") // ch closed
return
}
}
}
}()
ch <- 11
ch <- 10
ch <- 15
close(ch) // 关闭通道
wg.Wait()
}
获取通道长度
使用 len
函数获取通道的当前长度,如下所示:
package main
import "fmt"
func main() {
ch := make(chan int, 5)
fmt.Println(len(ch)) // 0
ch <- 10
fmt.Println(len(ch)) // 1
close(ch) // 关闭通道
}
获取通道容量
使用 cap
函数获取通道的当前容量,如下所示:
package main
import "fmt"
func main() {
ch := make(chan int, 5)
fmt.Println(cap(ch)) // 5
ch1 := make(chan int)
fmt.Println(cap(ch1)) // 0
close(ch) // 关闭通道 ch
close(ch1) // 关闭通道 ch1
}
源码解析
针对通道的源代码进行解析,从以下几个方面:
- 创建通道
- 发送数据
- 接收数据
- 关闭通道
通道结构体
源码位置:src/runtime/chan.go
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
timer *timer // timer feeding this chan
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
qcount
:当前通道中存储的数据数量,对于无缓冲通道,qcount
的值通常为 0 或 1。dataqsiz
:对于缓冲通道,dataqsiz
表示缓冲区可以存储的最大元素数量。对于无缓冲通道,dataqsiz
为 0。buf
:指向缓冲区的指针。缓冲区是一个数组,用于存储通道中的数据。仅对缓冲通道有效。无缓冲通道的buf
为nil
。elemsize
:通道中每个元素的大小(以字节为单位)。closed
:标记通道是否已关闭。关闭的通道不能再次发送数据,但可以继续接收数据直到缓冲区为空。timer
:指向一个定时器,该定时器与通道相关联(例如,用于超时操作)。elemtype
:指向通道中元素的类型信息,用于在运行时检查通道中存储的数据类型是否正确。sendx
:用于管理缓冲区的环形队列,记录下一次发送操作在缓冲区中的索引。recvx
:用于管理缓冲区的环形队列,记录下一次接收操作在缓冲区中的索引。recvq
:存储等待接收的协程队列,在发送操作中,如果缓冲区已满且没有等待接收的协程,则发送协程会被加入到sendq
。sendq
:存储等待发送的协程队列,在接收操作中,如果缓冲区为空且没有等待发送的协程,则接收协程会被加入到recvq
。lock
:保护hchan
结构体中所有字段的互斥锁,确保通道操作的线程安全性。在发送、接收和关闭操作中,lock
用于防止多个协程同时修改通道的状态。
创建通道
在 golang
的运行时中,创建通道的代码会被编译为对 makechan
的调用。如下所示:
package main
func main() {
ch := make(chan int, 1)
}
编译成汇编代码如下所示:
0x001a 00026 CALL runtime.makechan(SB)
以上汇编代码只是部分截取,请注意甄别。
makechan
函数是运行时中用于创建通道的核心函数。它初始化了一个 hchan
结构体,并根据指定的类型和缓冲区大小分配内存。
源码位置:src/runtime/chan.go
函数原型
func makechan(t *chantype, size int) *hchan {
// ...
}
t *chantype
:通道的类型信息,包含通道中元素的类型。size int
:通道的缓冲区大小。如果为 0,则创建无缓冲通道;如果大于 0,则创建有缓冲通道。- 返回一个初始化后的
hchan
结构体指针。
函数内容
func makechan(t *chantype, size int) *hchan {
elem := t.Elem // 从通道类型 t 中获取通道中元素的类型信息
// compiler checks this but be safe.
// 检查通道中元素的大小是否超过 64KB,如果超过,抛出异常
if elem.Size_ >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 确保 hchan 的大小是最大对齐单位的倍数,并且元素的对齐要求不超过最大对齐单位
// maxAlign = 8
if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
throw("makechan: bad alignment")
}
// 计算缓冲区的总大小:elem.Size_ * size
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
// 检查是否发生溢出,或者缓冲区大小超过最大分配限制,或者缓冲区大小小于0,抛出异常
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
// 内存分配
var c *hchan
switch {
case mem == 0:
// 如果缓冲区大小为 0(无缓冲通道)或元素大小为 0,仅分配 hchan 的内存
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
// 初始化通道的缓冲区的指针
// buf 指向 hchan 结构体本身,表示没有独立的缓冲区
c.buf = c.raceaddr()
case !elem.Pointers():
// 如果元素类型不包含指针(如基本类型或数组),将 hchan 和缓冲区分配在同一块内存中
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 初始化通道的缓冲区的指针
// 将 c.buf 指向 hchan 结构体之后的内存区域,该区域用于存储缓冲区数据
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 如果元素类型包含指针(如结构体或切片),分别分配 hchan 和缓冲区的内存
// 因为 gc 需要跟踪指针类型的内存分配
// Elements contain pointers.
c = new(hchan)
// 初始化通道的缓冲区的指针
c.buf = mallocgc(mem, elem, true)
}
// 设置通道的元素大小
c.elemsize = uint16(elem.Size_)
// 设置通道的元素类型
c.elemtype = elem
// 设置通道的缓冲区大小
c.dataqsiz = uint(size)
// 初始化通道的互斥锁
lockInit(&c.lock, lockRankHchan)
// 如果启用了通道调试模式,打印调试信息
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
}
// 返回通道指针
return c
}
具体分配内存 mallocgc
函数原型如下所示:
func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
// ...
}
size
:分配的内存大小。typ
:分配内存的类型信息。needzero
:是否需要将分配的内存清零。
更多关于
mallocgc
函数的内容本文章不再赘述,感兴趣的同学请关注后续文章《Golang
内存模型》。
发送数据
在 golang
的运行时中,发送操作的代码会被编译为对 chansend1
的调用。如下所示:
package main
func main() {
ch := make(chan int, 1)
ch <- 1
}
编译成汇编代码如下所示:
0x001a 00026 CALL runtime.makechan(SB)
# ...
0x0026 00038 CALL runtime.chansend1(SB)
以上汇编代码只是部分截取,请注意甄别。
而在 chansend1
的函数内部,又是对 chansend
的调用,如下所示:
源码位置:src/runtime/chan.go
// entry point for c <- x from compiled code.
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
函数原型
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
}
c *hchan
:指向通道的指针。ep unsafe.Pointer
:指向数据的指针。block bool
:是否阻塞发送。- 如果
block == true
,发送操作会在通道缓冲区满或没有接收方时阻塞,直到可以发送数据。 - 如果
block == false
,发送操作是非阻塞的。如果当前无法发送数据(缓冲区满或没有接收方),函数会立即返回false
。
- 如果
callerpc uintptr
:程序计数器(PC)的值,主要用于调试和竞态检测。- 发送操作是否成功。
函数内容
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 检查通道指针 c 是否为 nil
if c == nil {
if !block {
// 如果通道为 nil 且是非阻塞发送,直接返回 false
return false
}
// 如果是阻塞发送,则协程会阻塞并抛出异常
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
// 如果启用了通道调试模式,打印调试信息
if debugChan {
print("chansend: chan=", c, "\n")
}
// 如果启用了竞态检测器,记录对通道的读操作
if raceenabled {
racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second full()).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation. However, nothing here
// guarantees forward progress. We rely on the side effects of lock release in
// chanrecv() and closechan() to update this thread's view of c.closed and full().
// 如果是非阻塞发送并且通道未关闭但缓冲区已满,直接返回 false
// 前置判断,避免进入锁的开销
if !block && c.closed == 0 && full(c) {
return false
}
// 如果启用了阻塞剖析,记录当前时间戳
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 锁定通道的互斥锁,确保操作的原子性
lock(&c.lock)
// 如果通道已关闭,解锁并抛出异常
// 阅读到此处时,考虑是否改为使用原子操作,代替 lock/unlock?
// 可?不可?
// 记录吧!!!
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 如果有等待接收的协程,直接将数据发送给接收协程,跳过缓冲区
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
// 调用 send 函数完成数据传递,并解锁
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 如果没有等待接收的协程,则判断缓冲区是否有空间
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// 计算缓冲区中下一个写入位置的指针
qp := chanbuf(c, c.sendx)
if raceenabled {
// 通知竞态检测器当前操作的上下文信息,对缓冲区的写入操作被正确跟踪
racenotify(c, c.sendx, nil)
}
// 将要发送的数据从 ep 复制到缓冲区的 qp 位置
typedmemmove(c.elemtype, qp, ep)
// 更新发送索引为下一个写入位置
c.sendx++
// 如果 c.sendx 达到缓冲区大小,则将其重置为 0,实现环形缓冲区的效果
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 每次成功写入数据后,c.qcount 增加 1
c.qcount++
// 解锁通道的互斥锁
unlock(&c.lock)
// 写入成功,返回 true
return true
}
// 如果缓冲区已满且是非阻塞发送,解锁并返回 false
if !block {
unlock(&c.lock)
return false
}
// 下面是阻塞发送的代码逻辑
// Block on the channel. Some receiver will complete our operation for us.
// 获取当前正在运行的协程的指针
gp := getg()
// 分配一个 sudog 对象,sudog 是 golang 运行时中用于表示协程在通道操作中等待的结构体
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
// 下面这些是初始化 sudog 对象属性
mysg.elem = ep // 指向要发送的数据指针
mysg.waitlink = nil
mysg.g = gp // 指向当前协程
mysg.isSelect = false // 标记是否是 select 操作
mysg.c = c // 指向当前通道
gp.waiting = mysg // 将当前协程的 sudog 对象设置为等待状态
gp.param = nil
// 将 sudog 对象加入通道的发送等待队列
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
// 标记当前协程即将阻塞在通道操作
gp.parkingOnChan.Store(true)
// 阻塞当前协程,直到被接收操作唤醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
// 确保发送的数据在接收协程拷贝之前不会被回收,因为 ep 指向的数据有可能是栈上的数据
// 而栈上的数据可能在协程阻塞后被回收
KeepAlive(ep)
// 下面是协程被唤醒后的处理代码
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
// 清空协程的等待状态
gp.waiting = nil
// 标记协程不再阻塞在通道操作上
gp.activeStackChans = false
// 发送操作是否成功,mysg.success 表示 true 成功,false 失败
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
// 记录协程阻塞的时间
blockevent(mysg.releasetime-t0, 2)
}
// 清空 sudog 对象的通道指针
mysg.c = nil
releaseSudog(mysg)
if closed {
// 如果通道未关闭但协程被唤醒,抛出异常
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 通道在发送操作完成前被关闭,报错 !!!
// 这也就是为什么说向一个已经关闭通道写数据会报错,原因就在这里
panic(plainError("send on closed channel"))
}
// 发送操作成功完成
return true
}
send
函数的作用是将数据直接发送给等待接收的协程,并唤醒等待接收的协程。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 竞态检测
if raceenabled {
// 无缓冲通道
if c.dataqsiz == 0 {
// 通知竞态检测器当前操作的上下文
racesync(c, sg)
} else {
// 有缓冲通道
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
// 即使数据是直接发送的,竞态检测器也会模拟数据通过缓冲区的流程
// 调用 racenotify,记录接收索引(c.recvx)的变化。
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
// 更新接收索引 c.recvx 和发送索引 c.sendx,模拟环形缓冲区的行为
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
// 如果 sg.elem 不为 nil,调用 sendDirect 将数据从发送方 ep 直接发送到接收方 sg.elem
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
// 清空 sg.elem,表示数据已发送
sg.elem = nil
}
// 获取等待接收的协程
gp := sg.g
// 调用解锁函数,释放通道的锁
unlockf()
// 将 sudog 对象传递给接收协程,用于后续处理
gp.param = unsafe.Pointer(sg)
// 标记发送操作成功
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 将 gp 协程标记为可运行状态,并将其加入调度队列中等待执行
goready(gp, skip+1)
}
sendDirect
函数的作用是将数据直接从发送方的内存位置发送到等待接收的协程 sudog
对象 。
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src is on our stack, dst is a slot on another stack.
// Once we read sg.elem out of sg, it will no longer
// be updated if the destination's stack gets copied (shrunk).
// So make sure that no preemption points can happen between read & use.
// 获取接收方的内存位置,以便将数据直接发送到该位置
dst := sg.elem
// 触发写屏障
// 确保垃圾回收器能够正确追踪目标内存区域中的指针
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
// No need for cgo write barrier checks because dst is always
// Go memory.
// 这是一个底层函数,用于在内存中安全地移动数据
// dst 目标地址,src 源地址
memmove(dst, src, t.Size_)
}
关于更多内存管理的知识点,请关注后续文章《
Golang
内存模型》。
接收数据
在 golang
的运行时中,接收数据操作的代码会被编译为对 chanrecv1
的调用。如下所示:
package main
import "fmt"
func main() {
ch := make(chan int, 1)
ch <- 1
data := <-ch
fmt.Println(data)
}
编译成汇编代码如下所示:
0x001a 00026 CALL runtime.makechan(SB)
# ...
0x0043 00067 CALL runtime.chanrecv1(SB)
以上汇编代码只是部分截取,请注意甄别。
而在 chanrecv1
的函数内部,又是对 chanrecv
的调用,如下所示:
源码位置:src/runtime/chan.go
// entry points for <- c from compiled code.
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
函数原型
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
}
c *hchan
:指向目标通道的指针。ep unsafe.Pointer
:指向接收数据的内存位置。如果为nil
,表示仅检查通道状态而不接收数据。block bool
:是否允许阻塞接收。如果为false
,则在无法立即接收数据时返回。selected bool
:表示是否成功接收数据。在select
语句中,用于标记是否选择了当前通道。received bool
:表示是否实际接收到数据。如果通道关闭且缓冲区为空,received
为false
。
函数内容
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
// 如果启用了通道调试模式,打印调试信息
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// 如果通道为 nil
if c == nil {
if !block {
// 非阻塞接收,直接返回
return
}
// 如果是阻塞接收,协程会阻塞并抛出异常
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
// 如果通道关联了一个定时器,则调用 maybeRunChan 来处理定时器事件
if c.timer != nil {
c.timer.maybeRunChan()
}
// 如果是非阻塞接收且通道为空
// Fast path: check for failed non-blocking operation without acquiring the lock.
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
// 检查通道是否关闭
if atomic.Load(&c.closed) == 0 {
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
// 如果通道未关闭,直接返回
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
// 如果通道已关闭并且为非阻塞并且缓冲区为空
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
// ep 不是空,则清空目标内存 ep
if ep != nil {
typedmemclr(c.elemtype, ep)
}
// 返回成功接收数据(true),未实际接收到数据(false)
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 锁定通道的互斥锁,确保操作的原子性
lock(&c.lock)
// 如果通道已关闭
if c.closed != 0 {
// 缓冲区为空
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
// 这里是通道已经关闭了,而且缓冲区已经为空
// 考虑是否可以采用原子操作来代替这里的 lock/unlock 操作的操作?
// ...
// 和通道关闭时,发送者的检测同理
// 虽然理论上可以通过原子操作来避免加锁,但在实际实现中,锁的使用是为了确保线程安全和一致性,另外,虽然原子操作避免了锁的开销,但它们仍然有一定的性能开销
// 即使使用原子操作,也需要确保在检查 c.closed 和 c.qcount 时不会出现竞态条件。例如,如果一个协程在检查 c.closed 后修改了 c.qcount,可能会导致不一致的行为
// 记录吧!!!
// 解锁
unlock(&c.lock)
// 清空目标内存 ep
if ep != nil {
typedmemclr(c.elemtype, ep)
}
// 返回成功接收数据(true),未实际接收到数据(false)
return true, false
}
// The channel has been closed, but the channel's buffer have data.
} else {
// 通道未关闭,并且有等待发送的协程
// Just found waiting sender with not closed.
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
// 调用 recv 函数直接从发送协程接收数据,跳过缓冲区
// 解锁通道
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
// 返回成功接收数据(true),实际接收到数据(true)
return true, true
}
}
// 如果缓冲区中有数据
if c.qcount > 0 {
// Receive directly from queue
// 得到缓冲区地址
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
// 从缓冲区中读取数据到目标内存
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清空缓冲区中的数据
typedmemclr(c.elemtype, qp)
// 更新接收索引 c.recvx
c.recvx++
// 如果接收索引 == 缓冲区大小,则从 0 重新开始
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 更新缓冲区计数 c.qcount
c.qcount--
// 解锁
unlock(&c.lock)
// 返回成功接收数据(true),实际接收到数据(true)
return true, true
}
// 如果是非阻塞接收且缓冲区为空,解锁通道并返回未成功接收数据(false),未实际接收到数据(false)
if !block {
unlock(&c.lock)
return false, false
}
// 下面是进行阻塞接收数据代码逻辑
// no sender available: block on this channel.
// 获取当前协程
gp := getg()
// 分配一个 sudog 对象
mysg := acquireSudog()
// 下面是针对 mysg 对象属性的初始化
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 将协程加入接收等待队列
c.recvq.enqueue(mysg)
// 如果通道关联了定时器,调用 blockTimerChan
if c.timer != nil {
blockTimerChan(c)
}
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
// 标记协程即将阻塞在通道操作上
gp.parkingOnChan.Store(true)
// 调用 gopark 阻塞当前协程,直到被发送操作唤醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
// 下面是协程被唤醒之后的操作流程
// someone woke us up
// 检查协程是否被正确唤醒
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
// 如果通道关联了定时器,调用 unblockTimerChan
if c.timer != nil {
unblockTimerChan(c)
}
// 清理协程状态
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 获取接收操作的结果
success := mysg.success
gp.param = nil
// 释放 sudog 对象
mysg.c = nil
releaseSudog(mysg)
// 返回接收操作结果
return true, success
}
recv
函数用于从通道中读取数据,并将其传递给等待接收的协程。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果通道无缓冲区
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
// 并且目标内存位置不是 nil, 则直接拷贝到目标位置
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// 通道有缓冲区
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
// 计算缓冲区中接收位置的指针
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
if ep != nil {
// 如果目标内存位置不是 nil, 将数据从缓冲区复制到接收方的内存位置
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
// 将发送方的数据复制到缓冲区的尾部
typedmemmove(c.elemtype, qp, sg.elem)
// 增加接收索引并处理环形缓冲区的边界条件
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 更新发送索引,使其指向下一个可用位置
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
// 清空 sudog 对象中的数据指针
sg.elem = nil
// 获取发送协程的指针
gp := sg.g
// 调用解锁函数,释放通道的锁
unlockf()
gp.param = unsafe.Pointer(sg)
// 标记接收操作成功
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 将 gp 协程标记为可运行状态,并将其加入调度队列中等待执行
goready(gp, skip+1)
}
recvDirect
函数的作用是从发送方直接接收数据,并将其复制到接收方的目标内存位置。
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
// dst is on our stack or the heap, src is on another stack.
// The channel is locked, so src will not move during this
// operation.
// 指向发送方的内存位置
src := sg.elem
// 写屏障
// 确保目标内存位置中的指针被垃圾回收器正确追踪
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
// 将数据从发送方的内存位置复制到接收方的内存位置
memmove(dst, src, t.Size_)
}
关于更多内存管理的知识点,请关注后续文章《
Golang
内存模型》。
关闭通道
在 golang
的运行时中,关闭通道操作的代码会被编译为对 closechan
的调用。如下所示:
package main
import "fmt"
func main() {
ch := make(chan int, 1)
close(ch)
}
编译成汇编代码如下所示:
0x001a 00026 CALL runtime.makechan(SB)
# ...
0x0020 00032 CALL runtime.closechan(SB)
以上汇编代码只是部分截取,请注意甄别。
函数原型
源码位置:src/runtime/chan.go
func closechan(c *hchan) {
// ...
}
c *hchan
:指向要关闭的通道的指针。
函数内容
func closechan(c *hchan) {
// 如果通道为 nil,直接抛出异常
if c == nil {
panic(plainError("close of nil channel"))
}
// 锁定通道的互斥锁,确保关闭操作的原子性
lock(&c.lock)
if c.closed != 0 {
// 如果通道已经关闭,解锁,抛出异常
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
// 将通道的 closed 标志设置为 1,表示通道已关闭
c.closed = 1
// 需要被唤醒的协程集合
var glist gList
// release all readers
for {
// 遍历通道的接收等待队列,释放所有等待接收的协程
sg := c.recvq.dequeue()
if sg == nil {
break
}
// 如果 sg.elem 不为 nil
if sg.elem != nil {
// 清空接收方的内存位置
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 获取到接收者协程
gp := sg.g
gp.param = unsafe.Pointer(sg)
// 标记获取操作失败
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 将协程加入到 glist 中,稍后唤醒
glist.push(gp)
}
// release all writers (they will panic)
for {
// 遍历通道的发送等待队列,释放所有等待发送的协程
sg := c.sendq.dequeue()
if sg == nil {
break
}
// 清空 sg.elem,避免内存泄漏
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 获取到发送者协程
gp := sg.g
gp.param = unsafe.Pointer(sg)
// 标记发送操作失败
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 将协程加入到 glist 中,稍后唤醒
glist.push(gp)
}
// 解锁通道的互斥锁
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
// 遍历 glist,唤醒所有等待的协程
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
// 调用 goready 将协程标记为可运行状态
goready(gp, 3)
}
}
🌺🌺🌺撒花!
如果本文对你有帮助,就点关注或者留个👍
如果您有任何技术问题或者需要更多其他的内容,请随时向我提问。