40分钟学 Go 语言高并发:Goroutine基础与原理

Day 03 - goroutine基础与原理

1. goroutine创建和调度

1.1 goroutine基本特性

特性说明
轻量级初始栈大小仅2KB,可动态增长
调度方式协作式调度,由Go运行时管理
创建成本创建成本很低,可同时运行数十万个
通信方式通过channel进行通信,而不是共享内存

1.2 创建goroutine的示例代码

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// 监控goroutine数量
func monitorGoroutines(duration time.Duration, done chan struct{}) {
    ticker := time.NewTicker(duration)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            fmt.Printf("当前goroutine数量: %d\n", runtime.NumGoroutine())
        case <-done:
            return
        }
    }
}

// 模拟工作负载
type Worker struct {
    ID int
    wg *sync.WaitGroup
}

func NewWorker(id int, wg *sync.WaitGroup) *Worker {
    return &Worker{
        ID: id,
        wg: wg,
    }
}

func (w *Worker) Work(jobs <-chan int, results chan<- int) {
    defer w.wg.Done()
    
    for job := range jobs {
        fmt.Printf("Worker %d 开始处理任务 %d\n", w.ID, job)
        // 模拟工作负载
        time.Sleep(100 * time.Millisecond)
        results <- job * 2
    }
}

func main() {
    numWorkers := 5
    numJobs := 10
    
    // 创建通道
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    
    // 创建WaitGroup来等待所有worker完成
    var wg sync.WaitGroup
    
    // 监控goroutine数量
    done := make(chan struct{})
    go monitorGoroutines(time.Second, done)
    
    // 创建worker池
    fmt.Printf("创建 %d 个worker\n", numWorkers)
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        worker := NewWorker(i, &wg)
        go worker.Work(jobs, results)
    }
    
    // 发送任务
    fmt.Printf("发送 %d 个任务\n", numJobs)
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs)
    
    // 等待所有worker完成
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // 收集结果
    for result := range results {
        fmt.Printf("收到结果: %d\n", result)
    }
    
    // 停止监控
    done <- struct{}{}
    
    // 最终统计
    fmt.Printf("最终goroutine数量: %d\n", runtime.NumGoroutine())
}

2. GMP模型详解

2.1 GMP组件说明

组件说明职责
G (Goroutine)goroutine的抽象包含goroutine的栈、程序计数器等信息
M (Machine)工作线程执行G的实体,对应系统线程
P (Processor)处理器维护G的运行队列,提供上下文环境

2.2 GMP调度流程图

在这里插入图片描述

2.3 GMP相关的运行时参数

runtime.GOMAXPROCS(n) // 设置最大P的数量
runtime.NumCPU()      // 获取CPU核心数
runtime.NumGoroutine() // 获取当前goroutine数量

3. 并发模型原理

3.1 Go并发模型特点

特点说明
CSP模型通过通信来共享内存,而不是共享内存来通信
非阻塞调度goroutine让出CPU时不会阻塞其他goroutine
工作窃取空闲P可以从其他P窃取任务
抢占式调度支持基于信号的抢占式调度

3.2 并发模型示例

package main

import (
    "context"
    "fmt"
    "runtime"
    "sync"
    "time"
)

// Pipeline 表示一个数据处理管道
type Pipeline struct {
    input  chan int
    output chan int
    done   chan struct{}
}

// NewPipeline 创建新的处理管道
func NewPipeline() *Pipeline {
    return &Pipeline{
        input:  make(chan int),
        output: make(chan int),
        done:   make(chan struct{}),
    }
}

