Go-知识select

Go-知识select

  • 1. select 的特性
    • 1.1 chan读写
    • 1.2 返回值
    • 1.3 default
  • 2. select 经典使用
    • 2.1 永久阻塞
    • 2.2 快速检错
    • 2.3 限时等待
  • 3. 实现原理
    • 3.1 数据结构
    • 3.2 实现逻辑
    • 3.3 原理总结
  • 4. 总结
    • 4.1 大概原理
    • 4.2 参数
    • 4.3 返回值

一个小活动: https://developer.aliyun.com//topic/lingma/activities/202403?taskCode=14508&recordId=40dcecb786f9a65c2e83e95306822ce4#/?utm_content=m_fission_1 「通义灵码 · 体验 AI 编码,开 AI 盲盒」

githubio地址:https://a18792721831.github.io/

select 是Go在语言层面提供的多路I/O复用机制,用于检测多个chan是否就绪。
建议先查看chan的文章:https://jiayq.blog.csdn.net/article/details/135885482

1. select 的特性

1.1 chan读写

select 只能作用于chan,包括数据读取和写入:

func TestSelect(t *testing.T) {
	var c chan string
	c = make(chan string)
	var msg string
	h := "hello"
	select {
	case msg = <-c:
		fmt.Printf("msg = %s\n", msg)
	case c <- h:
		fmt.Printf("say hello\n")
	}
}

在上面的代码中,select有两个case语句,分别对应chan的读取和写入操作。
因为创建的chan没有缓存,在写入chan的同时需要有goroutine去读取,相应的,在读取chan的同时必须有goroutine写入。
上面只有一个goroutine,所以是select阻塞的。
在这里插入图片描述

如果对上面的代码做个修改,设置缓存大小为1:

func TestSelect(t *testing.T) {
	var c chan string
	c = make(chan string, 1)
	var msg string
	h := "hello"
	select {
	case msg = <-c:
		fmt.Printf("msg = %s\n", msg)
	case c <- h:
		fmt.Printf("say hello\n")
	}
}

在这里插入图片描述

因为缓存区为1,此时就能写入了,所以select的写入case是不阻塞的,执行chan写入case。
如果先给设置一个初值呢:

func TestSelect(t *testing.T) {
	var c chan string
	c = make(chan string, 1)
	c <- "hi"
	var msg string
	h := "hello"
	select {
	case msg = <-c:
		fmt.Printf("msg = %s\n", msg)
	case c <- h:
		fmt.Printf("say hello\n")
	}
}

在这里插入图片描述

因为缓存区只有1,已经满了,所以chan无法再写入了,只有chan读取是不阻塞的,所以每次都会执行chan读取。
如果chan既能写,又能读呢:

func TestSelect(t *testing.T) {
	var c chan string
	c = make(chan string, 2)
	c <- "hi"
	var msg string
	h := "hello"
	select {
	case msg = <-c:
		fmt.Printf("msg = %s\n", msg)
	case c <- h:
		fmt.Printf("say hello\n")
	}
}

当两个case都满足的时候,多次执行,发现select选择的case是不确定的:
在这里插入图片描述

在这里插入图片描述

可能要多执行几次才能发现不一样。
如果没有case满足,而且又不希望阻塞呢:

func TestSelect(t *testing.T) {
	var c chan string
	c = make(chan string)
	var msg string
	h := "hello"
	select {
	case msg = <-c:
		fmt.Printf("msg = %s\n", msg)
	case c <- h:
		fmt.Printf("say hello\n")
	default:
		fmt.Printf("default case \n")
	}
}

在这里插入图片描述

此时就不会阻塞了。
如果既有default,又有多个不阻塞的case呢:

func TestSelect(t *testing.T) {
	var c chan string
	c = make(chan string, 2)
	c <- "hi"
	var msg string
	h := "hello"
	select {
	case msg = <-c:
		fmt.Printf("msg = %s\n", msg)
	case c <- h:
		fmt.Printf("say hello\n")
	default:
		fmt.Printf("default case \n")
	}
}

此时只会随机执行case语句,不会执行default语句。
总结下,select的每个case语句只能操作一个chan,要么写入数据,要么读取数据。
如果没有chan可以进行读写操作,在没有default的情况下,select会阻塞,直到任意一个chan解除阻塞。
如果存在多个case不阻塞,那么select会随机挑选一个执行。
如果case阻塞,有default,那么select会执行default。
如果case不阻塞,有default,那么select永远不会执行default。

1.2 返回值

select 为Go语言的预留关键字,不是函数,可以在case语句中声明变量并为变量赋值。
case语句读取chan时,最多可以给两个变量赋值:

func TestSelect(t *testing.T) {
	var c chan string
	c = make(chan string, 2)
	c <- "hi"
	var msg string
	select {
	case <-c: // 0个变量
		fmt.Printf("skip res\n")
	case msg = <-c: // 一个变量
		fmt.Printf("msg = %s\n", msg)
	case msg, ok := <-c: // 两个变量,注意这里的 msg 重新定义了,与select外面的没有关系了,只是同名
		fmt.Printf("msg=%s, ok=%v", msg, ok)
	}
}

在这里插入图片描述

case 语句中chan的读取有两个返回条件,第一个是成功读取到数据,第二个是没有数据,且管道已经被关闭。

chan 读取更多资料:https://jiayq.blog.csdn.net/article/details/135885482
msg 在case中重新声明了, 简短变量声明更多资料:https://jiayq.blog.csdn.net/article/details/136428399

使用两个变量接收chan读取的返回值,可以判断读取到的值是否可信。
如果在上面的代码中增加 close(c),这样case语句中的chan读取到了零值:

