Day 03 - goroutine基础与原理
1. goroutine创建和调度
1.1 goroutine基本特性
特性 说明 轻量级 初始栈大小仅2KB,可动态增长 调度方式 协作式调度,由Go运行时管理 创建成本 创建成本很低,可同时运行数十万个 通信方式 通过channel进行通信,而不是共享内存
1.2 创建goroutine的示例代码
package main
import (
"fmt"
"runtime"
"sync"
"time"
)
func monitorGoroutines ( duration time. Duration, done chan struct { } ) {
ticker := time. NewTicker ( duration)
defer ticker. Stop ( )
for {
select {
case <- ticker. C:
fmt. Printf ( "当前goroutine数量: %d\n" , runtime. NumGoroutine ( ) )
case <- done:
return
}
}
}
type Worker struct {
ID int
wg * sync. WaitGroup
}
func NewWorker ( id int , wg * sync. WaitGroup) * Worker {
return & Worker{
ID: id,
wg: wg,
}
}
func ( w * Worker) Work ( jobs <- chan int , results chan <- int ) {
defer w. wg. Done ( )
for job := range jobs {
fmt. Printf ( "Worker %d 开始处理任务 %d\n" , w. ID, job)
time. Sleep ( 100 * time. Millisecond)
results <- job * 2
}
}
func main ( ) {
numWorkers := 5
numJobs := 10
jobs := make ( chan int , numJobs)
results := make ( chan int , numJobs)
var wg sync. WaitGroup
done := make ( chan struct { } )
go monitorGoroutines ( time. Second, done)
fmt. Printf ( "创建 %d 个worker\n" , numWorkers)
for i := 1 ; i <= numWorkers; i++ {
wg. Add ( 1 )
worker := NewWorker ( i, & wg)
go worker. Work ( jobs, results)
}
fmt. Printf ( "发送 %d 个任务\n" , numJobs)
for j := 1 ; j <= numJobs; j++ {
jobs <- j
}
close ( jobs)
go func ( ) {
wg. Wait ( )
close ( results)
} ( )
for result := range results {
fmt. Printf ( "收到结果: %d\n" , result)
}
done <- struct { } { }
fmt. Printf ( "最终goroutine数量: %d\n" , runtime. NumGoroutine ( ) )
}
2. GMP模型详解
2.1 GMP组件说明
组件 说明 职责 G (Goroutine) goroutine的抽象 包含goroutine的栈、程序计数器等信息 M (Machine) 工作线程 执行G的实体,对应系统线程 P (Processor) 处理器 维护G的运行队列,提供上下文环境
2.2 GMP调度流程图
2.3 GMP相关的运行时参数
runtime. GOMAXPROCS ( n)
runtime. NumCPU ( )
runtime. NumGoroutine ( )
3. 并发模型原理
3.1 Go并发模型特点
特点 说明 CSP模型 通过通信来共享内存,而不是共享内存来通信 非阻塞调度 goroutine让出CPU时不会阻塞其他goroutine 工作窃取 空闲P可以从其他P窃取任务 抢占式调度 支持基于信号的抢占式调度
3.2 并发模型示例
package main
import (
"context"
"fmt"
"runtime"
"sync"
"time"
)
type Pipeline struct {
input chan int
output chan int
done chan struct { }
}
func NewPipeline ( ) * Pipeline {
return & Pipeline{
input: make ( chan int ) ,
output: make ( chan int ) ,
done: make ( chan struct { } ) ,
}
}
func ( p * Pipeline) Process ( ctx context. Context) {
go func ( ) {
defer close ( p. output)
for {
select {
case num, ok := <- p. input:
if ! ok {
return
}
result := num * 2
select {
case p. output <- result:
case <- ctx. Done ( ) :
return
}
case <- ctx. Done ( ) :
return
}
}
} ( )
}
type WorkerPool struct {
workers int
tasks chan func ( )
wg sync. WaitGroup
}
func NewWorkerPool ( workers int ) * WorkerPool {
pool := & WorkerPool{
workers: workers,
tasks: make ( chan func ( ) , workers* 2 ) ,
}
pool. Start ( )
return pool
}
func ( p * WorkerPool) Start ( ) {
for i := 0 ; i < p. workers; i++ {
p. wg. Add ( 1 )
go func ( workerID int ) {
defer p. wg. Done ( )
for task := range p. tasks {
fmt. Printf ( "Worker %d executing task\n" , workerID)
task ( )
}
} ( i + 1 )
}
}
func ( p * WorkerPool) Submit ( task func ( ) ) {
p. tasks <- task
}
func ( p * WorkerPool) Stop ( ) {
close ( p. tasks)
p. wg. Wait ( )
}
func main ( ) {
runtime. GOMAXPROCS ( runtime. NumCPU ( ) )
ctx, cancel := context. WithTimeout ( context. Background ( ) , 5 * time. Second)
defer cancel ( )
pipeline := NewPipeline ( )
pipeline. Process ( ctx)
pool := NewWorkerPool ( 3 )
go func ( ) {
defer close ( pipeline. input)
for i := 1 ; i <= 10 ; i++ {
select {
case pipeline. input <- i:
fmt. Printf ( "Sent %d to pipeline\n" , i)
case <- ctx. Done ( ) :
return
}
}
} ( )
go func ( ) {
for result := range pipeline. output {
result := result
pool. Submit ( func ( ) {
time. Sleep ( 100 * time. Millisecond)
fmt. Printf ( "Processed result: %d\n" , result)
} )
}
pool. Stop ( )
} ( )
<- ctx. Done ( )
fmt. Println ( "Main context done" )
}
4. goroutine生命周期
4.1 生命周期状态
状态 说明 创建 goroutine被创建,分配栈空间 可运行 等待被调度执行 运行中 正在被M执行 系统调用中 阻塞在系统调用上 等待中 因channel或同步原语阻塞 死亡 执行完成,等待回收
4.2 生命周期示例
package main
import (
"context"
"fmt"
"runtime"
"runtime/debug"
"sync"
"time"
)
type GoroutineMonitor struct {
startTime time. Time
endTime time. Time
status string
sync. Mutex
}
func NewGoroutineMonitor ( ) * GoroutineMonitor {
return & GoroutineMonitor{
startTime: time. Now ( ) ,
status: "created" ,
}
}
func ( g * GoroutineMonitor) UpdateStatus ( status string ) {
g. Lock ( )
defer g. Unlock ( )
g. status = status
fmt. Printf ( "Goroutine状态更新: %s, 时间: %v\n" , status, time. Since ( g. startTime) )
}
func ( g * GoroutineMonitor) Complete ( ) {
g. Lock ( )
defer g. Unlock ( )
g. endTime = time. Now ( )
g. status = "completed"
fmt. Printf ( "Goroutine完成, 总运行时间: %v\n" , g. endTime. Sub ( g. startTime) )
}
type Task struct {
ID int
Duration time. Duration
Monitor * GoroutineMonitor
}
func ( t * Task) Execute ( ctx context. Context, wg * sync. WaitGroup) {
defer wg. Done ( )
defer t. Monitor. Complete ( )
defer func ( ) {
if r := recover ( ) ; r != nil {
fmt. Printf ( "Task %d panic: %v\nStack: %s\n" , t. ID, r, debug. Stack ( ) )
t. Monitor. UpdateStatus ( "panic" )
}
} ( )
t. Monitor. UpdateStatus ( "running" )
select {
case <- time. After ( t. Duration) :
t. Monitor. UpdateStatus ( "normal completion" )
case <- ctx. Done ( ) :
t. Monitor. UpdateStatus ( "cancelled" )
return
}
if t. ID% 4 == 0 {
t. Monitor. UpdateStatus ( "blocked" )
time. Sleep ( 100 * time. Millisecond)
} else if t. ID% 3 == 0 {
panic ( "模拟任务panic" )
}
}
type TaskScheduler struct {
tasks chan Task
workers int
monitors map [ int ] * GoroutineMonitor
mu sync. RWMutex
}
func NewTaskScheduler ( workers int ) * TaskScheduler {
return & TaskScheduler{
tasks: make ( chan Task, workers* 2 ) ,
workers: workers,
monitors: make ( map [ int ] * GoroutineMonitor) ,
}
}
func ( s * TaskScheduler) AddTask ( task Task) {
s. mu. Lock ( )
s. monitors[ task. ID] = task. Monitor
s. mu. Unlock ( )
s. tasks <- task
}
func ( s * TaskScheduler) Start ( ctx context. Context) {
var wg sync. WaitGroup
for i := 0 ; i < s. workers; i++ {
wg. Add ( 1 )
go func ( workerID int ) {
defer wg. Done ( )
for task := range s. tasks {
task. Execute ( ctx, & wg)
}
} ( i)
}
go func ( ) {
wg. Wait ( )
close ( s. tasks)
} ( )
}
func main ( ) {
runtime. GOMAXPROCS ( 4 )
ctx, cancel := context. WithTimeout ( context. Background ( ) , 5 * time. Second)
defer cancel ( )
scheduler := NewTaskScheduler ( 3 )
scheduler. Start ( ctx)
for i := 1 ; i <= 10 ; i++ {
task := Task{
ID: i,
Duration: time. Duration ( i* 200 ) * time. Millisecond,
Monitor: NewGoroutineMonitor ( ) ,
}
scheduler. AddTask ( task)
}
<- ctx. Done ( )
fmt. Println ( "\n最终状态:" )
scheduler. mu. RLock ( )
for id, monitor := range scheduler. monitors {
monitor. Lock ( )
fmt. Printf ( "Task %d - 状态: %s\n" , id, monitor. status)
monitor. Unlock ( )
}
scheduler. mu. RUnlock ( )
}
4.3 Goroutine生命周期状态转换图
5. 实践注意事项
5.1 goroutine泄露的常见场景
channel阻塞且无法释放
func leakyGoroutine ( ) {
ch := make ( chan int )
go func ( ) {
val := <- ch
} ( )
}
无限循环
func infiniteLoop ( ) {
go func ( ) {
for {
}
} ( )
}
5.2 最佳实践表格
最佳实践 说明 合理控制goroutine数量 避免无限制创建goroutine 使用context控制生命周期 优雅管理goroutine的退出 处理panic 避免goroutine意外退出影响整个程序 及时清理资源 使用defer确保资源释放 合理设置GOMAXPROCS 根据CPU核心数调整P的数量
5.3 性能优化建议
goroutine池化
type Pool struct {
work chan func ( )
sem chan struct { }
}
func NewPool ( size int ) * Pool {
return & Pool{
work: make ( chan func ( ) ) ,
sem: make ( chan struct { } , size) ,
}
}
func ( p * Pool) Submit ( task func ( ) ) {
select {
case p. work <- task:
case p. sem <- struct { } { } :
go p. worker ( task)
}
}
func ( p * Pool) worker ( task func ( ) ) {
defer func ( ) { <- p. sem } ( )
for {
task ( )
task = <- p. work
}
}
避免锁竞争
type Counter struct {
count int32
}
func ( c * Counter) Increment ( ) {
atomic. AddInt32 ( & c. count, 1 )
}
func ( c * Counter) Get ( ) int32 {
return atomic. LoadInt32 ( & c. count)
}
6. 调试和监控
6.1 调试工具
GODEBUG参数
GODEBUG = schedtrace= 1000 ./program
GODEBUG = gctrace= 1 ./program
pprof工具
import _ "net/http/pprof"
go func ( ) {
log. Println ( http. ListenAndServe ( "localhost:6060" , nil ) )
} ( )
6.2 监控指标
goroutine数量 P的使用率 系统调用次数 调度延迟 GC影响
通过深入理解goroutine的原理和生命周期,我们可以:
更好地控制并发程序的行为 避免常见的并发陷阱 优化程序性能 排查并发相关问题
怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!