// Process 处理数据
func (p *Pipeline) Process(ctx context.Context) {
    go func() {
        defer close(p.output)
        for {
            select {
            case num, ok := <-p.input:
                if !ok {
                    return
                }
                // 模拟处理
                result := num * 2
                select {
                case p.output <- result:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
}

// WorkerPool 表示工作池
type WorkerPool struct {
    workers int
    tasks   chan func()
    wg      sync.WaitGroup
}

// NewWorkerPool 创建新的工作池
func NewWorkerPool(workers int) *WorkerPool {
    pool := &WorkerPool{
        workers: workers,
        tasks:   make(chan func(), workers*2),
    }
    pool.Start()
    return pool
}

// Start 启动工作池
func (p *WorkerPool) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func(workerID int) {
            defer p.wg.Done()
            for task := range p.tasks {
                fmt.Printf("Worker %d executing task\n", workerID)
                task()
            }
        }(i + 1)
    }
}

// Submit 提交任务
func (p *WorkerPool) Submit(task func()) {
    p.tasks <- task
}

// Stop 停止工作池
func (p *WorkerPool) Stop() {
    close(p.tasks)
    p.wg.Wait()
}

func main() {
    // 设置使用的CPU核心数
    runtime.GOMAXPROCS(runtime.NumCPU())
    
    // 创建上下文
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    // 创建处理管道
    pipeline := NewPipeline()
    pipeline.Process(ctx)
    
    // 创建工作池
    pool := NewWorkerPool(3)
    
    // 启动生产者
    go func() {
        defer close(pipeline.input)
        for i := 1; i <= 10; i++ {
            select {
            case pipeline.input <- i:
                fmt.Printf("Sent %d to pipeline\n", i)
            case <-ctx.Done():
                return
            }
        }
    }()
    
    // 使用工作池处理pipeline输出
    go func() {
        for result := range pipeline.output {
            result := result // 捕获变量
            pool.Submit(func() {
                // 模拟处理时间
                time.Sleep(100 * time.Millisecond)
                fmt.Printf("Processed result: %d\n", result)
            })
        }
        // 处理完成后停止工作池
        pool.Stop()
    }()
    
    // 等待上下文结束
    <-ctx.Done()
    fmt.Println("Main context done")
}

4. goroutine生命周期

4.1 生命周期状态

状态说明
创建goroutine被创建,分配栈空间
可运行等待被调度执行
运行中正在被M执行
系统调用中阻塞在系统调用上
等待中因channel或同步原语阻塞
死亡执行完成,等待回收

4.2 生命周期示例

package main

import (
    "context"
    "fmt"
    "runtime"
    "runtime/debug"
    "sync"
    "time"
)

// GoroutineMonitor 用于监控goroutine的状态
type GoroutineMonitor struct {
    startTime time.Time
    endTime   time.Time
    status    string
    sync.Mutex
}

// NewGoroutineMonitor 创建新的goroutine监控器
func NewGoroutineMonitor() *GoroutineMonitor {
    return &GoroutineMonitor{
        startTime: time.Now(),
        status:    "created",
    }
}

// UpdateStatus 更新goroutine状态
func (g *GoroutineMonitor) UpdateStatus(status string) {
    g.Lock()
    defer g.Unlock()
    g.status = status
    fmt.Printf("Goroutine状态更新: %s, 时间: %v\n", status, time.Since(g.startTime))
}

// Complete 标记goroutine完成
func (g *GoroutineMonitor) Complete() {
    g.Lock()
    defer g.Unlock()
    g.endTime = time.Now()
    g.status = "completed"
    fmt.Printf("Goroutine完成, 总运行时间: %v\n", g.endTime.Sub(g.startTime))
}

// Task 代表一个任务
type Task struct {
    ID       int
    Duration time.Duration
    Monitor  *GoroutineMonitor
}

// Execute 执行任务
func (t *Task) Execute(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    defer t.Monitor.Complete()
    defer func() {
        if r := recover(); r != nil {
            fmt.Printf("Task %d panic: %v\nStack: %s\n", t.ID, r, debug.Stack())
            t.Monitor.UpdateStatus("panic")
        }
    }()

    t.Monitor.UpdateStatus("running")

    // 模拟任务执行
    select {
    case <-time.After(t.Duration):
        t.Monitor.UpdateStatus("normal completion")
    case <-ctx.Done():
        t.Monitor.UpdateStatus("cancelled")
        return
    }

    // 模拟一些可能的状态
    if t.ID%4 == 0 {
        t.Monitor.UpdateStatus("blocked")
        time.Sleep(100 * time.Millisecond)
    } else if t.ID%3 == 0 {
        panic("模拟任务panic")
    }
}

// TaskScheduler 任务调度器
type TaskScheduler struct {
    tasks    chan Task
    workers  int
    monitors map[int]*GoroutineMonitor
    mu       sync.RWMutex
}

// NewTaskScheduler 创建任务调度器
func NewTaskScheduler(workers int) *TaskScheduler {
    return &TaskScheduler{
        tasks:    make(chan Task, workers*2),
        workers:  workers,
        monitors: make(map[int]*GoroutineMonitor),
    }
}

// AddTask 添加任务
func (s *TaskScheduler) AddTask(task Task) {
    s.mu.Lock()
    s.monitors[task.ID] = task.Monitor
    s.mu.Unlock()
    s.tasks <- task
}

// Start 启动调度器
func (s *TaskScheduler) Start(ctx context.Context) {
    var wg sync.WaitGroup
    
    // 启动worker池
    for i := 0; i < s.workers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for task := range s.tasks {
                task.Execute(ctx, &wg)
            }
        }(i)
    }

    go func() {
        wg.Wait()
        close(s.tasks)
    }()
}