func TestSelect(t *testing.T) {
	var c chan string
	c = make(chan string)
	var msg string
	close(c)
	select {
	case <-c:
		fmt.Printf("skip res\n")
	case msg = <-c:
		fmt.Printf("msg = %s\n", msg)
	case msg, ok := <-c:
		fmt.Printf("msg=%s, ok=%v", msg, ok)
	}
}

此时就拿到了零值,如果不判断是否可信,而是直接使用,就使用了预期之外的数据。
在这里插入图片描述

1.3 default

select 中的default语句不能处理chan的读写操作,当select的所有case语句都阻塞时,default语句将被执行。
在1.1中已经尝试了default的例子了。
需要注意的是,在一个select中,default只能出现一次,不限制位置,可以出现在任意顺序。

2. select 经典使用

2.1 永久阻塞

如果我们启动goroutine处理任务,并且不希望main函数退出,需要永久性阻塞main函数。
在Kubernetes项目的多个组件中均有使用select阻塞main函数的例子:

func main(){
    server := webhooktesting.NewTestServer(nil)
    server.StartTLS()
    fmt.Println("servering on", server.URL)
    select {}
}

如果一个select不包含case语句和default语句,那么goroutine就会陷入永久性阻塞中。

2.2 快速检错

如果使用chan在不同的goroutine中传递异常错误,使用select语句可以快速检查chan中是否有错误,避免等待:

    errCh := make(chan error, activate)
    jm.deleteJobPods(&job, activatePods, errCh) // 传入chan用于记录错误
    select{
    case manageJobErr = <- errCh: // 检查是否有错误发生
        if manageJobErr != nil {
            break
        }
    default: // 没有错误,快速结束检查
    }

如果没有异常错误,那么select会执行default语句,不会因为从chan中读取数据导致阻塞。

2.3 限时等待

如果想实现一个有效期的chan,当超过有效期,那么关闭chan,不能在写入了。

func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
    stopChWithTimeout := make(chan struct{})
    go func() {
        select {
        case <- stopCh: // 自然结束
        case <- time.After(timeout): // chan有效期
        }
        close(stopChWithTimeout)
    } ()
    return stopChWithTimeout
}

函数返回一个chan,可以在函数之间传递,但是chan会在指定时间后自动关闭。

3. 实现原理

请先思考:

  • 为什么每个case语句只能处理一个chan?
  • 为什么case语句的执行顺序是随机的(多个case都就绪)?
  • 为什么在case语句中向值为nil的chan中写数据不会触发panic?

3.1 数据结构

select中的case语句对应于runtime包中的scase(select-case)数据结构reflect/value.go:2480

// A runtimeSelect is a single case passed to rselect.
// This must match ../runtime/select.go:/runtimeSelect
type runtimeSelect struct {
	dir SelectDir      // SelectSend, SelectRecv or SelectDefault
	typ *rtype         // channel type
	ch  unsafe.Pointer // channel
	val unsafe.Pointer // ptr to data (SendDir) or ptr to receive buffer (RecvDir)
}

可以看出每个select-case的结构都如上,也就是说,每个case中必须包含chan,而且只能有一个chan。
另外,编译器在处理case语句时,如果case语句中没有chan,则会给出编译错误:

    select case must be receive, send or assign recv

runtime/select.go中也有定义:

// A runtimeSelect is a single case passed to rselect.
// This must match ../reflect/value.go:/runtimeSelect
type runtimeSelect struct {
	dir selectDir
	typ unsafe.Pointer // channel type (not used here)
	ch  *hchan         // channel
	val unsafe.Pointer // ptr to data (SendDir) or ptr to receive buffer (RecvDir)
}

其中的dir是标明了case的类型,总共有:

// These values must match ../reflect/value.go:/SelectDir.
type selectDir int

const (
	_             selectDir = iota
	selectSend              // case Chan <- Send
	selectRecv              // case <-Chan:
	selectDefault           // default
)

总共有4中:chan 的值为nil, 写chan, 读chan, default。
当chan的值为nil,那么不管是读还是写,都会永久阻塞,也就是说,如果case中的chan的值为nil,这类case会永远阻塞,select永远不会选中执行。
这也是为什么在case语句中向值为nil的chan中写数据不会panic。
这也是为什么会忽略第一个值。
default为特殊类型的case语句,其不会操作chan。而且,每个select语句中只能存在一个default语句,并且default语句可以出现在任意位置。
reflect/value.go中,存在对外可见的SelectCase声明:

type SelectCase struct {
	Dir  SelectDir // direction of case
	Chan Value     // channel to use (for send or receive)
	Send Value     // value to send (for send)
}

这个相比较少了很多运行时的定义。

3.2 实现逻辑

reflect/value.go中,有对select的执行逻辑的定义:

// Select executes a select operation described by the list of cases.
// Like the Go select statement, it blocks until at least one of the cases
// can proceed, makes a uniform pseudo-random choice,
// and then executes that case. It returns the index of the chosen case
// and, if that case was a receive operation, the value received and a
// boolean indicating whether the value corresponds to a send on the channel
// (as opposed to a zero value received because the channel is closed).
// Select supports a maximum of 65536 cases.
func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool) {
    // select 的case 最多有65535 个
    // int = int32 = 2^16 - 1 ,最高位是符号位
    // 如果 case 的数量超过 65535,panic
	if len(cases) > 65536 {
		panic("reflect.Select: too many cases (max 65536)")
	}
	// 使用内部运行时的类型拷贝一份case
	var runcases []runtimeSelect
	// 如果 select中的case大于4个
	if len(cases) > 4 {
		// 根据长度申请 slice 的长度,避免扩容
		runcases = make([]runtimeSelect, len(cases))
	} else {
		// 如果少于4个,那么申请 slice 的时候,长度按实际长度申请,容量为4
		runcases = make([]runtimeSelect, len(cases), 4)
	}
	// 初始默认没有 default
	haveDefault := false
	// 遍历 select的case数组
	for i, c := range cases {
	    // 访问遍历的 case
		rc := &runcases[i]
		// 重新读取,可能是防止幻读?
		rc.dir = c.Dir
		// 看看当前遍历的是哪类case
		switch c.Dir {
		// 不在定义中的case类型,panic
		default:
			panic("reflect.Select: invalid Dir")
		// default 类型的 case	
		case SelectDefault: // default
		    // 如果已经有了default,那么会panic,表示一个select中有两个default
			if haveDefault {
				panic("reflect.Select: multiple default cases")
			}
			// 设置有default
			haveDefault = true
			// 如果default中有chan,那么panic,default中不能有chan
			if c.Chan.IsValid() {
				panic("reflect.Select: default case has Chan value")
			}
			// 如果chan有数据,表示写chan,default中不能写chan
			if c.Send.IsValid() {
				panic("reflect.Select: default case has Send value")
			}
		// 写chan类型的case	
		case SelectSend:
			ch := c.Chan
			// chan如果为nil,忽略
			if !ch.IsValid() {
				break
			}
			// 必须是chan类型
			ch.mustBe(Chan)
			// 必须可见
			ch.mustBeExported()
			// 获取dir值
			tt := (*chanType)(unsafe.Pointer(ch.typ))
			// 再次判断chan必须是写chan
			if ChanDir(tt.dir)&SendDir == 0 {
				panic("reflect.Select: SendDir case using recv-only channel")
			}
			// 设置内部类型属性
			rc.ch = ch.pointer()
			rc.typ = &tt.rtype
			// 获取写chan的数据
			v := c.Send
			// 数据是否有效,如果写chan,但是没有数据写,也会panic
			if !v.IsValid() {
				panic("reflect.Select: SendDir case missing Send value")
			}
			v.mustBeExported()
			// 设置内部类型属性
			v = v.assignTo("reflect.Select", tt.elem, nil)
			if v.flag&flagIndir != 0 {
				rc.val = v.ptr
			} else {
				rc.val = unsafe.Pointer(&v.ptr)
			}
		// 读chan类型的case	
		case SelectRecv:
		    // chan是否有效
			if c.Send.IsValid() {
				panic("reflect.Select: RecvDir case has Send value")
			}
			ch := c.Chan
			// chan如果为nil,忽略
			if !ch.IsValid() {
				break
			}
			ch.mustBe(Chan)
			ch.mustBeExported()
			tt := (*chanType)(unsafe.Pointer(ch.typ))
			if ChanDir(tt.dir)&RecvDir == 0 {
				panic("reflect.Select: RecvDir case using send-only channel")
			}
			rc.ch = ch.pointer()
			rc.typ = &tt.rtype
			rc.val = unsafe_New(tt.elem)
		}
	}
	// 将select-case数组转换为runtimeSelectCase类型,调用rselect,得到select命中的case和是否可信
	chosen, recvOK = rselect(runcases)
	// 如果是读chan,那么需要返回值
	if runcases[chosen].dir == SelectRecv {
		tt := (*chanType)(unsafe.Pointer(runcases[chosen].typ))
		t := tt.elem
		p := runcases[chosen].val
		fl := flag(t.Kind())
		if ifaceIndir(t) {
			recv = Value{t, p, fl | flagIndir}
		} else {
			recv = Value{t, *(*unsafe.Pointer)(p), fl}
		}
	}
	// 返回case的赋值
	return chosen, recv, recvOK
}

在这里插入图片描述

reflect/value.go中,select主要是将select-case数组进行规则校验,如果规则校验通过,那么转换为runtimeSelectCase数组,并调用select命中逻辑。
该函数返回select命中的是哪个case,值是什么,是否ok。
Go在运行时,runtime/select.go中的reflect_rselect()函数用于处理select语句
在这里插入图片描述

这里有个知识点://go:linkname
可以看到在reflect/Value.go中的Select函数调用了rselect函数,但是rselect函数没有方法体:
在这里插入图片描述

rselect函数的方法体在runtime/select.go中:
在这里插入图片描述

可以了解下go:linkname

//go:linkname reflect_rselect reflect.rselect
func reflect_rselect(cases []runtimeSelect) (int, bool) {
    // 如果select没有case,那么将陷入永久性阻塞
	if len(cases) == 0 {
	    // 永久性阻塞
		block()
	}
	// 创建一个 slice ,长度是select-case数组的长度(包含default)
	// 用于分离读chan和写chan,将写chan放在前面,读chan放在后面
	sel := make([]scase, len(cases))
	// 创建一个 int 类型的 slice,存储 select-case数组的下标,长度和select-case数组的长度相同
	// 用于记录原始顺序
	orig := make([]int, len(cases))
	// 创建两个变量,统计select-case数组中读chan和写chan的数量
	nsends, nrecvs := 0, 0
	// 存储 default-case的位置
	dflt := -1
	// 对select-case数组进行遍历
	for i, rc := range cases {
		var j int
		switch rc.dir {
		// 如果是default-case,那么记录default下标
		case selectDefault:
			dflt = i
			continue
		// 如果是写chan,那么写chan的数量加1,同时记录当前下标	
		case selectSend:
			j = nsends
			nsends++
		// 如果是读chan,那么读chan的数量加1,同时记录翻转的下标
		// 这里是为了将读chan和写chan进行分类,写chan移动到slice的前面,读chan移动到slice的后面	
		case selectRecv:
			nrecvs++
			j = len(cases) - nrecvs
		}
		// 将传入的select-case做类型转换后,写入新slice
		sel[j] = scase{c: rc.ch, elem: rc.val}
		// 记录原始位置
		orig[j] = i
	}
    // 如果 select-case数组中只有default,那么直接选中并返回
	if nsends+nrecvs == 0 {
		return dflt, false
	}
	// 如果slice中间存在空缺,空缺是default,上面的for-range中,没有将default-case放入slice
	if nsends+nrecvs < len(cases) {
	    // 做合并,将空缺移动到最后,也就是将default-case放到最后
		copy(sel[nsends:], sel[len(cases)-nrecvs:])
		copy(orig[nsends:], orig[len(cases)-nrecvs:])
	}
	// 创建一个2倍读写chan的case长度的slice,不计算default-case
	order := make([]uint16, 2*(nsends+nrecvs))
	var pc0 *uintptr
	// 是否编译时有 -race 
	if raceenabled {
		pcs := make([]uintptr, nsends+nrecvs)
		for i := range pcs {
			selectsetpc(&pcs[i])
		}
		pc0 = &pcs[0]
	}
	// 调用 selectgo 函数选择 case
	chosen, recvOK := selectgo(&sel[0], &order[0], pc0, nsends, nrecvs, dflt == -1)
	// 如果没有选中读写chan,那么select选中default
	if chosen < 0 {
		chosen = dflt
	} else {
	    // select命中的下标(原顺序的下标)
		chosen = orig[chosen]
	}
	// 返回 select命中的下标和可信标记
	return chosen, recvOK
}

