目录
基本使用
channel 数据结构
阻塞的协程队列
协程节点
构建 channel
写流程
读流程
非阻塞与阻塞
closechan(关闭)
基本使用
创建无缓存 channel
c := make(chan int) //创建无缓冲的通道 c
c := make(chan int,0) //创建无缓冲的通道 c
创建有缓存 channel
c := make(chan int, 3) //创建无缓冲的通道 c
例子:
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan int, 3) //创建有缓冲的通道 c
//内置函数 len 返回未被读取的缓冲元素数量,cap 返回缓冲区大小
fmt.Printf("len(c)=%d, cap(c)=%d\n", len(c), cap(c))
go func() {
defer fmt.Println("子go程结束")
for i := 0; i < 3; i++ {
c <- i
fmt.Printf("子go程正在运行[%d]: len(c)=%d, cap(c)=%d\n", i, len(c), cap(c))
}
}()
time.Sleep(2 * time.Second) //延时2s
for i := 0; i < 3; i++ {
num := <-c //从c中接收数据,并赋值给num
fmt.Println("num = ", num)
}
fmt.Println("main进程结束")
}
channel 数据结构
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
hchan:channel 数据结构
• qcount:当前 channel 中存在多少个元素;
• dataqsize: 当前 channel 能存放的元素容量;
• buf:channel 中用于存放元素的环形缓冲区;
• elemsize:channel 元素类型的大小;
• closed:标识 channel 是否关闭;
• elemtype:channel 元素类型;
• sendx:发送元素进入环形缓冲区的 index;
• recvx:接收元素所处的环形缓冲区的 index;
• recvq:因接收而陷入阻塞的协程队列;
• sendq:因发送而陷入阻塞的协程队列;
lock mutex 锁
阻塞的协程队列
type waitq struct {
first *sudog
last *sudog
}
waitq:阻塞的协程队列
• first:队列头部
• last:队列尾部
协程节点
sudog:用于包装协程的节点
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
isSelect bool
c *hchan
}
• g:goroutine,协程;
• next:队列中的下一个节点;
• prev:队列中的前一个节点;
• elem: 读取/写入 channel 的数据的容器;
• isSelect:标识当前协程是否处在 select 多路复用的流程中;
• c:标识与当前 sudog 交互的 chan.
构建 channel
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// ...
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
return
}
写流程
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// ...
//加锁
lock(&c.lock)
// ...
//写时存在阻塞读协程
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
//写时不存在阻塞读协程,且缓冲区不满仍有空间
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
//写时不存在阻塞读协程,且缓冲区满了没有空间
// ...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
mysg.c = c
gp.waiting = mysg
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
gp.waiting = nil
closed := !mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true
}
总结:
1.当写时存在阻塞读协程,我们直接用
2.当写时不存在阻塞读协程,且缓冲区不满仍有空间时,我们直接加入环形缓冲区中
3.当写时不存在阻塞读协程,且缓冲区满了没用空间时,加入阻塞写协程队列中
注意:
1.有阻塞读协程和缓冲区满之间只有一个条件符合
2.对于未初始化的 chan,写入操作会引发死锁
3.对于已关闭的 chan,写入操作会引发 panic.
读流程
读流程与写流程差不多,不同点:
1.加入的是阻塞读队列
2.当环形缓冲区有和无数据时会有不同的操作
注意:
1.读空channel, park挂起,引起死锁
2.channel 已关闭且内部无元素,直接解锁返回
非阻塞与阻塞
区别:
非阻塞模式下,读/写 channel 方法通过一个 bool 型的响应参数,用以标识是否读取/写入成功.
• 所有需要使得当前 goroutine 被挂起的操作,在非阻塞模式下都会返回 false;
• 所有是的当前 goroutine 会进入死锁的操作,在非阻塞模式下都会返回 false;
• 所有能立即完成读取/写入操作的条件下,非阻塞模式下会返回 true.
何时进入非阻塞
默认情况下,读/写 channel 都是阻塞模式,只有在 select 语句组成的多路复用分支中,
与 channel 的交互会变成非阻塞模式:
在sudog:用于包装协程的节点
• isSelect:标识当前协程是否处在 select 多路复用的流程中;
closechan(关闭)
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
关闭未初始化过的 channel 会 panic;
• 加锁;
• 重复关闭 channel 会 panic;
• 将阻塞读协程队列中的协程节点统一添加到 glist;
• 将阻塞写协程队列中的协程节点统一添加到 glist;
• 唤醒 glist 当中的所有协程.
防止还有协程挂起,没有被唤醒的资源浪费
参考:小徐先生1212 -- Golang Channel实现原理