func main() {
    // 设置最大P的数量
    runtime.GOMAXPROCS(4)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // 创建调度器
    scheduler := NewTaskScheduler(3)

    // 启动调度器
    scheduler.Start(ctx)

    // 创建多个任务
    for i := 1; i <= 10; i++ {
        task := Task{
            ID:       i,
            Duration: time.Duration(i*200) * time.Millisecond,
            Monitor:  NewGoroutineMonitor(),
        }
        scheduler.AddTask(task)
    }

    // 等待context结束
    <-ctx.Done()

    // 打印最终状态
    fmt.Println("\n最终状态:")
    scheduler.mu.RLock()
    for id, monitor := range scheduler.monitors {
        monitor.Lock()
        fmt.Printf("Task %d - 状态: %s\n", id, monitor.status)
        monitor.Unlock()
    }
    scheduler.mu.RUnlock()
}

4.3 Goroutine生命周期状态转换图

在这里插入图片描述

5. 实践注意事项

5.1 goroutine泄露的常见场景

  1. channel阻塞且无法释放
func leakyGoroutine() {
    ch := make(chan int) // 无缓冲channel
    go func() {
        val := <-ch // 永远阻塞在这里
    }()
    // ch没有被写入,goroutine泄露
}
  1. 无限循环
func infiniteLoop() {
    go func() {
        for {
            // 没有退出条件的循环
            // 应该添加 select 或 检查退出信号
        }
    }()
}

5.2 最佳实践表格

最佳实践说明
合理控制goroutine数量避免无限制创建goroutine
使用context控制生命周期优雅管理goroutine的退出
处理panic避免goroutine意外退出影响整个程序
及时清理资源使用defer确保资源释放
合理设置GOMAXPROCS根据CPU核心数调整P的数量

5.3 性能优化建议

  1. goroutine池化
type Pool struct {
    work chan func()
    sem  chan struct{}
}

func NewPool(size int) *Pool {
    return &Pool{
        work: make(chan func()),
        sem:  make(chan struct{}, size),
    }
}

func (p *Pool) Submit(task func()) {
    select {
    case p.work <- task:
    case p.sem <- struct{}{}:
        go p.worker(task)
    }
}

func (p *Pool) worker(task func()) {
    defer func() { <-p.sem }()
    for {
        task()
        task = <-p.work
    }
}
  1. 避免锁竞争
// 使用atomic替代mutex
type Counter struct {
    count int32
}

func (c *Counter) Increment() {
    atomic.AddInt32(&c.count, 1)
}

func (c *Counter) Get() int32 {
    return atomic.LoadInt32(&c.count)
}

6. 调试和监控

6.1 调试工具

  1. GODEBUG参数
GODEBUG=schedtrace=1000 ./program # 每1000ms输出调度信息
GODEBUG=gctrace=1 ./program      # 输出GC信息
  1. pprof工具
import _ "net/http/pprof"

go func() {
    log.Println(http.ListenAndServe("localhost:6060", nil))
}()

