Go语言并发学习目标
出色的并发性是Go语言的特色之一
- • 理解并发与并行
- • 理解进程和线程
- • 掌握Go语言中的Goroutine和channel
- • 掌握select分支语句
- • 掌握sync包的应用
并发与并行
并发与并行的概念这里不再赘述,
可以看看之前java版写的并发实践;
进程和线程
-
程序、进程与线程这里也不赘述
- 一个进程可以包括多个线程,线程是容器中的工作单位;
协程~Goroutine
概念:
协程(Coroutine),最初在1963年被提出,又称为微线程,是一种比线程更加轻量级的存在。正如一个进程可以拥有多个线程,一个线程也可以拥有多个协程;
协程是编译器级的,进程和线程是操作系统级的。
协程不被操作系统内核管理,而完全由程序控制,因此没有线程切换的开销。和多线程比,线程数量越多,协程的性能优势就越明显。协程的最大优势在于其轻量级,可以轻松创建上万个而不会导致系统资源衰竭。
Go语言中的协程
Go与语言中的协程由运行时调度和管理,Go会智能将协程中的任务合理的分配给每个CPU;
一开始创建的协程堆栈很小,但是可以根据需要增长和收缩;
Coroutine与Goroutine
Goroutine能并行执行,Coroutine只能顺序执行,Go中Goroutine可以在单线程中产生,也可以在多线程中产生;
Coroutine程序需要主动交出控制权,系统才能获得控制权并将控制权交给其他Coroutine.
Coroutine属于协作处理,在应用程序不使用CPU时,需要让渡CPU,否则会使得计算机失去响应或者宕机;
Goroutine属于抢占式任务处理,和现有的多线程和多进程任务处理类似;应用程序对CPU的控制最终由操作系统来管理,如果操作系统发现一个应用程序长时间占用CPU,那么用户有权终止这个任务。
在Go中开启协程
在Go中开启协程----只需要在函数前面加上关键字go,将会同时运行一个新的Goroutine;
注意:使用go关键字创建协程时,被调用的函数往往没有返回值,如果函数有返回值,那么返回值会被忽略,那我们就是要返回值时,必须使用channel,通过channel把数据从中取出来;
Go程序的执行过程
创建和启动主Goroutine,初始化操作,执行main函数,当main函数执行结束后,程序也就结束了;
代码demo
func helloworld() {
fmt.Println("hello world")
}
func main() {
go helloworld()
fmt.Println("main exit")
}
运行结果:
但是这可能并不是我们看到的全部,如果main()的Goroutine比子Goroutine后终止,那么我们就会看到打印出的hello world;
多试几次(直接运行应该还是上面的结果,如果debug,就会出现打印hello world,这是因为协程有足够的时间反应;
我们来看一下如果加上睡眠时间:
func helloworld() {
fmt.Println("hello world")
}
func main() {
go helloworld()
time.Sleep(10 * time.Microsecond)
fmt.Println("main exit")
}
运行结果:
如果我们在fmt.Println("main exit")
前加上defer 会是什么结果?会打印 hello world吗?
我试了一下,不行:
我们来看一下defer 关键字的释义
“defer”语句调用一个函数,该函数的执行被延迟到周围的函数返回的那一刻,要么是因为周围的函数执行了return语句,到达了它的函数体的末尾,要么是因为对应的协程出现了恐慌。
所以下面的代码运行结果是什么?
代码demo3
func helloworld() {
fmt.Println("hello world")
}
func helloworld2() string {
fmt.Println("hello world222222")
return "hello world222222"
}
func main() {
go helloworld() //要留有足够的时间,否则main()的Goroutine终止了,程序将被终止,该协程根本就没有机会表现
defer helloworld2()
defer fmt.Println("main exit")
}
我们来分析一下 程序的执行(即代码demo3):
go程序启动时,runtime默认为main函数创建一个Goroutine;
在main函数的Goroutine执行到 go helloworld()即加了关键字go的方法时.归属于helloworld()函数的Goroutine被创建,helloworld()函数开始在自己的Goroutine中执行,
此时main函数的Goroutine继续执行,如果helloworld()函数不能够在main函数的Goroutine执行完毕之前将任务处理完毕,那么就会发生helloworld()函数没有执行的样子 ;
下面我们来修改上面的代码:
func helloworld() {
var i int
for {
i++
fmt.Println("add", i)
time.Sleep(time.Second)
}
}
func main() {
go helloworld() //要留有足够的时间,否则main()的Goroutine终止了,程序将被终止,该协程根本就没有机会表现
var str string
fmt.Scanln(&str) //阻塞
fmt.Println("main exit")
}
控制台不断输出add int,同时还可以接收用户输入。两个环节同时运行。
此时,main()继续执行,两个Goroutine通过Go程序的调度机制同时运行。
匿名函数创建Goroutine
即我们可以在匿名函数前加go关键字实现对匿名函数创建Goroutine;
func main() {
go func() {
var arr int
for {
arr++
fmt.Println("add", arr)
time.Sleep(time.Second)
}
}() //闭包
var str string
fmt.Scanln(&str)
fmt.Println("main exit")
}
启动多个Goroutine
func p1() {
for i := 0; i < 10; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Print(i)
}
}
func p2() {
for i := 'a'; i < 'i'; i++ {
time.Sleep(500 * time.Millisecond)
fmt.Printf("%c", i)
}
}
func main() {
go p1()
go p2()
var str string
fmt.Scanln(&str)
fmt.Println("main exit")
}
多个Goroutine随机调度,打印的结果是数字与字母交叉输出
并发性能优化
在Go程序运行时,go 关键字实现了一个小型的任务调度器,该调度器是使用CPU的,那么怎么为其分配CPU呢 ?
go中可以使用runtime.Gosched()来交出CPU的控制权,同时我们可以使用runtime.GOMAXPROCS()来匹配CPU核心数量
- Go1.5版本之前,默认使用单核执行。
- Go1.5版本开始,默认执行runtime.GOMAXPROCS(逻辑CPU数量),让代码并发执行,最大效率地利用CPU。
//Gosched生成处理器,允许其他goroutines运行。它不是挂起当前的goroutine,这个就像java中的yield(),只是让出cpu,但同时我还可以跟其他协程竞争cpu
func Gosched() {
checkTimeouts()
mcall(gosched_m)
}
// GOMAXPROCS设置可以执行的cpu的最大数目同时返回之前的设置。默认为 runtime.NumCPU的值。如果n < 1,则不改变当前设置。
//当调度器改进时,此调用将消失。
func GOMAXPROCS(n int) int {
if GOARCH == "wasm" && n > 1 {
n = 1 // WebAssembly has no threads yet, so only one CPU is possible.
}
lock(&sched.lock)
ret := int(gomaxprocs)
unlock(&sched.lock)
if n <= 0 || n == ret {
return ret
}
stopTheWorldGC("GOMAXPROCS")
// newprocs will be processed by startTheWorld
newprocs = int32(n)
startTheWorldGC()
return ret
}
Channel 通道
go中channel是协程之间的通信机制,一个channel是一个通信管道,它可以让协程通过它给另一个协程发送数据. 每个channel都需要指定数据类型, 如需要发送 int 则可以定义一个 chan int;
传统的线程之间通过共享内存进行数据交互,不同的线程共享内存的同步问题使用锁来解决,但是在go中,数据的传递使用channel来实现;
type Animal struct {
name string
age int
weight int
}
type Dog struct {
Animal //匿名结构体
}
func channelT() {
//声明channel
var ch chan int
var c chan interface{}
var d chan Dog
fmt.Printf("%v \n %v\n %v\n", ch, c, d)
//chan类型的空值是nil,声明后需要配合make()才能使用。
//channel是引用类型,需要使用make()进行创建
ch = make(chan int, 1024)
c = make(chan interface{}, 1024)
d = make(chan Dog)
fmt.Println("ch的size", len(ch))
fmt.Println("c的size", len(c))
fmt.Println("dogs的size", len(d))
}
func main() {
channelT()
}
使用channel发送数据
通过channel发送数据需要使用 特殊的操作符 <-
channel发送的值的类型必须与channel的元素类型一致。
如果接收方一直没有接收,那么发送操作将持续阻塞。此时所有的Goroutine,包括main()的Goroutine都处于等待状态。
运行会提示报错:fatal error: all goroutines are asleep - deadlock!。
死锁的产生
使用channel时要考虑死锁的可能性,即一个Goroutine在channel上发送数据,但是没有人接收,那么就会出现死锁;或者一个Goroutine正在等待从channel接收数据,但是其他的Goroutine要在channel上写入数据,如果没有写入,程序也会死锁;
channel接收数据
channel收发数据在不同的两个Goroutine间进行;
// 阻塞接收数据
func main() {
//搞一个channel
ch := make(chan string)
go sendData(ch) //开启协程往通道中发送数据
//data接收channel中的数据
//如果通道关闭,那么通道中传输的数据为个数据类型的默认值;
for {
data := <-ch //接收数据,执行该语句时channel将会阻塞,直到接收到数据并赋值给data变量。
if data == "" { //
break
}
fmt.Println("从通道中取出的数据1--->>", data)
}
//
for {
//data 表示接收到的数据。未接收到数据时,data为channel类型的零值。ok(布尔类型)表示是否接收到数据。通过ok值可以判断当前channel是否被关闭。
data, ok := <-ch
fmt.Println(ok)
if !ok {
break
}
fmt.Println("从通道中取出的数据2-->>", data)
}
for i2 := range ch {
fmt.Println("取出的数据3-->>", i2)
}
}
func sendData(ch chan string) {
for i := 0; i < 10; i++ {
ch <- fmt.Sprintf("发送数据%d \n", i)
}
fmt.Println("send exit")
//关闭通道
defer close(ch)
}
以上代码是sendData的协程 与main的协程之间的数据交互;
总结就是先声明一个通道,然后 一个函数的入参是一个通道,该函数用于向通道中发送数据,然后另一个协程来取出数据;
for … range会自动判断出channel已关闭,而无须通过判断来终止循环
如果要取的数据少了,要取的多了,而且通道没有关闭—>>
func main(){
ch := make(chan string)
go sendData(ch) //开启协程往通道中发送数据
for i := 0; i < 20; i++ {
data := <-ch //接收数据
if data == "" { //
break
}
fmt.Println("从通道中取出的数据1--->>", data)
}
}
func sendData(ch chan string) {
for i := 0; i < 10; i++ {
ch <- fmt.Sprintf("发送数据%d \n", i)
}
fmt.Println("send exit")
//关闭通道
//close(ch)
//defer close(ch)
}
发生了死锁;
如果最够关闭通道,就不会出现死锁;
阻塞
channel默认是阻塞的。当数据被发送到channel时会发生阻塞,直到有其他Goroutine从该channel中读取数据。当从channel读取数据时,读取也会被阻塞,直到其他Goroutine将数据写入该channel。
这就像java中的阻塞队列,不过go语言简化了,直接拿来用即可;
代码demo
func main() {
var ch chan int
ch = make(chan int)
fmt.Printf("%T \n", ch)
ch2 := make(chan bool)
go func() {
data, ok := <-ch
if ok {
fmt.Println("子goroutine取值", data)
}
ch2 <- true
}()
ch <- 10
<-ch2 // 阻塞,防止代码不打印 抛弃 ---即取出后没有接收的
defer close(ch)
defer close(ch2)
fmt.Println("main exit")
}
关闭channel:
发送方如果数据写入完毕,需要关闭channel,用于通知接收方数据传递完毕。
通常情况是发送方主动关闭channel。
接收方通过多重返回值判断channel是否关闭,如果返回值是false,则表示channel已经被关闭。往关闭的channel中写入数据会报错:panic: send on closed channel。但是可以从关闭后的channel中读取数据,返回数据的默认值和false。
func main() {
ch1 := make(chan int)
go func() {
//往channel中放入数据
ch1 <- 1
ch1 <- 2
ch1 <- 3
//关闭channel
close(ch1)
//在向通道中写入数据
ch1 <- 4
}()
for i := 0; i < 4; i++ {
data, ok := <-ch1
if !ok {
break
}
fmt.Println(data)
}
}
运行结果:
向已经关闭的channel写入数据会导致程序崩溃。
缓冲channel
我们默认创建的channel不是用于缓冲的,即读写都是即时阻塞的; 缓冲channel自带缓冲区,如果缓冲区满了才会发生阻塞;
缓冲channel的创建
func main() {
//channel
ch1 := make(chan int)
//往channel中放入数据
go sendD(ch1)
//再放入数据
ch1 <- 100
}
func sendD(ch chan int) {
for i := 0; i < 2; i++ {
//放入数据
ch <- i
}
}
运行结果—发生了死锁
如果我们使用 缓冲channel
func main() {
//channel
ch1 := make(chan int, 16)
//往channel中放入数据
go sendD(ch1)
//再放入数据
ch1 <- 100
for i := 0; i < 3; i++ {
data, ok := <-ch1
if !ok {
break
}
fmt.Println(ok, "---", data)
}
}
func sendD(ch chan int) {
for i := 0; i < 2; i++ {
//放入数据
ch <- i
}
}
运行结果:
根据缓冲channel,我们就可以模拟生产者消费者问题了:
// 生产者生产蛋糕放入channel
func produce(c chan int) {
for i := 0; i < 10; i++ {
//放入蛋糕
c <- i
fmt.Println(i, "号蛋糕已放入")
//放入需要时间的
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
}
//关闭通道
defer close(c)
}
// 消费者消费蛋糕
// 消费的对象 单号编码 ,消费成功/失败的channel
func consumer(num int, ch chan int, b chan bool) {
for i2 := range ch {
fmt.Println("协程代号", num)
fmt.Println("消费的蛋糕号码--->>", i2)
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
}
//记录消费结果
b <- true
defer close(b)
}
func main() {
con := make(chan int)
bool1 := make(chan bool)
bool2 := make(chan bool)
bool3 := make(chan bool)
//生产者
go produce(con)
//3个消费 同时将消费结果通过通道传输
go consumer(1, con, bool1)
go consumer(2, con, bool2)
go consumer(3, con, bool3)
//取出消费情况是否成功
for i := range bool1 {
fmt.Println(i)
}
for i := range bool2 {
fmt.Println(i)
}
for i := range bool3 {
fmt.Println(i)
}
defer fmt.Println("main exit")
}
运行结果:
单向channel
我们声明的channel 如果没有特别的指定,默认是可读可写的;
定向channel也叫单向channel,即要么只读,要么只可以写;
声明只读channel
直接声明单向channel是没有意义的,通常我们需要声明一个双向channel,然后通过单向channel的方式进行函数传递;
//往通道中放数据
func normalChannel(c chan string) {
c <- "Go语言"
//c <- "GoWeb"
//c <- "GoLang"
data1 := <-c
data2 := <-c
fmt.Println("normal--->>", data1, data2)
}
//只读的channel
func ReadChan(c <-chan string) {
data := <-c
fmt.Println("只读的数据-->>", data)
//此时我们如果往channel中放数据
//c<-"话只说了一百遍" //编译检查不会通过-->>Invalid operation: c<-"话只说了一百遍" (send to the receive-only type <-chan string)
}
func WriteChan(c chan<- string) {
c <- "只能写入呀!!"
//此时我们要读的话
//data:=<-c //编译检查不通过 Invalid operation: <-c (receive from the send-only type chan<- string)
fmt.Printf("%T\n", c)
}
func main() {
//read := make(<-chan int)//只读channel
//write := make(chan<- int)//只写channel
c := make(chan string)
go normalChannel(c) //往channel中放数据
data := <-c //取数据 --此时去走的是剩下的那个数据
fmt.Println("取出的数据", data)
c <- "放入数据1"
c <- "放入数据2"
go WriteChan(c)
go ReadChan(c)
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) //纯粹是为了复习
fmt.Println("main exit")
}
上述代码的运行逻辑:
1,创建了一个通道,
2,通过 normalChannel协程往通道中放入数据
3,main的协程取出数据
4,通道中再放入数据
5.通道写入数据
6.读取通道数据
7.main exit
小结—>>
只读通道声明方式:
read := make(<-chan int)//只读channel
只写通道声明方式
write := make(chan<- int)//只写channel
time包中跟channel相关的API
我们来看一下Timer结构体:
// The Timer type represents a single event.
// When the Timer expires, the current time will be sent on C,
// unless the Timer was created by AfterFunc.
// A Timer must be created with NewTimer or AfterFunc.
//即计时器到期后后当前时间将会发送到channel
type Timer struct {
C <-chan Time //只读channel
r runtimeTimer
}
计时器必须使用NewTimer()或After()创建。
func timeChan() {
now := time.Now()
fmt.Println(now)
timer := time.NewTimer(time.Second)
fmt.Printf("%T\n", timer)
data := <-timer.C //从Timer的通道中取出数据
fmt.Println(data)
}
func main() {
timeChan()
}
newTimer()源码:
// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
func NewTimer(d Duration) *Timer {
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
Afetr()源码
// After waits for the duration to elapse and then sends the current time
// on the returned channel.
// It is equivalent to NewTimer(d).C.
// The underlying Timer is not recovered by the garbage collector
// until the timer fires. If efficiency is a concern, use NewTimer
// instead and call Timer.Stop if the timer is no longer needed.
func After(d Duration) <-chan Time {
return NewTimer(d).C
}
可以看到After返回一个只读的channel
func timeAfter() {
now := time.Now()
fmt.Println("前输出", now)
after := time.After(time.Second * 2) //返回一个只读通道
//取出数据
fmt.Println("后输出", <-after)
}
func main() {
//timeChan()
timeAfter()
}
selelct 语句
select 语句会随机挑选一个可通信的case执行,如果所有case没有到达的数据,那么会执行default,若没有default,那么就会阻塞,直到case接收到数据;
func selectChannel() {
c := make(chan int)
ch := make(chan int)
go func() {
time.Sleep(time.Duration(rand.Intn(10)) * time.Second)
c <- 100
}()
go func() {
time.Sleep(time.Duration(rand.Intn(20)) * time.Second)
ch <- 100
}()
select {
case data := <-c:
fmt.Println("c中读到了数据", data)
case data2 := <-ch:
fmt.Println("c中读到了数据", data2)
default:
defer fmt.Println("有default执行default")
}
defer close(c)
defer close(ch)
}
上述代码大部分时间是执行default的;原因在前面提到过,不再赘述;
我们修改一下上面的代码:
func selectChannel() {
c := make(chan int)
ch := make(chan int)
go func() {
time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
c <- 100
}()
go func() {
time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)
ch <- 100
}()
select {
case data := <-c:
fmt.Println("c中读到了数据", data)
case data2 := <-ch:
fmt.Println("ch中读到了数据", data2)
case <-time.After(2 * time.Millisecond):
fmt.Println("阻塞发生了")
//default:
// time.Sleep(time.Duration(rand.Intn(20)) * time.Second)
// defer fmt.Println("有default执行default")
}
defer close(c)
defer close(ch)
fmt.Println("main exit")
}
当我们注释掉default,那么结果可能如下: