文章目录
- 前言
- 一、信道的定义与使用
- 信道的声明
- 信道的使用
- 二、信道的容量与长度
- 三、缓冲信道与无缓冲信道
- 缓冲信道
- 无缓冲信道
- 四、信道的初体验
- 信道关闭的广播机制
- 总结
前言
Goroutine的开发,当遇到生产者消费者场景的时候,离不开 channel(信道)的使用。
信道,就是一个管道,连接多个goroutine程序 ,它是一种队列式的数据结构,遵循先入先出的规则。
一、信道的定义与使用
信道的声明
信道声明的两种方式:
// 先声明再初始化
var 信道实例 chan 信道类型
信道实例 = make(chan 信道类型)
// 上面两句合并
信道实例 := make(chan 信道类型)
信道的使用
发送数据,接收数据
// 往信道中发送数据
pipline<- 200
// 从信道中取出数据,并赋值给mydata
mydata := <-pipline
信道用完了,可以对其进行关闭,避免有人一直在等待。但是你关闭信道后,接收方仍然可以从信道中取到数据,只是接收到的会永远是 0。
close(pipline)
当从信道中读取数据时,可以有多个返回值,其中第二个可以表示 信道是否被关闭,如果已经被关闭,ok 为 false,若还没被关闭,ok 为true。
x, ok := <-pipline
二、信道的容量与长度
一般创建信道都是使用 make 函数,make 函数接收两个参数
- 第一个参数:必填,指定信道类型
- 第二个参数:选填,不填默认为0,指定信道的容量(可缓存多少数据)
对于信道的容量,很重要,这里要多说几点:
- 当容量为0时,说明信道中不能存放数据,在发送数据时,必须要求立马有人接收,否则会报错。此时的信道称之为无缓冲信道。
- 当容量为1时,说明信道只能缓存一个数据,若信道中已有一个数据,此时再往里发送数据,会造成程序阻塞。 利用这点可以利用信道来做锁。
- 当容量大于1时,信道中可以存放多个数据,可以用于多个协程之间的通信管道,共享资源。
信道的容量,可以使用 cap 函数获取 ,而信道的长度,可以使用 len 长度获取。
package main
import "fmt"
func main() {
pipline := make(chan int, 10)
fmt.Printf("信道可缓冲 %d 个数据\n", cap(pipline))
pipline <- 1
pipline <- 1
fmt.Printf("信道中当前有 %d 个数据\n", len(pipline))
}
输出结果
[root@work day01]# go run channel.go
信道可缓冲 10 个数据
信道中当前有 2 个数据
三、缓冲信道与无缓冲信道
按照是否可缓冲数据可分为:缓冲信道 与 无缓冲信道
缓冲信道
允许信道里存储一个或多个数据,这意味着,设置了缓冲区后,发送端和接收端可以处于异步的状态。
pipline := make(chan int, 10)
无缓冲信道
在信道里无法存储数据,这意味着,接收端必须先于发送端准备好,以确保你发送完数据后,有人立马接收数据,否则发送端就会造成阻塞,原因很简单,信道中无法存储数据。也就是说发送端和接收端是同步运行的。
pipline := make(chan int)
四、信道的初体验
信道的用途就是传递数据信息,下面以经典的生产者和消费者场景,实践下信道是使用。
下面的例子中,声明了一个容量为10的传递整型的信道。生产者生产的数据放入信道,消费者将从信道的数据打印。
这里,我们消费者有两种,其实是演示了信道遍历的两种常见的方式。
第一种是用for range方式
第二种是用for循环,断言判断信道是否关闭,关闭了就退出。
package main
import (
"fmt"
"sync"
)
// 生产者
func producer(mychan chan int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 1; i <= 10; i++ {
mychan <- i
fmt.Printf("produce: %v\n", i)
fmt.Printf("pipeline Length: %v \n", len(mychan))
}
// 记得 close 信道
// 不然主函数中遍历完并不会结束,而是会阻塞。
close(mychan)
}
//消费者 Range方式
func consumer_range(name string, mychan chan int, wg *sync.WaitGroup) {
defer wg.Done()
for k := range mychan {
fmt.Printf("consumer_range %v: %v\n", name, k)
}
}
//消费者 断言方式
func consumer_assert(name string, mychan chan int, wg *sync.WaitGroup) {
defer wg.Done()
for {
// if data, ok := <-mychan; ok ok为bool值,true表示正常接受,false表示通道关闭。
if data, ok := <-mychan; ok {
fmt.Printf("consumer_assert %v: %v\n", name, data)
} else {
break
}
}
}
func main() {
var wg sync.WaitGroup
pipline := make(chan int, 10)
wg.Add(3)
go producer(pipline, &wg)
go consumer_range("ID1", pipline, &wg)
go consumer_assert("ID2", pipline, &wg)
wg.Wait()
}
输出的结果:
[root@work day01]# go run channel3.go
produce: 1
pipeline Length: 0
produce: 2
pipeline Length: 1
produce: 3
pipeline Length: 2
produce: 4
pipeline Length: 3
produce: 5
pipeline Length: 4
produce: 6
pipeline Length: 5
produce: 7
pipeline Length: 6
produce: 8
pipeline Length: 7
produce: 9
pipeline Length: 8
produce: 10
pipeline Length: 9
consumer_assert ID2: 1
consumer_assert ID2: 2
consumer_assert ID2: 3
consumer_assert ID2: 4
consumer_assert ID2: 5
consumer_assert ID2: 6
consumer_assert ID2: 7
consumer_assert ID2: 8
consumer_assert ID2: 9
consumer_assert ID2: 10
信道关闭的广播机制
上面的案例中,有一点需要注意,我们的生产者函数中,在数据生产结束后,调用了close(mychan)方法,关闭了信道。如果不关闭程序还能正常执行吗?我们手动修改下代码,将代码改造如下,再次执行。
// 记得 close 信道
// 不然主函数中遍历完并不会结束,而是会阻塞。
// close(mychan)
执行结果如下:
[root@work day01]# go run channel3.go
produce: 1
pipeline Length: 0
produce: 2
pipeline Length: 0
produce: 3
pipeline Length: 1
produce: 4
pipeline Length: 2
produce: 5
pipeline Length: 3
produce: 6
pipeline Length: 4
produce: 7
pipeline Length: 5
produce: 8
pipeline Length: 6
produce: 9
pipeline Length: 7
produce: 10
pipeline Length: 8
consumer_assert ID2: 1
consumer_assert ID2: 3
consumer_assert ID2: 4
consumer_assert ID2: 5
consumer_assert ID2: 6
consumer_assert ID2: 7
consumer_assert ID2: 8
consumer_assert ID2: 9
consumer_assert ID2: 10
consumer_range ID1: 2
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc000010060?)
/usr/local/go/src/runtime/sema.go:62 +0x27
sync.(*WaitGroup).Wait(0x0?)
/usr/local/go/src/sync/waitgroup.go:116 +0x4b
main.main()
/data/go_projects/third_packages/goroutine/day01/channel3.go:50 +0x157
goroutine 7 [chan receive]:
main.consumer_range({0x49bc4f, 0x3}, 0x0?, 0x0?)
/data/go_projects/third_packages/goroutine/day01/channel3.go:25 +0x11f
created by main.main
/data/go_projects/third_packages/goroutine/day01/channel3.go:48 +0xf9
goroutine 8 [chan receive]:
main.consumer_assert({0x49bc52, 0x3}, 0x0?, 0x0?)
/data/go_projects/third_packages/goroutine/day01/channel3.go:34 +0x11d
created by main.main
/data/go_projects/third_packages/goroutine/day01/channel3.go:49 +0x14d
exit status 2
通过上面的结果,我们看到消费者在消费完信道中的消息后,就panic退出了。
当程序一直在等待从信道里读取数据,而此时并没有人会往信道中写入数据。此时程序就会陷入死循环,造成死锁。
这里在现实生活中也有类似的例子,生产者是网红,每天摆摊,大量消费者蹲守在网红摊点前打卡,突然一天网红不出摊了,也没在微信和社交媒体更新动态,那消费者一天一直苦等。正确的做法是网红在社交媒体上发布下今天不出摊的通知,避免粉丝等待。
if data, ok := <-mychan; ok ok为bool值,true表示正常接受,false表示通道关闭。
同理,我们上面的案例一样,当生产者不在生产的时候,应该关闭信道。因为信道关闭是有关播机制的,所有的channel接收者都会在chennel关闭时,立刻从阻塞等待中返回ok值为false。
总结
- 关闭一个未初始化的 channel 会产生 panic
- 重复关闭同一个 channel 会产生 panic
- 向一个已关闭的 channel 发送消息会产生 panic
- 从已关闭的 channel 读取消息不会产生 panic,且能读出 channel 中还未被读取的消息,若消息均已被读取,则会读取到该类型的零值
- 从已关闭的 channel 读取消息永远不会阻塞,并且会返回一个为 false 的值,用以判断该 channel 是否已关闭(x,ok := <- ch)
- 关闭 channel 会产生一个广播机制,所有向 channel 读取消息的 goroutine 都会收到消息
- 如果写端没有写数据,也没有关闭channel 。<-ch; 会阻塞