前言
并发编程是指在计算机程序中同时执行多个计算任务的技术。这种编程方式旨在利用多核处理器的计算能力,提高程序的执行效率和响应速度。而Go 语言的并发编程主要依赖于两个核心概念:Goroutine 和 Channel。
Goroutine
Goroutine 是 Go 语言中的一种轻量级线程管理机制。它是 Go 并发编程的核心特性之一,允许开发者以非常简单和高效的方式实现并发操作。
1、特点
-
轻量级:与传统的操作系统线程相比,goroutine 非常轻量。一个典型的 goroutine 只占用几 KB 的内存,而不是 MB 级别。这使得在同一时间可以运行成千上万个 goroutine,而不会对系统资源造成太大压力。
-
调度管理:Go 运行时包含了自己的调度器,用于管理 goroutines 的执行。这个调度器会将多个 goroutines 映射到少数几个 OS 线程上,从而有效地利用多核处理器。
-
简单启动:启动一个新的 goroutine 非常简单,只需要在函数调用前加上
go
关键字即可。 -
自动栈增长:goroutines 使用的是可增长的栈,这意味着它们开始时使用很小的内存,并根据需要动态扩展。这种特性进一步提高了其效率和灵活性。
-
与 Channel 配合使用:虽然 goroutines 可以独立工作,但通常会与 Channels 一起使用,以便在不同的 goroutines 间进行通信和同步。
2、示例
package main
import (
"fmt"
"testing"
)
func Test1(t *testing.T) {
for i := 0; i < 10; i++ {
go fmt.Printf("go线程 %d \n", i)
}
fmt.Println("主流程")
}
输出:
=== RUN Test1
主流程
go线程 2
go线程 3
go线程 5
--- PASS: Test1 (0.00s)
go线程 0
go线程 7
go线程 6
go线程 8
PASS
go线程 4
多执行几次发现会少了一些打印,如 1和9线程
改进:
func Test2(t *testing.T) {
for i := 0; i < 10; i++ {
go fmt.Printf("go线程 %d \n", i)
}
time.Sleep(1 * time.Second)
fmt.Println("主流程")
}
输出:
=== RUN Test2
go线程 9
go线程 0
go线程 1
go线程 2
go线程 3
go线程 4
go线程 7
go线程 8
go线程 5
go线程 6
主流程
--- PASS: Test2 (1.00s)
PASS
3、waitGroup
考虑如何优雅的等待子线程执行完成,而不是简单的让主线程等待,常用的方法是使用 sync.WaitGroup
。WaitGroup
提供了一种简单而有效的方式来等待一组 goroutines 完成它们的工作,而不需要显式地让主线程休眠。它通过一个计数器来跟踪 goroutines 的数量。你可以增加或减少这个计数器,并且可以阻塞直到计数器变为零,这表示所有被跟踪的 goroutines 都已完成。
基本步骤
- 创建 WaitGroup 实例:在你的程序中创建一个
WaitGroup
实例。 - 增加计数:每启动一个新的 goroutine,就调用
Add(1)
来增加 WaitGroup 的计数。 - 标记完成:在每个 goroutine 内部,任务完成时调用
Done()
来减少 WaitGroup 的计数。 - 等待所有任务完成:在主线程中调用
Wait()
方法,它会阻塞直到所有被追踪的任务都标记为完成。
// 模拟工作的函数
func exec(num int, wg *sync.WaitGroup) {
defer wg.Done() // 确保工作结束后通知 WaitGroup
go fmt.Printf("go线程 %d \n", num)
}
func Test3(t *testing.T) {
group := sync.WaitGroup{}
// 启动多个goroutine并使用WaitGroup追踪它们
for i := 0; i < 10; i++ {
group.Add(1) // 增加WaitGroup计数
go exec(i, &group)
}
group.Wait() // // 阻塞主线程,直到所有线程都执行完毕
fmt.Println("主流程")
}
注意事项
- 确保 Done 调用:始终确保每个启动的 goroutine 在其逻辑结束时调用了
Done()
方法。这通常通过使用关键字defer
来实现,以确保即使发生错误也能正确递减。 - 避免重复 Add 和 Done 操作:不要对同一组操作多次进行 Add 或 Done 调用,否则可能导致不一致状态和死锁。
Channel
在 Go 语言中,Channel 是一种用于 goroutine 之间进行通信的机制。它可以让一个 goroutine 发送特定类型的值到 Channel 中,然后另一个 goroutine 从该 Channel 接收值,从而实现数据的安全传递和同步。
1、基本特性
- 类型化:Channel 是类型化的,这意味着你需要指定要传输的数据类型。
- 阻塞行为:
- 发送阻塞:当一个 goroutine 向 Channel 发送数据时,如果没有其他 goroutine 正在等待接收这个数据,那么发送操作会被阻塞,直到有接收者。
- 接收阻塞:同样地,当一个 goroutine 尝试从 Channel 接收数据时,如果没有其他 goroutine 正在向这个 Channel 发送数据,那么接收操作会被阻塞,直到有新的数据可用。
- 方向性:Channel 可以是双向的,也可以是单向(只读或只写)的。
2、创建和使用 Channel
func Test4(t *testing.T) {
ch := make(chan int)
for i := 0; i < 10; i++ {
go send(i, ch)
}
for i := 0; i < 10; i++ {
value := <-ch
fmt.Printf("接受到消息:%d\n", value)
time.Sleep(time.Second)
}
}
func send(i int, ch chan int) {
fmt.Printf("-- 准备发送:%d\n", i)
ch <- i
fmt.Printf("-- 发送成功!%d\n", i) // 这行打印会被阻塞
}
输出:
=== RUN Test4
-- 准备发送:9
-- 发送成功!9
接受到消息:9
-- 准备发送:2
-- 准备发送:6
-- 准备发送:1
-- 准备发送:8
-- 准备发送:3
-- 准备发送:4
-- 准备发送:5
-- 准备发送:0
-- 准备发送:7
接受到消息:2
-- 发送成功!2
接受到消息:6
-- 发送成功!6
接受到消息:1
-- 发送成功!1
接受到消息:8
-- 发送成功!8
接受到消息:3
-- 发送成功!3
接受到消息:4
-- 发送成功!4
接受到消息:5
-- 发送成功!5
接受到消息:0
-- 发送成功!0
接受到消息:7
-- 发送成功!7
--- PASS: Test4 (10.01s)
PASS
3、带缓冲区的 Channel
带缓冲区的 Channels 可以存储一定数量的数据,而不会立即导致 sender 或 receiver 被阻塞。创建带缓冲区 Channels 时,可以指定其容量。
func Test5(t *testing.T) {
ch := make(chan int, 5)
for i := 0; i < 10; i++ {
go send(i, ch)
}
for i := 0; i < 10; i++ {
value := <-ch
fmt.Printf("接受到消息:%d\n", value)
time.Sleep(time.Second)
}
}
输出:
=== RUN Test5
-- 准备发送:9
-- 发送成功!9
接受到消息:9
-- 准备发送:0
-- 发送成功!0
-- 准备发送:1
-- 发送成功!1
-- 准备发送:2
-- 发送成功!2
-- 准备发送:3
-- 发送成功!3
-- 准备发送:7
-- 发送成功!7
-- 准备发送:8
-- 准备发送:6
-- 准备发送:4
-- 准备发送:5
接受到消息:0
-- 发送成功!8
接受到消息:1
-- 发送成功!6
接受到消息:2
-- 发送成功!4
接受到消息:3
-- 发送成功!5
接受到消息:7
接受到消息:8
接受到消息:6
接受到消息:4
接受到消息:5
--- PASS: Test5 (10.01s)
PASS
长度(Length)
- 定义:Channel 的长度是指当前 Channel 中已存储的数据元素的数量。
- 获取方式:可以使用内置函数
len(ch)
来获取 Channel 的当前长度。
容量(Capacity)
- 定义:Channel 的容量是指它最多可以容纳多少个数据元素。对于无缓冲的 channel,容量为零;对于带缓冲的 channel,容量是在创建时指定的。
- 获取方式:可以使用内置函数
cap(ch)
来获取 Channel 的总容量。
func TestChannelCapAndLen(t *testing.T) {
ch := make(chan string, 3)
ch <- "a"
ch <- "b"
fmt.Println("容量为:", cap(ch))
fmt.Println("长度为:", len(ch))
fmt.Println("读取一个元素:", <-ch)
fmt.Println("新的长度为:", len(ch))
}
输出:
=== RUN TestChannelCapAndLen
容量为: 3
长度为: 2
读取一个元素: a
新的长度为: 1
--- PASS: Test9 (0.00s)
PASS
4、单向 Channel
单向 Channel 用于限制函数对 channel 的访问权限,只能用于 send 或 receive 操作。这种限制可以帮助避免错误地使用 channel。
func sendOnly(i int, ch chan<- int) {
fmt.Printf("-- 准备发送:%d\n", i)
ch <- i
fmt.Printf("-- 发送成功!%d\n", i)
}
func receiveOnly(ch <-chan int) {
value := <-ch
fmt.Printf("接受到消息:%d\n", value)
time.Sleep(time.Second)
}
func Test6(t *testing.T) {
ch := make(chan int)
for i := 0; i < 10; i++ {
go sendOnly(i, ch)
}
for i := 0; i < 10; i++ {
receiveOnly(ch)
}
}
5、关闭Channel
如果channel没有关闭,消费者仍然在等待数据,则可能导致死锁
func Test7(t *testing.T) {
ch := make(chan int)
for i := 0; i < 10; i++ {
go sendOnly(i, ch)
}
// 消费者仍然在等待数据,则可能导致死锁
for value := range ch {
fmt.Printf("接受到消息:%d\n", value)
time.Sleep(time.Second)
}
}
报错:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
testing.(*T).Run(0xc00010c4e0, {0x3d35a17?, 0x125836780012db50?}, 0x3da35f0)
/Users/fangyirui/sdk/go1.23.2/src/testing/testing.go:1751 +0x3ab
testing.runTests.func1(0xc00010c4e0)
/Users/fangyirui/sdk/go1.23.2/src/testing/testing.go:2168 +0x37
testing.tRunner(0xc00010c4e0, 0xc00012dc70)
/Users/fangyirui/sdk/go1.23.2/src/testing/testing.go:1690 +0xf4
testing.runTests(0xc000010030, {0x3e7cbe0, 0x9, 0x9}, {0x3c86030?, 0x3c85c9a?, 0x0?})
/Users/fangyirui/sdk/go1.23.2/src/testing/testing.go:2166 +0x43d
testing.(*M).Run(0xc00007a0a0)
/Users/fangyirui/sdk/go1.23.2/src/testing/testing.go:2034 +0x64a
main.main()
_testmain.go:61 +0x9b
goroutine 5 [chan receive]:
awesomeProject/_28goroutine.Test7(0xc00010c680?)
/Users/fangyirui/GolandProjects/awesomeProject/_28goroutine/1_test.go:114 +0x105
testing.tRunner(0xc00010c680, 0x3da35f0)
/Users/fangyirui/sdk/go1.23.2/src/testing/testing.go:1690 +0xf4
created by testing.(*T).Run in goroutine 1
/Users/fangyirui/sdk/go1.23.2/src/testing/testing.go:1743 +0x390
关闭 Channel 是一个重要的操作,它通知接收者不会再有新的数据发送到这个 Channel 上。
- 通知完成:通过关闭一个 Channel,可以向接收者表明没有更多的数据会被发送。这对于需要知道何时停止读取数据的消费者来说非常有用。
- 避免死锁:如果所有发送者都退出了,而消费者仍然在等待数据,则可能导致死锁。通过显式地关闭 Channel,可以避免这种情况。
改进:使用内置函数 close
来关闭一个 channel
func sendOnlyV2(i int, ch chan<- int, wg *sync.WaitGroup) {
fmt.Printf("-- 准备发送:%d\n", i)
ch <- i
fmt.Printf("-- 发送成功!%d\n", i)
wg.Done()
}
func Test8(t *testing.T) {
wg := sync.WaitGroup{}
ch := make(chan int)
for i := 0; i < 10; i++ {
wg.Add(1)
go sendOnlyV2(i, ch, &wg)
}
// 启动另一个goroutine来等待所有send操作完成后再关闭channel,不然channel关闭后还没发送完数据会报错
go func() {
wg.Wait()
// 发送完毕,关闭channel
close(ch)
fmt.Println("channel 已经关闭")
}()
for value := range ch {
fmt.Printf("接受到消息:%d\n", value)
time.Sleep(time.Second)
}
}