6.2 监控指标

  1. goroutine数量
  2. P的使用率
  3. 系统调用次数
  4. 调度延迟
  5. GC影响

通过深入理解goroutine的原理和生命周期,我们可以:

  1. 更好地控制并发程序的行为
  2. 避免常见的并发陷阱
  3. 优化程序性能
  4. 排查并发相关问题

怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/920137.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python学习------第十天

数据容器-----元组 定义格式&#xff0c;特点&#xff0c;相关操作 元组一旦定义&#xff0c;就无法修改 元组内只有一个数据&#xff0c;后面必须加逗号 """ #元组 (1,"hello",True) #定义元组 t1 (1,"hello") t2 () t3 tuple() prin…

nwjs崩溃复现、 nwjs-控制台手动操纵、nwjs崩溃调用栈解码、剪切板例子中、nwjs混合模式、xdotool显示nwjs所有进程窗口列表

-1. nwjs在低版本ubuntu运行情况 ubuntu16.04运行nw-v0.93或0.89报错找不到NSS_3.30、GLIBC_2.25 uname -a #Linux Asus 4.15.0-112-generic #113~16.04.1-Ubuntu SMP Fri Jul 10 04:37:08 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux cat /etc/issue #Ubuntu 16.04.7 LTS \n \l…

在自动驾驶进行大数据量因果推理实验时,如何减少无用功,提高实验效率?

在对实验结果做反事实推理时&#xff0c;通常需要对数据进行多次循环&#xff0c;然后对多次循环的结果进行处理&#xff0c;如果只在最后结果结束时&#xff0c;再进行处理&#xff0c;可能会由于反事实过程中某个参数设置错误&#xff0c;导致整个反事实实验出现错误&#xf…

DAY1 网络编程(TCP客户端服务器)

作业&#xff1a; TCP客户端服务器。 server服务器代码&#xff1a; #include <myhead.h> #define IP "192.168.110.52" #define PORT 8886 #define BACKLOG 20 int main(int argc, const char *argv[]) {int oldfdsocket(AF_INET,SOCK_STREAM,0);//IPV4通信…

Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例

1、在pom.xml中加入依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.1.6</version></dependency> 2、配置application.yml 加入Kafk…

【SQL实验】视图操作(菜单操作和命令操作)

完整代码在文章末尾【代码是自己的解答&#xff0c;并非标准答案&#xff0c;也有可能写错&#xff0c;文中可能会有不准确或待完善之处&#xff0c;恳请各位读者不吝批评指正&#xff0c;共同促进学习交流】 &#xff08;一&#xff09;菜单操作 1.建立视图“课程”&#xff…

python基础知识(七)——写入excel

一、写入excel 写入数据到excel wb load_workbook("testcase_api_wuye.xlsx") #打开一个已经存在的excel文件 sh wb["register"] #识别某一个表单 sh.cell(row 2,column 8).value "pass" #写入数据&#xff0c;单元格的值赋值 wb.sav…

MATLAB绘图基础11:3D图形绘制

参考书&#xff1a;《 M A T L A B {\rm MATLAB} MATLAB与学术图表绘制》(关东升)。 11.3D图形绘制 11.1 3D图概述 M A T L A B {\rm MATLAB} MATLAB的 3 D {\rm 3D} 3D图主要有&#xff1a; 3 D {\rm 3D} 3D散点图、 3 D {\rm 3D} 3D线图、 3 D {\rm 3D} 3D曲面图、 3 D {\rm…

ssm148基于Spring MVC框架的在线电影评价系统设计与实现+jsp(论文+源码)_kaic

毕 业 设 计&#xff08;论 文&#xff09; 题目&#xff1a;在线电影评价系统设计与实现 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术&#xff0c;让传统数据信息的管理升级为软件存储&#xff0c;归纳&#xff0c;集中处理数据信息的管理方式。本在线电影评价系…

激光slam学习笔记5---ubuntu2004部署运行fastlivo踩坑记录

