在golang 并发编程里,经常会听到一句话:不要通过共享内存进行通信,通过通信来共享内存。下面我们会介绍下channel, 通过源码的方式去了解channel是怎么工作的。
基本结构
流程图
代码解读
type hchan struct {
qcount uint // 队列中的总数据
dataqsiz uint // 环形队列的大小
buf unsafe.Pointer // 指向数据数组 qsiz 的元素
elemsize uint16 // 循环队列中元素的大小;
closed uint32 // chan关闭标志
elemtype *_type // 循环队列中元素的类型
sendx uint // 待发送的数据在循环队列buf中的索引
recvx uint // 待接收的数据在循环队列buf中的索引
recvq waitq // 等待发送数据的 goroutine
sendq waitq // 等待接收数据的 goroutine
// 锁保护hchan中的所有字段,以及此chan 上被阻止的sudogs中的几个字段。
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
// sudog代表等待列表中的g,例如用于在通道上进行发送/接收。
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // 数据元素 (可能指向栈)
// 以下字段永远不会同时访问。
// 对于chan,waitlink仅由g访问。
// 对于信号量,所有字段(包括上面的那些)只有在持有semaRoot锁时才能访问。
acquiretime int64
releasetime int64
ticket uint32
// isSelect表示g在 select 上
isSelect bool
// success 表示通道 c 上的通信是否成功。
// 如果 goroutine 被唤醒是因为通道 c 上传递了值,则为 true,
// 如果唤醒是因为通道 c 关闭,则为 false。
success bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
创建
例子
// make(chan 类型, 大小)
chBuf := make(chan int, 1024) // 缓冲队列
ch := make(chan int) // 无缓存队列(也叫同步队列)
我很重要: channel带缓存的异步,不带缓存同步
思维图
代码解读
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 检查数据是不是超过64kb
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 检查缓存是否溢出
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// 当buf中存储的元素不包含指针时,chan不包含对GC感兴趣的指针。
// buf指向相同的分配,elemtype固定不变。
// SudoG 的引用来自其所属的线程,因此无法收集。
// TODO(dvyukov,rlh):重新思考收集器何时可以移动分配的对象。
var c *hchan
switch {
case mem == 0: // 队列或元素大小为零
// 只为hchan分配内存
c = (*hchan)(mallocgc(hchanSize, nil, true))
// 竞态检查,利用这个地址进行同步操作.
c.buf = c.raceaddr()
case elem.ptrdata == 0://元素不是指针
// 分配一块连续的内存给hchan和buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: // 默认
// 单独为 hchan 和缓冲区分配内存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size) // 循环队列中元素的大小
c.elemtype = elem // 元素类型
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan) // 锁优先级
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
channel 创建的时候分以下三种情况:
1- 队列或元素大小为零,调用mallocgc 在堆上给chan 创建一个hchansize大小的内存空间
2- 元素不是指针,调用mallocgc 在堆上给chan 创建一个 大小hchansize+ mem连续的内存空间(队列+buf缓存空间)
3- 默认情况下,调用mallocgc 单独为 hchan 分配内存
发送
代码解读
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil { // 如果chan为nil
if !block { // 非阻塞直接返回flase
return false
}
// 阻塞直接挂起抛出异常
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
}
// Fast path:在不获取锁的情况下检查失败的非阻塞操作。
// 在观察到通道未关闭后,我们观察到 channel 未准备好发送。(c.closed和full())
//当 channel 不为 nil,并且 channel 没有关闭时,
// 如果没有缓冲区且没有接收者rec或者缓冲区已经满了,返回 false。
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock) // 加锁
if c.closed != 0 { // 如果已经关闭了,就抛出异常
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 有等待的接收者。
if sg := c.recvq.dequeue(); sg != nil {
// 我们将要发送的值直接传递给send,绕过通道缓冲区(如果有的话)。
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 缓冲区存在空余空间时
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx) // 找到要发送数据到循环队列buf的索引位置
if raceenabled {
racenotify(c, c.sendx, nil)
}
// 数据拷贝到循环队列中
typedmemmove(c.elemtype, qp, ep)
// 将待发送数据索引加1,
c.sendx++
// 如果到了末尾,从0开始 (循环队列)
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// chan中元素个数加1
c.qcount++
// 释放锁返回true
unlock(&c.lock)
return true
}
if !block { // 缓冲区没有空间,直接返回false
unlock(&c.lock)
return false
}
// channel上阻塞。一些receiver将为我们完成操作。
gp := getg() // 获取发送数据使用的 G
// acquireSudog() 获取一个 sudog,如果缓存没有获取的到,就新建一个。
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 在分配 elem 和将 mysg 加入 gp.waiting 队列,
// 没有发生堆栈分裂,copystack 可以找到它。
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false // 是否在 select
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 调用 c.sendq.enqueue 方法将配置好的 sudog 加入待发送的等待队列
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// 调用gopark方法挂起当前goroutine,状态为waitReasonChanSend,阻塞等待channel接收者的激活
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 确保发送的值保持活动状态,直到接收者将其复制出来
KeepAlive(ep)
// G 被唤醒
if mysg != gp.waiting { // 如果sudog 结构体中等待的 g 和获取到不一致
throw("G waiting list is corrupted")
}
gp.waiting = nil。// 等待请空
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
- chan 为nil 非阻塞的话就返回false, 阻塞panic
- 当 channel 不为 nil且 channel 没有关闭时,如果没有缓冲区且没有接收者rec或者缓冲区已经满了,返回 false。
- 如果存在等待的接收者,通过 send 直接将数据发送给阻塞的接收者
- 如果缓冲区存没有满,chanbuf + typedmemmove 将发送的数据写入 chan 的缓冲区;
- 如果缓冲区已满时,把发送数据的goroutin sendq中等待其他 G 接收数据;
func full(c *hchan) bool {
// c.dataqsiz是不可变的(在创建通道后不可再写操作),因此在通道操作期间的任何时候读取都是安全的。
if c.dataqsiz == 0 { // 数据是空的
return c.recvq.first == nil//指针读取是近似原子性的
}
// 数据满了
return c.qcount == c.dataqsiz//指针读取是近似原子性的
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// 如果我们通过缓冲区,即使我们直接复制。
//请注意,我们只需要在raceenabled时增加头/尾位置。
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz -- 发送的索引位置获取原理
}
}
if sg.elem != nil {// 直接把要发送的数据拷贝到receiver的内存地址
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 将等待接收数据的 G 标记成可运行状态 Grunnable (goready-ready: casgstatus(gp, _Gwaiting, _Grunnable))
//把该 G 放到发送方所在的处理器的 runnext 上等待执行, 该处理器在下一次调度时会立刻唤醒数据的接收方(runqput方法)
// runqput 尝试将 g 放入本地可运行队列:
// 如果 next 为 false,runqput 会将 g 添加到可运行队列的尾部。
// 如果 next 为 true,runqput 将 g 放入 _p_.runnext 槽中。 这里为true
// 如果运行队列已满,runnext 会将 g 放入全局队列。
// 仅由所有者 P 执行。
goready(gp, skip+1) // 唤醒等待的接收者goroutine
}
1- 直接把要发送的数据拷贝到receiver的内存地址
2- 将等待接收数据的 G 标记成可运行状态 Grunnable 。把该 G 放到发送方所在的处理器的 runnext 上等待执行, 该处理器在下一次调度时会立刻唤醒数据的接收方
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
//src 在我们的栈上,dst 是另一个栈上的一个插槽。
// 一旦我们从sg中读取了sg.elem,如果目标堆栈被复制(收缩),它将不再被更新。
// 因此,请确保在读取和使用之间不会发生抢占点。
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
// 不需要cgo写屏障检查,因为dst始终是Go内存。
memmove(dst, src, t.size)
}
sendDirect:数据拷贝到了接收者的内存地址上
// chanbuf(c, i) 是指向缓冲区中第 i 个槽的指针
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
接收
代码解读
//chanrecv在通道c上接收数据,并将接收到的数据写入ep。
//ep可能为零,在这种情况下,收到的数据将被忽略。
//如果 block == false 且没有可用元素,则返回 (false, false)。
//否则,如果c是闭的,则返回零*ep并返回(true,false)。
//否则,用元素填充 *ep 并返回 (true, true)。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled:不需要检查ep,因为它始终位于堆栈上,或者是由reflect分配的新内存
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block { // 如果c为空且是非阻塞调用,那么直接返回 (false, false)
return
}
// 若果阻塞直接挂起,抛出错误
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: 在不获取锁的情况下检查失败的非阻塞操作
if !block && empty(c) { // 通过empty()判断是无缓冲chan或者是chan中没有数据
// 在观察到channel未准备好接收后,看channel是否已关闭。
if atomic.Load(&c.closed) == 0 { // 如果chan没有关闭,则直接返回 (false, false)
return
}
// 如果chan关闭, 为了防止检查期间的状态变化,二次调用empty()进行原子检查二次调用empty()进行原子检查,
// 如果是无缓冲chan或者是chan中没有数据,返回 (true, false)
// 通道已不可逆地关闭。 为了防止检查期间的状态变化,重新检查通道是否有任何待接收的数据
if empty(c) {
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// 如果已经关闭且chan中没有数据,返回 (true,false)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 从sendq 队列获取等待发送的 G
if sg := c.sendq.dequeue(); sg != nil {
// 在 channel 的sendq队列中找到了等待发送的 G,取出队头等待的G。
// 找到一个等待发送的sender 。如果缓冲区大小为 0,则直接从sender接收值 。
// 否则,从队列头部接收,并将发送者的值添加到队列尾部(由于队列已满,这两个值都映射到同一个缓冲区下标)。
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 如果缓冲区有数据
if c.qcount > 0 {
// 直接从队列接收
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
// 接收数据地址ep不为空,直接从缓冲区复制数据到ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++ // 待接收索引加1
if c.recvx == c.dataqsiz { // 如果循环队列到了末尾,从0开始
c.recvx = 0
}
c.qcount-- // 缓冲区数据减1
unlock(&c.lock)
return true, true
}
if !block { // 如果是select非阻塞读取的情况,直接返回(false, false)
unlock(&c.lock)
return false, false
}
//没有可用的发送者:阻塞channel。
gp := getg() // 获取当前 goroutine 的指针,用于绑定给一个 sudog
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
- 如果chan 为nil ,非阻塞的话返回(false, false),阻塞的panic
- chan已经关闭并且缓存没有数据,直接返回(false, false)
- 如果是无缓冲chan或者是chan中没有数据,返回 (true, false)
- chan 的sendq队列中存在挂起的G, 会将recvx 索引的数据拷贝到接收变量内存中并将sendq队列中G 数据拷到缓存
- 如果缓冲区有数据,直接读取recvx索引的数据
- 如果没有可用的发送者,获取当前 G 的指针,用于绑定给一个 sudog 并加入到recvq,等待调度器唤醒
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// 从sender 复制数据
recvDirect(c.elemtype, sg, ep)
}
} else {
// 队列已满。从队列的头部取走该项目。
// 让发送方将其项目排入队列的尾部。
// 由于队列已满,这两个位置是相同的。
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// 将数据从队列复制到receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 将数据从sender复制到队列
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
- 如果 chan 不存在缓冲区:调用 recvDirect 将 发送队列中 G 存储的 elem 数据拷贝到目标内存地址中;
- 如果 chan 存在缓冲区:
- 1- 将队列中的数据拷贝到receiver 的内存地址;
- 2- 将 sender 头的数据拷贝到缓冲区中,释放一个阻塞的 sender;
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
// dst 在我们的栈或堆上,src 在另一个栈上。
// chan已锁定,因此在此操作期间src将不会移动。
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
// empty报告从c读取是否会阻塞(即通道为空)。它使用对可变状态的单个原子读取。
func empty(c *hchan) bool {
// c.dataqsiz 是不可变的
if c.dataqsiz == 0 {
// 发送队列为空
return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
}
return atomic.Loaduint(&c.qcount) == 0
}
关闭
代码解读
func closechan(c *hchan) {
if c == nil { // 关闭空的chan,会panic
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 { // 关闭已经关闭的chan,会panic
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
// chan的closed置为关闭状态
c.closed = 1
var glist gList // 申明一个存放所有接收者和发送者goroutine的list
// 处理所有的recv(等待发送数据的 goroutine)
for {
sg := c.recvq.dequeue() // 将缓存队列待处理的recv 拿出来
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp) // 放到 glist
}
// release all writers (they will panic)
for { // 获取所有发送者
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// 唤醒所有的glist中的goroutine
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
1- 如果channel 是nil 空指针或者已经关闭,会pnaic
2- close操作会做一些收尾的工作:将 recvq , sendq 数据放到 glist(释放队列)中,统一进行释放掉。
3- 有一个点需要注意,先释放 sendq 中的recver 不会存在问题,向关闭的channel 接收数据,最多返回nil。但是如果recvq 的 sender 就会出现问题,因为不能往关闭 channel 发送数据(panic)
总结
- channel 总体上分三类:
1- 同步:没有缓存
2- 异步: 有缓存
3- chan struct{} 类型的异步 Channel — struct{} 类型不占用内存空间,不需要实现缓冲区和直接发送
- 读/写/关闭情况可以下面这个表概括