Go协程goroutine与管道channel
- 1 协程goroutine
- 1.1 基本介绍
- 1.2 快速入门
- 1.3 调度模型:MPG模式介绍
- 1.4 设置cpu数
- 1.5 协程资源竞争问题
- 1.6 解决协程并发方案
- 2 管道channel
- 2.1 基本介绍
- 2.2 快速入门
- 2.3 管道的关闭和遍历
- 2.4 管道和协程的结合
- 2.5 声明 只读/只写 的管道
- 2.6 select解决管道堵塞
- 2.7 recover解决程序崩溃
1 协程goroutine
1.1 基本介绍
前置知识:“进程和线程”,“并发与并行”
- 协程的概念
协程(Coroutine)是一种用户态的轻量级线程,不同于操作系统线程,协程能够在单个线程中实现多任务并发,使用更少的系统资源。协程的运行由程序控制,不需要操作系统介入,因此协程之间的切换更加快速。
- Go语言中的协程
在Go语言中,协程被称为“Goroutine”,是一种轻量级的线程。与操作系统线程不同,Go语言的Goroutine只需要几kb的内存,并且可以很容易地创建数千个Goroutine,因为它们由Go运行时(Goruntime)自动管理。
Go协程和Go主线程
- Go主线程(有程序员直接称为线程/也可以理解成进程):一个Go线程上,可以起多个协程,你可以这样理解,
协程是轻量级的线程[编译器做优化]
。 - Go协程的特点:
- 有独立的栈空间
- 共享程序堆空间
- 调度由用户控制
- 协程是轻量级的线程
1.2 快速入门
案例说明:
- 在主线程(可以理解成进程)中,开启一个goroutine,该协程每隔1秒输出 “hello,world”。
- 在主线程中也每隔一秒输出"hello,golang",输出10次后,退出程序。
- 要求主线程和goroutine同时执行。
func test() {
for i := 1; i < 10; i++ {
fmt.Println("协程 test() hello, world" + strconv.Itoa(i))
time.Sleep(time.Second)
}
}
func main() {
// 开启协程
go test()
for i := 1; i < 10; i++ {
fmt.Println("主线程 main() hello, world" + strconv.Itoa(i))
time.Sleep(time.Second)
}
}
输出结果:
主线程 main() hello, world1
协程 test() hello, world1
协程 test() hello, world2
主线程 main() hello, world2
主线程 main() hello, world3
协程 test() hello, world3
略。。。
从输出结果就可以看出来,这两个是交替输出的,就是并发执行~
小结:
-
主线程是一个物理线程,直接作用在cpu上的,是重量级的,非常消费cpu资源。
-
协程从主线程开启的,是轻量级的线程,是逻辑态。对资源消耗相对较小。
-
Golang的协程机制是重要的特点,可以轻松的开启上万个协程。其他编程语言的并发机制是一般基于线程的,开启过多的线程,资源消费大,这里就突显Golang在并发上的优势了。
1.3 调度模型:MPG模式介绍
这里只是简单的讲一下,具体可以去网上找找文章
- M:操作系统的主线程(是物理线程)
- P:协程执行需要的上下文(需要的资源等)
- G:协程
状态一:
状态二:
1.4 设置cpu数
介绍:为了充分的利用多cpu的优势,在Golang程序中,设置运行的cpu数目。
使用的函数:func NumCPU
功能:NumCPU返回本地机器的逻辑CPU个数。
函数的代码如下:
func NumCPU() int
使用案例:
func main() {
// 获取当前系统cpu的数量
num := runtime.NumCPU()
// 我这里设置num-1的cpu运行go程序
runtime.GOMAXPROCS(num)
fmt.Println("num=", num)
}
输出结果:num= 16
- go 1.8 后,默认让程序运行在多个核上,可以不用设置了。
- go 1.8 前,还是要设置一下,可以更新的利用cpu。
1.5 协程资源竞争问题
**需求:**现在要计算1-200的各个数的阶乘,并且把各个数的阶乘放入到map中。最后显示出来。要求使用goroutine完成。
分析思路:
- 使用goroutine来完成,效率搞,但是会出现并发/并行安全问题。
- 这里就提出了不同goroutine如何通信的问题。
代码实现:
- 使用goroutine来完成(看看使用gorotine并发完成会出现什么问题?然后我们会去解决)
- 在运行某个程序是,如何知道是否存在资源竞争问题。方法很简单,在编译该程序时,增加一个参数 -race即可。
思路
- 编写一个函数,来计算各个数的阶乘,并放入到 map中。
- 我们启动的协程多个,统计的将结果放入到 map中。
- map 应该做出一个全局的。
初步代码:
var (
myMap = make(map[int]int, 10)
)
// test 函数就是计算 n!, 让将这个结果放入到 myMap
func test(n int) {
res := 1
for i := 1; i <= n; i++ {
res *= i
}
}
func main() {
// 我们这里开启多个协程完成这个任务[200个]
for i := 1; i <= 20; i++ {
go test(i)
}
//我们输出结果,变量这个
for i, v := range myMap {
fmt.Printf("map[%d]=%d\n", i, v)
}
}
输出结果:无???什么都没有??为啥呢?因为主线程提前结束了,在协程结束前结束了,就什么都没有!!
下一步:我们让主线程休眠10秒钟
添加代码:
//休眠10秒钟【第二个问题 】
time.Sleep(time.Second * 5)
再运行,会发现竟然报错了:
也就是报错显示,恐怖错误,并发向协程做了写操作。
为啥?
因为,map是不安全的,也就是,200个线程同时向map里面写操作
,导致并发问题。
所以现在有两个问题:
- map是不安全的,导致并发问题。
- 主线程休眠时间无法确定。
1.6 解决协程并发方案
- 声明一个全局的互斥锁,当第一个线程进行写操作,其他的线程没办法进去操作并且进入一个队列(数据结构)进行排队。
var (
myMap = make(map[int]int, 10)
//声明一个全局的互斥锁
//lock 是一个全局的互斥锁,
//sync 是包: synchornized 同步
//Mutex : 是互斥
lock sync.Mutex
)
func test(n int) {
res := 1
for i := 1; i <= n; i++ {
res *= i
}
//这里我们将 res 放入到myMap
//加锁
lock.Lock()
myMap[n] = res //concurrent map writes?
//解锁
lock.Unlock()
}
func main() {
// 我们这里开启多个协程完成这个任务[200个]
for i := 1; i <= 20; i++ {
go test(i)
}
//这里我们输出结果,变量这个结果
lock.Lock()
for i, v := range myMap {
fmt.Printf("map[%d]=%d\n", i, v)
}
lock.Unlock()
}
输出结果:会发现没有问题~~
2 管道channel
虽然上面的方式解决了这个问题,但是不够完美,包括官方也说了,加锁是比较低级的做法,所以就引出了管道。
为什么需要channel
-
前面使用全局变量加锁同步来解决goroutine的通讯,但不完美
-
主线程在等待所有goroutine全部完成的时间很难确定,我们这里设置10秒,仅仅是估算。
-
如果主线程休眠时间长了,会加长等待时间,如果等待时间短了,可能还有goroutine处于工作状态,这时也会随主线程的退出而销毁
-
通过全局变量加锁同步来实现通讯,也并不利用多个协程对全局变量的读写操作。
-
上面种种分析都在呼唤一个新的通讯机制-channel
2.1 基本介绍
- chanel 本质就是一个数据结构-队列
- 数据是先进先出【FIFO】
- 线程安全,多goroutine 访问时,不需要加锁,就是说channel 本身就是线程安全的。
- channel 有类型的,一个 string 的channel 只能存放string类型数据。
2.2 快速入门
定义/声明channel
var 变量名 chen 数据类型
说明:
- channel是引用类型
- channel必须初始化才能写入数据,即make后才能使用。
- 管道是有类型的,intChan 只能写入整数int
案例入门:
func main() {
//演示一下管道的使用
//1. 创建一个可以存放3个int类型的管道
var intChan chan int
intChan = make(chan int, 3)
//2. 看看intChan是什么
fmt.Printf("intChan 的值=%v intChan本身的地址=%p\n", intChan, &intChan)
}
输出结果:
intChan 的值=0xc000014700 intChan本身的地址=0xc00005e020
下面我们试一下向管道写入数据和读取数据
func main() {
//演示一下管道的使用
//1. 创建一个可以存放3个int类型的管道
var intChan chan int
intChan = make(chan int, 3)
//2. 看看intChan是什么
fmt.Printf("intChan 的值=%v intChan本身的地址=%p\n", intChan, &intChan)
//3. 向管道写入数据
intChan<- 10
num := 211
intChan<- num
intChan<- 50
// //如果从channel取出数据后,可以继续放入
<-intChan
intChan<- 98//注意点, 当我们给管写入数据时,不能超过其容量
//4. 看看管道的长度和cap(容量)
fmt.Printf("channel len= %v cap=%v \n", len(intChan), cap(intChan)) // 3, 3
//5. 从管道中读取数据
var num2 int
num2 = <-intChan
fmt.Println("num2=", num2)
fmt.Printf("channel len= %v cap=%v \n", len(intChan), cap(intChan)) // 2, 3
//6. 在没有使用协程的情况下,如果我们的管道数据已经全部取出,再取就会报告 deadlock
num3 := <-intChan
num4 := <-intChan
//num5 := <-intChan
fmt.Println("num3=", num3, "num4=", num4/*, "num5=", num5*/)
}
输出结构:
intChan 的值=0xc000014700 intChan本身的地址=0xc00005e020
channel len= 3 cap=3
num2= 211
channel len= 2 cap=3
num3= 50 num4= 98
2.3 管道的关闭和遍历
channel的关闭
使用内置函数close可以关闭chanel,当channel关闭后,就不能再向channel写数据了,但是仍让可以从该channel读数据。
channel的遍历
channel支持for-range的方式进行遍历,请注意两个心结
- 在遍历时,如果channel没有关闭,则会出现deadlock的错误。
- 在遍历时,如果channel已经关闭,则会正常遍历数据,遍历完后,就会退出遍历。
代码演示:
func main() {
intChan := make(chan int, 3)
intChan <- 100
intChan <- 200
close(intChan) // close
//这是不能够再写入数到channel
//intChan<- 300
fmt.Println("okook~")
//当管道关闭后,读取数据是可以的
n1 := <-intChan
fmt.Println("n1=", n1)
//遍历管道
intChan2 := make(chan int, 100)
for i := 0; i < 10; i++ {
intChan2 <- i * 2 //放入10个数据到管道
}
//遍历管道不能使用普通的 for 循环
// for i := 0; i < len(intChan2); i++ {
// }
//在遍历时,如果channel没有关闭,则会出现deadlock的错误
//在遍历时,如果channel已经关闭,则会正常遍历数据,遍历完后,就会退出遍历
close(intChan2)
for v := range intChan2 {
fmt.Println("v=", v)
}
}
输出结果:
okook~
n1= 100
v= 0
v= 2
v= 4
v= 6
后面略~~~
2.4 管道和协程的结合
需求:
要求统计 1-80000 的数字中,哪些是素数?
下面是思路分析图:
- 创建一个管道intChan,用来存这8000个数,设置容量1000
- 创建一个协程,将1到8000放到这个inChan管道里
- 创建一个管道primeChan,用来储存素数,只要是素数都存进来
- 创建四个协程,从inChan管道里取数,并计算受否为素数,如果是素数就放到primeChan里
- 那怎么确认协程传输完毕,然后把管道关掉?
- 创建一个管道exitChan,这里是四个协程,那就容量为4
- 当这四个协程取不到数的时候,就会向exitChan管道,传一个True,表示结束
- 主线程进行一个循环取exitChan,当取出来的数达到四个的时候,就关闭,这两个管道
package main
import (
"fmt"
"time"
)
// 向 intChan放入 1-100个数
func putNum(intChan chan int) {
for i := 1; i <= 100; i++ {
intChan <- i
}
//关闭intChan
close(intChan)
}
// 从 intChan取出数据,并判断是否为素数,如果是,就
//
// //放入到primeChan
func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
//使用for 循环
// var num int
var flag bool //
for {
//time.Sleep(time.Millisecond * 10)
num, ok := <-intChan //intChan 取不到..
if !ok {
break
}
flag = true //假设是素数
//判断num是不是素数
for i := 2; i < num; i++ {
if num%i == 0 { //说明该num不是素数
flag = false
break
}
}
if flag {
//将这个数就放入到primeChan
primeChan <- num
}
}
fmt.Println("有一个primeNum 协程因为取不到数据,退出")
//这里我们还不能关闭 primeChan
//向 exitChan 写入true
exitChan <- true
}
func main() {
intChan := make(chan int, 100)
primeChan := make(chan int, 100) //放入结果
//标识退出的管道
exitChan := make(chan bool, 8) // 4个
start := time.Now().Unix()
//开启一个协程,向 intChan放入 1-8000个数
go putNum(intChan)
//开启4个协程,从 intChan取出数据,并判断是否为素数,如果是,就
//放入到primeChan
for i := 0; i < 8; i++ {
go primeNum(intChan, primeChan, exitChan)
}
//这里我们主线程,进行处理
//直接
go func() {
for i := 0; i < 8; i++ {
<-exitChan
}
end := time.Now().Unix()
fmt.Println("使用协程耗时=", end-start)
//当我们从exitChan 取出了4个结果,就可以放心的关闭 prprimeChan
close(primeChan)
}()
res := make([]int, 0)
//遍历我们的 primeChan ,把结果取出
for {
data, ok := <-primeChan
if !ok {
break
}
res = append(res, data)
}
fmt.Printf("100的素数有: %v个,分别是:%v \n", len(res), res)
fmt.Println("main线程退出")
}
输出结果:
有一个primeNum 协程因为取不到数据,退出
有一个primeNum 协程因为取不到数据,退出
有一个primeNum 协程因为取不到数据,退出
有一个primeNum 协程因为取不到数据,退出
有一个primeNum 协程因为取不到数据,退出
有一个primeNum 协程因为取不到数据,退出
有一个primeNum 协程因为取不到数据,退出
有一个primeNum 协程因为取不到数据,退出
使用协程耗时= 0
100的素数有: 26个,分别是:[1 2 3 5 7 11 13 17 19 23 29 31 37 41 43 47 53 59 61 67 71 73 79 83 89 97]
main线程退出
补充:这里用了一个记录协程消耗时间的方法
start := time.Now().Unix()
end := time.Now().Unix()
fmt.Println("使用协程耗时=", end-start)
补充知识:管道阻塞
在使用channel通道进行数据传递时,接收方从通道中获取数据的操作可以是阻塞的。具体来说,有以下几种情况会导致接收方的操作阻塞:
- 通道中没有可用的数据:当接收方尝试从一个没有数据的通道中获取数据时,它将被阻塞,等待通道中有数据可供接收。
- 通道中没有发送者:如果通道已经关闭,并且没有任何Goroutine发送数据到该通道,接收方将永久地阻塞在接收操作中。
- 通道中发送数据速度过慢:如果接收方的处理速度快于发送方的发送速度,那么接收方在获取到一条数据后,等待一段时间后可能会发现再次从通道中获取的数据仍然不可用,因此接收方将再次被阻塞。
- 管道未关闭进行遍历操作:在遍历通道时,如果通道没有被关闭,并且发送方没有向通道发送数据,接收方的遍历操作会被阻塞。
需要注意的是,当通道被阻塞时,程序的执行仍然会继续,只是被阻塞的Goroutine会暂停执行,直到满足接收操作的条件。
2.5 声明 只读/只写 的管道
channel可以声明为只读,或者只写性质。
var chan1 chan<- int // 声明为只写
var chan2 <-chan int// 声明为只写
注意:如果在一个协程里传入了一个管道,并且设置它只读只写,那么,因为作用域,就只在这个协程里面是属于只读只写的情况,可以防止一下误操作。
2.6 select解决管道堵塞
使用select可以解决从管道取数据的阻塞问题
案例代码,自学理解:
func main() {
//使用select可以解决从管道取数据的阻塞问题
//1.定义一个管道 10个数据int
intChan := make(chan int, 10)
for i := 0; i < 10; i++ {
intChan <- i
}
//2.定义一个管道 5个数据string
stringChan := make(chan string, 5)
for i := 0; i < 5; i++ {
stringChan <- "hello" + fmt.Sprintf("%d", i)
}
//传统的方法在遍历管道时,如果不关闭会阻塞而导致 deadlock
//问题,在实际开发中,可能我们不好确定什么关闭该管道.
//可以使用select 方式可以解决
//label:
for {
select {
//注意: 这里,如果intChan一直没有关闭,不会一直阻塞而deadlock
//,会自动到下一个case匹配
case v := <-intChan:
fmt.Printf("从intChan读取的数据%d\n", v)
time.Sleep(time.Second)
case v := <-stringChan:
fmt.Printf("从stringChan读取的数据%s\n", v)
time.Sleep(time.Second)
default:
fmt.Printf("都取不到了,不玩了, 程序员可以加入逻辑\n")
time.Sleep(time.Second)
return
//break label
}
}
}
这样就能够解决啦~~~
2.7 recover解决程序崩溃
**说明:**如果我们起了一个协程,但是这个协程出现了panic,如果我们没有捕获这个panic,就会早晨整个程序崩溃,这时我,1可以砸爱goroutine中使用recover来捕获panic,进行处理,这样即使这个协程发生了问题,但是主线程仍然不受影响,可以继续执行。
// 函数
func sayHello() {
for i := 0; i < 10; i++ {
time.Sleep(time.Second)
fmt.Println("hello,world")
}
}
// 函数
func test() {
//这里我们可以使用defer + recover
defer func() {
//捕获test抛出的panic
if err := recover(); err != nil {
fmt.Println("test() 发生错误", err)
}
}()
//定义了一个map
var myMap map[int]string
myMap[0] = "golang" //error
}
func main() {
go sayHello()
go test()
for i := 0; i < 10; i++ {
fmt.Println("main() ok=", i)
time.Sleep(time.Second)
}
}
输出结果:
main() ok= 0
test() 发生错误 assignment to entry in nil map
main() ok= 1
hello,world
hello,world
main() ok= 2
略~~~
Over!!!!坚持就是胜利!!兄弟们!!!冲冲冲!!!