可以发现runtime/select.go中的reflect_rselect函数提取出来读写chan的case,
并按照先写后读的顺序进行转换为内部类型的slice,最后调用selectgo函数进行select选择读写chan,
如果读写chan都没有选中,那么select选中default,如果没有default,那么会阻塞等待。
接下来看下runtime/select.go中的selectgo函数:

// cas0是select-chan-case的slice,order0是原顺序的slice,pc0是 -race 的slice
// nsends是写chan的case数量, nrecvs是读chan的case数量
// block是是否阻塞的标志,如果有default,那么没有选中返回-1,不阻塞
// 如果没有default,那么没有选中阻塞,直到有case就绪
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
    // 是否是debug模式,debug模式会打印一些关键信息
	if debugSelect {
		print("select: cas0=", cas0, "\n")
	}
	// 将cas0指向一个最大长度为65535的数组
	cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
	// 将cas0指向一个最大长度为65535*2的数组
	order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
	// 计算select-chan-case的数量
	ncases := nsends + nrecvs
	// 从 cas1中切取select-chan-case切片
	// 这里填个坑:https://jiayq.blog.csdn.net/article/details/135716705
	// 切片扩展表达式3.3
	scases := cas1[:ncases:ncases]
	// 从 order1 中获取chan-case长度的切片,假设chan-case长度为n,这里从[0,675535*2)拿到了 [0,n) 长度的切片
	// 使用切片扩展表达式,设置容量为n
	pollorder := order1[:ncases:ncases]
	// 从 order1 中获取 [n,2n) 长度的切片
	// 使用切片扩展表达式,设置容量为n
	lockorder := order1[ncases:][:ncases:ncases]
	// 获取cpu的纳秒时间戳
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	// 统计有效select-chan-case的数量
	norder := 0
	// 对传入的select-chan-case切片做遍历
	for i := range scases {
		cas := &scases[i]
		// 如果 chan 为空,重新赋值,或者压根没有赋值,值为nil的chan
		// 那么清空chan的数据变量,忽略这个chan-case
		if cas.c == nil {
			cas.elem = nil // allow GC
			continue
		}
		// 获取一个[0, norder]的随机数
		j := fastrandn(uint32(norder + 1))
		// 将随机位置的数字放到递增的norder位置
		// 当前位置是零值,未做初始化,将当前位置设置为之前任意位置的值
		pollorder[norder] = pollorder[j]
		// 将被选定的随机位置放下标位置
		// 将被交换的位置设置为当前下标
		// 实际上可以这么立即,首先将当前下标设置为当前位置,然后选择之前的随机一个位置进行交换
		pollorder[j] = uint16(i)
		// select-chan-case加1
		norder++
	} 
	// 到了这里就对 pollorder 进行了随机shuffle,并且统计出来有效的chan-case数量
	// 只要有效的chan-case数量的切片
	pollorder = pollorder[:norder]
	// 顺便对 lockorder也切一下
	lockorder = lockorder[:norder]
	// 下面两个 for 构成一个【堆排序】
	// 建立大顶堆,大顶堆是为了升序
	// 遍历 lockorder 
	// 大顶堆,根节点最大
	// 这个遍历主要是实现 lockorder 的随机初始化,以及 lockorder 切片对应的chan-case的chan的地址是相等或递减的
	for i := range lockorder {
	    // 临时获取 lockorder 的下标
		j := i
		// 获取 shuffle 切片 pollorder中第一个值
		// 然后获取该值所指示的 chan-case 的chan
		c := scases[pollorder[i]].c
		// 根节点小于左节点
		for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
		    // 将 k 设置为 lockorder对于当前位置 一半的位置,堆排序,用数组存储一棵树,怎么存储。
			k := (j - 1) / 2
			// 交换 当前位置和 当前位置一半的位置
			lockorder[j] = lockorder[k]
			// 继续循环,实现 在 lockorder 中对应的 chan-case的chan地址是相等或者递减顺序
			j = k
		}
		// 将 shuffle 切片 pollorder中第一个值设置到 lockorder中的第一个值
		lockorder[j] = pollorder[i]
	}
	// 堆排序
	for i := len(lockorder) - 1; i >= 0; i-- {
	    // 当前索引的 lockorder 的值
		o := lockorder[i]
		// 获取 lockorder 对应的chan-case 的 chan
		c := scases[o].c
		// 将大顶堆的根节点换到数组末尾
		lockorder[i] = lockorder[0]
		// 重新构建大顶堆
		j := 0
		for {
		    // 左节点
			k := j*2 + 1
			// 如果到了已经排序过的位置,不在构建堆(数组最后是最大的,如果到了i,表示到了已经排过序的数据,不需要在参与构建大顶堆了)
			if k >= i {
				break
			}
			// 左节点小于右节点
			if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
				k++
			}
			// 根节点小于左节点,交换
			if c.sortkey() < scases[lockorder[k]].c.sortkey() {
				lockorder[j] = lockorder[k]
				j = k
				continue
			}
			break
		}
		lockorder[j] = o
	}
	// 是否是debug模式,如果是,输出堆排序后升序chan-case中chan地址的lockorder切片
	if debugSelect {
		for i := 0; i+1 < len(lockorder); i++ {
			if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
				print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
				throw("select: broken sort")
			}
		}
	}
	// 按照 lockorder 的顺序,给chan-case加锁(地址升序排列了,内存访问更快)
	sellock(scases, lockorder)
	var (
		gp     *g
		sg     *sudog
		c      *hchan
		k      *scase
		sglist *sudog
		sgnext *sudog
		qp     unsafe.Pointer
		nextp  **sudog
	)
	// pass 1 - look for something already waiting
	var casi int
	var cas *scase
	var caseSuccess bool
	var caseReleaseTime int64 = -1
	var recvOK bool
	// 对shuffle的切片进行遍历
	for _, casei := range pollorder {
	    // chan-case 的下标
		casi = int(casei)
		// chan-case
		cas = &scases[casi]
		// chan-case 的 chan
		c = cas.c
		// 如果 下标大于 写chan-case的数量,那么表示当前的chan-case是读chan-case
		// 需要注意,如果写chan关闭,就不能在写了,读chan关闭,如果有缓存区,那么还是能够读的
		if casi >= nsends {
		    // 获取读chan的goroutine等待队列
			sg = c.sendq.dequeue()
			// 如果有读chan的等待队列,那么跳转到读取
			// 如果有读chan的等待队列,那么对于读chan表示缓存区为空
			if sg != nil {
				goto recv
			}
			// 如果读chan是带有缓存的,而且缓存区有待读取的数据,那么跳转到数据读取
			if c.qcount > 0 {
				goto bufrecv
			}
			// 如果读chan已经关闭,那么挑战到读chan关闭
			if c.closed != 0 {
				goto rclose
			}
		} else { // 不是读chan-case,那么就是写chan-case
		    // 是否 -race 
			if raceenabled {
				racereadpc(c.raceaddr(), casePC(casi), chansendpc)
			}
			// 如果写chan-case的chan已经关闭了,那么跳转到写chan关闭
			if c.closed != 0 {
				goto sclose
			}
			// 获取写chan的等待队列
			sg = c.recvq.dequeue()
			// 如果写chan的等待队列不为空,那么跳转到写入
			// 如果存在等待队列,那么对于写chan表示缓存区已经满了
			if sg != nil {
				goto send
			}
			// 如果写chan缓存区未满,那么跳转到写入数据
			if c.qcount < c.dataqsiz {
				goto bufsend
			}
		}
	}
	// 如果有default-case ,那么block=false,如果没有default-case,那么block=true
	if !block {
	    // chan-case解锁
		selunlock(scases, lockorder)
		// 返回-1,表示select没有选中 chan-case,需要返回default-case
		casi = -1
		// 跳转到返回
		goto retc
	}
	// pass 2 - enqueue on all chans
	// 能走到这里,表示chan-case没有就绪,而且没有default-case,需要阻塞等待chan-case就绪
	// 获取当前 goroutine 信息
	gp = getg()
	// 如果已经在等待了,异常
	if gp.waiting != nil {
		throw("gp.waiting != nil")
	}
	// 获取等待信息
	nextp = &gp.waiting
	// 对lockorder进行遍历
	for _, casei := range lockorder {
	    // 获取chan-case的chan的升序地址的位置
		casi = int(casei)
		// 获取chan-case
		cas = &scases[casi]
		// 获取 c
		c = cas.c
		// 获取等待的goroutine
		sg := acquireSudog()
		sg.g = gp
		// 等待的 goroutine 是否在select 阻塞中
		sg.isSelect = true
		// 将当前chan-case的数据给等待的select的goroutine
		sg.elem = cas.elem
		sg.releasetime = 0
		if t0 != 0 {
			sg.releasetime = -1
		}
		// 将chan-case的chan赋值给goroutine
		sg.c = c
		// 继续下一个等待的goroutine
		*nextp = sg
		nextp = &sg.waitlink
		// 如果当前索引小于写chan-case的数量,那么说明当前是写chan
		if casi < nsends {
		    // 将等待的goroutine加入chan的等待队列
			c.sendq.enqueue(sg)
		} else {
		    // 否则就是读chan了
			c.recvq.enqueue(sg)
		}
	}
    // 等待goutine 被从chan的等待队列唤醒(其他的goroutine操作chan,会唤醒等待队列)
    // 释放 goroutine占用的P资源
	gp.param = nil
	// 使用原子操作设置goroutine处于等待chan唤醒状态
	atomic.Store8(&gp.parkingOnChan, 1)
	// 设置goroutine为等待状态,selparkcommit是goroutine唤醒的逻辑
	gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
	// 设置goroutine不是激活状态
	gp.activeStackChans = false
	// 再次加锁
	sellock(scases, lockorder)
	// 设置goroutine未完成
	gp.selectDone = 0
	// 获取goroutine
	sg = (*sudog)(gp.param)
	// 释放goroutine的P资源
	gp.param = nil
	// pass 3 - dequeue from unsuccessful chans
	// otherwise they stack up on quiet channels
	// record the successful case, if any.
	// We singly-linked up the SudoGs in lock order.
	// 如果没有选中chan-case,那么返回 -1,必须有default-case
	casi = -1
	cas = nil
	// case 执行是否成功
	caseSuccess = false
	// 获取goroutine等待列表
	sglist = gp.waiting
	// 链表遍历等待的goroutine
	for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
	    // 退出 select 等待状态
		sg1.isSelect = false
		// 数据清空
		sg1.elem = nil
		// chan释放
		sg1.c = nil
	}
	// 等待状态退出
	gp.waiting = nil
	// 遍历chan-case切片
	for _, casei := range lockorder {
	    // 获取 chan-case
		k = &scases[casei]
		// 如果是goroutine的等待链表头
		if sg == sglist {
			// 获取chan-case的切片的下标
			casi = int(casei)
			// 设置chan-case
			cas = k
			// 获取是否成功的状态
			caseSuccess = sglist.success
			// 如果释放时间大于0,那么获取释放时间
			if sglist.releasetime > 0 {
				caseReleaseTime = sglist.releasetime
			}
		} else {
		    // 获取chan-case的chan
			c = k.c
			// 如果当前索引小于写chan的数量,那么当前chan就是写chan
			if int(casei) < nsends {
			    // 从写chan的等待队列中移除goroutine
				c.sendq.dequeueSudoG(sglist)
			} else {
			    // 从读chan的等待队列中移除goroutine
				c.recvq.dequeueSudoG(sglist)
			}
		}
		// 移动下一个等待的goroutine
		sgnext = sglist.waitlink
		// 从链表移除
		sglist.waitlink = nil
		// 释放goroutine资源
		releaseSudog(sglist)
		// 移动
		sglist = sgnext
	}
	// 如果没有就绪,但是却被唤醒了
	if cas == nil {
		throw("selectgo: bad wakeup")
	}
	// 拿到chan
	c = cas.c
	// 打印调试日志
	if debugSelect {
		print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
	}
	// 判断是读chan-case还是写chan-case
	if casi < nsends { // 写chan
	    // 如果写chan-case未成功,那么跳转到写chan关闭
		if !caseSuccess {
			goto sclose
		}
	} else {
	    // 如果写chan成功了,那么设置返回可信标志
		recvOK = caseSuccess
	}
	// -race
	if raceenabled {
		if casi < nsends {
			raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
		} else if cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
		}
	}
	// -msan
	if msanenabled {
		if casi < nsends {
			msanread(cas.elem, c.elemtype.size)
		} else if cas.elem != nil {
			msanwrite(cas.elem, c.elemtype.size)
		}
	}
	// chan-case解锁
	selunlock(scases, lockorder)
	// 跳转到返回
	goto retc
// 读chan读取数据
bufrecv:
	// -race
	if raceenabled {
		if cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
		}
		racenotify(c, c.recvx, nil)
	}
	// -msan
	if msanenabled && cas.elem != nil {
		msanwrite(cas.elem, c.elemtype.size)
	}
	// 设置可信标志
	recvOK = true
	// 从chan的缓存区读取数据
	qp = chanbuf(c, c.recvx)
	// 如果读chan-case的数据区不为空,那么清空并交换
	if cas.elem != nil {
		typedmemmove(c.elemtype, cas.elem, qp)
	}
	// 内存整理
	typedmemclr(c.elemtype, qp)
	// 读取索引(缓存区是环形列表)
	c.recvx++
	// 如果读取指针追上写入指针,表示缓冲区空(数组构成的环形队列,到了尾端了)
	if c.recvx == c.dataqsiz {
	    // 重置指针
		c.recvx = 0
	}
	// 缓存区可读数据减1
	c.qcount--
	// 解锁chan-case
	selunlock(scases, lockorder)
	// 跳转返回
	goto retc
// 写chan发送数据
bufsend:
	if raceenabled {
		racenotify(c, c.sendx, nil)
		raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	// 从 写chan-case中将数据写入chan的缓冲区
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
	// chan写入指针以东南
	c.sendx++
	// 如果写入指针追上读取指针,表示缓冲区满(数组构成的环形队列,到了尾端了)
	if c.sendx == c.dataqsiz {
	    // 重置指针
		c.sendx = 0
	}
	// 缓冲区数据加1
	c.qcount++
	// 解锁chan-case
	selunlock(scases, lockorder)
	// 跳转返回
	goto retc
// 读chan读取
recv:
	// 阻塞读取
	recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	// 打印调试信息
	if debugSelect {
		print("syncrecv: cas0=", cas0, " c=", c, "\n")
	}
	// 设置可信标志
	recvOK = true
	// 跳转返回
	goto retc
// 读关闭的chan,有缓冲区的chan,关闭后可读
rclose:
	// 解锁chan-case
	selunlock(scases, lockorder)
	// 设置可信标志(为什么是false,当数据未读取完的时候,应该是true)
	recvOK = false
	if cas.elem != nil {
		typedmemclr(c.elemtype, cas.elem)
	}
	if raceenabled {
		raceacquire(c.raceaddr())
	}
	// 跳转返回
	goto retc
// 写chan发送
send:
	if raceenabled {
		raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	// 阻塞发送
	send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	// 打印调试信息
	if debugSelect {
		print("syncsend: cas0=", cas0, " c=", c, "\n")
	}
	// 跳转返回
	goto retc
// 返回
retc:
    // 如果case释放时间大于0,那么发送阻塞事件
	if caseReleaseTime > 0 {
		blockevent(caseReleaseTime-t0, 1)
	}
	// 返回选中的chan-case和可信标志
	return casi, recvOK
// 写关闭的chan,panic
sclose:
	// 解锁chan-case
	selunlock(scases, lockorder)
	// panic,关闭的chan不允许写
	panic(plainError("send on closed channel"))
}

3.3 原理总结

首先在reflect/Value.go中,存在对select的定义,在reflect/Value.go中,调用Select函数将select-case转换为内部类型,
并且会进行select-case的规则校验,调用rselect函数进行select选中。
rselect函数通过//go:linknameruntime/select.go中实现,实现函数为reflect_rselect
runtime/select.go中的reflect_rselect函数中对传入的select-case切片做读chan-case和写chan-case的分类,并且剔除永远阻塞和default-case。
接着调用runtime/select.go中的selectgo函数进行chan-case的选中。
selectgo函数中,对传入的select-case首先进行shuffle,得到pollorder。
然后使用大顶堆,对select-case构建lockorder,lockoder中的chan地址升序。
如果读chan-case的chan的等待队列不为空,那么唤醒等待goroutine,进行数据读取。
如果读chan-case的chan的等待队列为空,但是缓存区存在数据,那么进行缓存区数据读取。
如果读chan-case的chan已经关闭,那么直接读缓存区数据,否则返回不可信标志。
如果写chan-case的chan已经关闭,那么panic。
如果写chan-case的chan的等待队列不为空,那么唤醒并写入。
如果写chan-case的chan的缓冲区不满,那么写入数据。
如果有default,那么没有选中直接返回。上层调用直接返回default-case。
如果没有default,那么阻塞读写。
这里填个坑:https://jiayq.blog.csdn.net/article/details/135716705 ,切片扩展表达式3.3。

4. 总结

  • select仅能操作chan.
  • 每个case语句仅能处理一个chan,要么是读chan,要么是写chan.
  • 多个case语句的执行顺序是随机的。
  • 存在default语句,select将不会阻塞。
  • 使用select读取chan时,应该尽可能检查读取是否成功,确定数据是否可信。

4.1 大概原理

Go在运行时包中提供了selectgo函数用于处理select语句:

func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) 

selectgo函数会从一组case语句中挑选一个case,并返回命中case的下标,对于读chan-case,还会返回是否成功地读取了数据(第二个返回值对于其他类型case无意义)。
在这里插入图片描述

4.2 参数

编译器会将select中的case语句存储在一个数组中,selectgo的第一个参数cas0就是这个数组的地址,参数nsends,nrecvs表示读chan-case和写chan-case的个数。
selectgo第二个参数order0是一个整型数组地址,长度是chan-case个数的2倍,order0数组是case执行随机性的关键。
order0数组被一分为二,前半部分存放chan-case的随机顺序(源代码中称为pollorder),selectgo函数会将原始的chan-case顺序打乱,这样在检查每个chan-case是否就绪时,就会表现出随机性。
后半部分存放chan加锁的顺序(源代码中称之为lockorder),selectgo会按照chan地址升序的顺序对chan进行加锁,从而避免因重复加锁引发的死锁问题。
在这里插入图片描述

4.3 返回值

当所有的chan-case都不可能就绪时,selectgo会陷入永久的阻塞,此时函数不会返回。一旦selectgo返回,就说明某个chan-case语句就绪了。
第一个返回值代表case的编号,这个编号与代码中出现的顺序一致,而非打乱后的顺序。
第二个返回值代表是否从chan中读取了数据,该值只针对读chan-case有意义。特别注意的是,第二个返回值为true时,仅代表从chan中读取了数据,对于已经关闭了的chan也是如此。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/460429.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