背景&#xff1a;看看fastlivo论文&#xff0c;觉得挺有意思的&#xff0c;就本地部署跑跑看看效果。个人环境&#xff0c;ubuntu20.04。 一、概要 由于依赖比较多&#xff0c;个人构建工作空间&#xff0c;使用catkin_make编译 src├── FAST-LIVO├── livox_ros_driver…

12. 利用“文件组织”实现石头剪刀布小游戏

文章目录 概要整体架构流程技术名词解释小结 1. 概要 ~ Jack Qiao对米粒说&#xff1a;“前面咱们了解过“文件组织”&#xff0c;让我们利用“文件组织”来制作一个有趣的“石头、剪刀、布”小游戏。”举个栗子&#xff1a; > 程序随机生成一个选择&#xff08;石头、剪刀…

VRT: 关于视频修复的模型

VRT: 关于视频修复的模型 1. 视频修复的背景与重要性背景介绍&#xff1a;重要性&#xff1a; 2. VRT的重要性和研究背景VRT的背景&#xff1a;VRT的重要性&#xff1a; 3. 视频修复概述3.1 定义与目标3.2 与单图像修复的区别3.3 对时间信息利用的需求 4. VRT模型详解4.1 整体框…

Stable Diffusion经典应用场景

&#x1f33a;系列文章推荐&#x1f33a; 扩散模型系列文章正在持续的更新&#xff0c;更新节奏如下&#xff0c;先更新SD模型讲解&#xff0c;再更新相关的微调方法文章&#xff0c;敬请期待&#xff01;&#xff01;&#xff01;&#xff08;本文及其之前的文章均已更新&…

04 —— Webpack打包CSS代码

加载器css-loader &#xff1a;解析css代码 webpack 中文文档 | webpack中文文档 | webpack中文网 加载器style-loader&#xff1a;把解析后的css代码插入到DOM style-loader | webpack 中文文档 | webpack中文文档 | webpack中文网 准备css代码&#xff0c;放到src/login目…

springboot高校网上缴费综合务系统

摘 要 相比于以前的传统手工管理方式&#xff0c;智能化的管理方式可以大幅降低运营人员成本&#xff0c;实现了高校网上缴费综合务系统的标准化、制度化、程序化的管理&#xff0c;有效地防止了高校网上缴费综合务的随意管理&#xff0c;提高了信息的处理速度和精确度&#x…

IDEA怎么定位java类所用maven依赖版本及引用位置

在实际开发中&#xff0c;我们可能会遇到需要搞清楚代码所用依赖版本号及引用位置的场景&#xff0c;便于排查问题&#xff0c;怎么通过IDEA实现呢&#xff1f; 可以在IDEA中打开项目&#xff0c;右键点击maven的pom.xml文件&#xff0c;或者在maven窗口下选中项目&#xff0c;…

springMVC重点知识

一、springMVC请求流程 二、springMVC环境搭建 Idea 下创建 springmvc01 ⼯程 1、pom.xml 坐标添加 <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>12</maven.compiler.source> …

django基于python 语言的酒店推荐系统

摘 要 酒店推荐系统旨在提供一个全面酒店推荐在线平台&#xff0c;该系统允许用户浏览不同的客房类型&#xff0c;并根据个人偏好和需求推荐合适的酒店客房。用户可以便捷地进行客房预订&#xff0c;并在抵达后简化入住登记流程。为了确保连续的住宿体验&#xff0c;系统还提供…

机器学习笔记——30种常见机器学习算法简要汇总

本笔记介绍机器学习中常见的30种机器学习算法。 文章目录 监督学习算法&#xff08;Supervised Learning&#xff09;回归算法&#xff08;Regression Algorithms&#xff09;分类算法&#xff08;Classification Algorithms&#xff09; 无监督学习算法&#xff08;Unsupervis…

Vue3、Vite5、Primevue、Oxlint、Husky9 简单快速搭建最新的Web项目模板

Vue3、Vite5、Oxlint、Husky9 简单搭建最新的Web项目模板 特色进入正题创建基础模板配置API自动化导入配置组件自动化导入配置UnoCss接入Primevue接入VueRouter4配置项目全局环境变量 封装Axios接入Pinia状态管理接入Prerttier OXLint ESLint接入 husky lint-staged&#xf…