AMRT 3D 数字孪生引擎(轻量化图形引擎、GIS/BIM/3D融合引擎):智慧城市、智慧工厂、智慧建筑、智慧校园。。。

AMRT3D 一、概述 1、提供强大完整的工具链 AMRT3D包含开发引擎、资源管理、场景编辑、UI搭建、项目预览和发布等项目开发所需的全套功能&#xff0c;并整合了动画路径、精准测量、动态天气、视角切换和动画特效等工具。 2、轻量化技术应用与个性化定制 AMRT3D适用于快速开…

openGauss学习笔记-243 openGauss性能调优-SQL调优-典型SQL调优点-子查询调优

文章目录 openGauss学习笔记-243 openGauss性能调优-SQL调优-典型SQL调优点-子查询调优243.1 子查询调优243.1.1 子查询背景介绍243.1.2 openGauss对SubLink的优化243.1.3 更多优化示例 openGauss学习笔记-243 openGauss性能调优-SQL调优-典型SQL调优点-子查询调优 SQL调优是一…

Scala--01--简介、环境搭建

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 1. Scala简介1.1 Scala是什么&#xff1f;官网&#xff1a; [https://scala-lang.org/](https://scala-lang.org/)官方文档&#xff1a; [https://docs.scala-lang.…

WPF布局、控件与样式

视频来源&#xff1a;https://www.bilibili.com/video/BV1HC4y1b76v/ 布局 常用布局属性 HorizontalAlignment&#xff1a;用于设置元素的水平位置VerticalAlignment&#xff1a;用于设置元素的垂直位置Margin&#xff1a;指定元素与容器的边距Height&#xff1a;指定元素的…

音频提取:分享几个常用方法,简单好用

有时候我们会在视频中发现一首非常好听的歌曲&#xff0c;但是我们并不需要视频本身。 这时&#xff0c;我们可以提取视频中的音频&#xff0c;将其转化为音频文件&#xff0c;然后在任何时间、任何地点进行欣赏。 下面给大家分享几个提取视频中音频的几个方法&#xff0c;供…

[嵌入式AI从0开始到入土]16_ffmpeg_ascend编译安装及性能测试

[嵌入式AI从0开始到入土]嵌入式AI系列教程 注&#xff1a;等我摸完鱼再把链接补上 可以关注我的B站号工具人呵呵的个人空间&#xff0c;后期会考虑出视频教程&#xff0c;务必催更&#xff0c;以防我变身鸽王。 第1期 昇腾Altas 200 DK上手 第2期 下载昇腾案例并运行 第3期 官…

css 各种方位计算 - client系列 offset系列 scroll系列 x/y 系列

offset系列 HTMLElement.offsetTop - Web API 接口参考 | MDN 一文读懂offsetHeight/offsetLeft/offsetTop/offsetWidth/offsetParent_heightoffset-CSDN博客 client系列 搞清clientHeight、offsetHeight、scrollHeight、offsetTop、scrollTop-CSDN博客 scroll系列 秒懂scr…

RabbitMQ:1.概述及安装

概述 AMQP协议 MQ Message Queue&#xff08;消息队列&#xff09;是在消息的传输过程中保存消息的容器&#xff0c;多用于系统之间的异步通信 AMQP Advanced Message Queuing Protocol(高级消息队列协议)是一个网络协议&#xff0c;2006年AMQP规范发布【类比HTTP】 专门为消…

ubuntu安装zsh及环境配置

ubuntu安装zsh及环境配置 MacBook 安装 zsh 个人很喜欢使用zsh,它的终端显示很清晰,命令都很友好,使用git时,直接可以看到当前分支和修改状态 zsh安装 1.查看当前系统装了哪些shellcat /etc/shells 2.当前正在运行的是哪个版本的shellecho $SHELL 3.安装zshsudo apt-get -y …

【项目笔记】java微服务:黑马头条(day03)

文章目录 自媒体文章发布1)自媒体前后端搭建1.1)后台搭建1.2)前台搭建 2)自媒体素材管理2.1)素材上传2.2.1)需求分析2.2.2)素材管理-图片上传-表结构2.2.3)实现思路2.2.4)接口定义2.2.5)自媒体微服务集成heima-file-starter2.2.6)具体实现 2.2)素材列表查询2.2.1)接口定义2.2.2…

【Algorithm】动态规划和递归问题:动态规划和递归有什么区别?如何比较递归解决方案和它的迭代版本?

【Algorithm】动态规划和递归问题:动态规划和递归有什么区别?如何比较递归解决方案和它的迭代版本? 1. 动态规划(Dynamic Programming,DP)和递归定义及优缺点1.1 递归 (Recursion)定义及优缺点1.2 动态规划 (Dynamic Programming)定义及优缺点2. 动态规划(DP)和递归的特…

platform设备注册驱动模块的测试

一. 简介 上一篇文章编写了 platform设备注册代码&#xff0c;文章地址如下&#xff1a; 无设备树platform驱动实验&#xff1a;platform设备注册代码实现-CSDN博客 本文继续无设备树platform驱动实验&#xff0c;本文对编译好的 设备注册程序进行测试&#xff0c;测试所实…

Linux远程连接本地数据库(docker)

1. 安装docker 参考上一篇文章 CentOS安装Docker 2. Linux中安装Mysql 2.1 docker拉取mysql镜像 拉取镜像 docker pull mysql查看镜像列表 docker images2.2 运行mysql容器 运行一个名字为mysql的mysql容器&#xff0c;其连接端口号为3306&#xff0c;密码为123456 docker r…

运行gazebo机器人模型没有cmd_vel话题

运行赵虚左教程代码出现上诉问题 roslaunch urdf02_gazebo demo03_env.launch 原因&#xff1a;缺少某个包 在工作空间catkin_make编译发现报错 解决&#xff1a; sudo apt-get install ros-noetic-gazebo-ros-pkgs ros-noetic-gazebo-ros-control 下载后再次运行launch文件…

unity学习(59)——选择角色界面--MapHandler1

map内容需要结合客户端和服务器两部分的流程&#xff1a; 1.客户端第一次发出的command的4&#xff0c;选择请求&#xff1a; 2.服务器做出相应&#xff1a; select的内容&#xff0c;最后就是返回当前玩家的playerModel结构体。 3.去客户端里找type 2&#xff08;user2&#…

服务器数据恢复—服务器硬盘灯显示红色的数据恢复案例

服务器数据恢复环境&故障&#xff1a; 一台服务器中有一组由多块硬盘组建的raid阵列&#xff0c;在运行过程中服务器突然崩溃&#xff0c;管理员检查服务器发现该服务器raid阵列中有两块硬盘的指示灯显示红色。于是&#xff0c;管理员重启服务器&#xff0c;服务器重启后&a…

maven私服搭建详细教程

1、为什么需要私服 如果在公司中多个项目模块中的的公共类用的都是一样的&#xff0c;那么不可能将这些一样的代码写两遍。所以将其中一个项目中的代码打包成私服&#xff0c;然后在另外一个模块中去进行引用。 除此之外&#xff0c;如果大公司中开发人员较多&#xff0c;大家同…

Dev C++和Visual Studio Code哪个好?

Dev C和Visual Studio Code哪个好&#xff1f; Dev C和Visual Studio Code都是常用的集成开发环境&#xff08;IDE&#xff09;&#xff0c;用于编写和调试代码。它们各自有不同的优点和适用场景。 在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「C的资…

对模型性能进行评估(Machine Learning 研习十五)

在上一篇我们已然训练了一个用于对数字图像识别的模型&#xff0c;但我们目前还不知道该模型在识别数字图像效率如何&#xff1f;所以&#xff0c;本文将对该模型进行评估。 使用交叉验证衡量准确性 评估模型的一个好方法是使用交叉验证&#xff0c;让我们使用cross_val_score…

Linux从0到1——Linux第一个小程序:进度条

Linux从0到1——Linux第一个小程序&#xff1a;进度条 1. 输出缓冲区2. 回车和换行的本质3. 实现进度条3.1 简单原理版本3.2 实际工程版本 1. 输出缓冲区 1. 小实验&#xff1a; 编写一个test.c文件&#xff0c;&#xff1a; #include <stdio.h> #include <unistd.h…