golang中的并发模型

并发模型

传统的编程语言(如C++、Java、Python等)并非为并发而生的,因此它们面对并发的逻辑多是基于操作系统的线程。其并发的执行单元(线程)之间的通信利用的也是操作系统提供的线程或进程间通信的原语,比如共享内存、信号、管道、消息队列、套接字等。在这些通信原语中,使用最多、最广泛同时也最高效的是结合了线程同步原语(比如锁以及更为低级的原子操作)的共享内存方式,因此,可以说传统语言的并发模型是基于共享内存的模型

Untitled

这些传统的就基于共享内存的并发模型难用且易错,在大型程序中,开发人员在设计并发程序时需要根据线程模型对程序进行建模同时规划线程之间的通信方式,且程序难以阅读、理解、维护

Go采用了CSP(Communicating Sequential Process,通信顺序进程)模型

一个符合CSP模型的并发程序应该是一组通过输入/输出原语连接起来的P的集合

Untitled

CSP模型旨在简化并发程序的编写,让并发程序的编写与编写顺序程序一样简单。Tony Hoare认为输入/输出应该是基本的编程原语,数据处理逻辑(CSP中的P)仅需调用输入原语获取数据,顺序处理数据,并将结果数据通过输出原语输出

CSP理论中的P(Process,进程)是个抽象概念,它代表任何顺序处理逻辑的封装,它获取输入数据(或从其他P的输出获取),并生产可以被其他P消费的输出数据。

为了实现CSP模型中的输入/输出原语,Go引入了goroutine(P)之间的通信原语channel。通过channel将goroutine(P)组合与连接在一起,这使得设计和编写大型并发系统变得更为简单和清晰

虽然CSP模型已经成为Go语言支持的主流并发模型,但Go也支持传统的基于共享内存的并发模型,并提供基本的低级同步原语(主要是sync包中的互斥锁、条件变量、读写锁、原子操作等

那么在实践中应该如何选择是使用channel还是低级同步原语下的共享内存?

Go始终推荐以CSP模型风格构建并发程序,尤其是在复杂的业务层面。这将提升程序逻辑的清晰度,大大降低并发设计的复杂性,并让程序更具可读性和可维护性;

对于局部情况,比如涉及性能敏感的区域或需要保护的结构体数据,可以使用更为高效的低级同步原语(如sync.Mutex),以保证goroutine对数据的同步访问。

并发模式

在语言层面,Go针对CSP模型提供了三种并发原语。

  • goroutine:对应CSP模型中的P,封装了数据的处理逻辑,是Go运行时调度的基本执行单元。
  • channel:对应CSP模型中的输入/输出原语,用于goroutine之间的通信和同步。
  • select:用于应对多路输入/输出,可以让goroutine同时协调处理多个channel操作。

深入了解一下在实践中这些原语的常见组合方式,即并发模式:

创建模式

go关键字+function/method 创建 goroutine:

go fmt.println("I'm a goroutine")
​
c := srv.NewConn(rw)
go c.serve(connCtx)

在稍微复杂的程序里,需要考虑通过原语的承载体channel在goroutine间建立联系,所以通常采用以下方式建立goroutine:

type T struct {...}func spwan(f func()) chan T {
    c := make(chan T)
    go func() {
        ...
        f()
        ...
    }()
    return c
}func main() {
//使用c与新创建的goroutine通信
    c := spawn(func(){})
}

在内部创建一个goroutine并返回一个channel类型变量函数

spwan函数创建的新的goroutine和调用spwan函数的goroutine通过channel建立联系

函数得以实现得益于channel作为go语言的一等公民(first-class citizen)的存在:channel可以像变量一样被初始化、传递和赋值。上面例子中的spawn只返回了一个channel变量、

退出模式

goroutine的执行函数返回意味着goroutine退出。但有些时候会要求优雅退出,以下为方案:

分离(detached)模式

是使用最广泛的goroutine退出模式

创建它的goroutine不需要关心它的退出,这类goroutine在启动后即与其创建者彻底分离,其生命周期与其执行的主函数相关,函数返回即goroutine退出。这类goroutine有两个常见用途。

一次性任务:用来执行任务完成后既退出,比如此标准库代码:

// $GOROOT/src/net/dial.gofunc (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
    ...
    if oldCancel := d.Cancel; oldCancel != nil {
        subCtx, cancel := context.WithCancel(ctx)
        defer cancel()
//有数据处理后既退出
        go func() {
            select {
            case <-oldCancel:
                cancel()
            case <-subCtx.Done():
            }
        }()
        ctx = subCtx
    }
    ...
}

常驻后台执行的一些特定任务:如监视(monitor)、观察(watch)等。其实现通常采用for {…}或for { select{…} }代码段形式,并多以定时器(timer)或事件(event)驱动执行。

// $GOROOT/src/runtime/mgc.go
func gcBgMarkStartWorkers() {
    // 每个P都有一个运行在后台的用于标记的G
    for _, p := range allp {
        if p.gcBgMarkWorker == 0 {
            go gcBgMarkWorker(p) // 为每个P创建一个goroutine,以运行gcBgMarkWorker
            notetsleepg(&work.bgMarkReady, -1)
            noteclear(&work.bgMarkReady)
        }
    }
}func gcBgMarkWorker(_p_ *p) {
    gp := getg()
    ...
    for { // 常驻后台处理GC事宜
        ...
    }
}

Join模式

在线程模型中,父线程可以通过pthread_join来等待子线程结束并获取子线程的结束状态。

在Go中,我们有时候也有类似的需求:goroutine的创建者需要等待新goroutine结束。

  • 等待一个goroutine退出

先看一段实例代码

func worker(args ...interface{}) {
    if len(args) == 0 {
        return
    }
    interval, ok := args[0].(int)
    if !ok {
        return
    }
    
    time.Sleep(time.Second * (time.Duration(interval)))
}func spawn(f func(args ...interface{}), args ...interface{}) chan struct{} {
    c := make(chan struct{})
    go func() {
        f(args...)
        c <- struct{}{}
    }()
    return c
}func main() {
     done := spawn(worker, 5)
     println("spawn a worker goroutine")
     <-done
     println("worker done")
}

这个channel的用途就是在两个goroutine之间建立退出事件的“信号”通信机制。main goroutine在创建完新goroutine后便在该channel上阻塞等待,直到新goroutine退出前向该channel发送了一个信号。

运行过后

Untitled

  • 获取goroutine的退出状态

如果不仅要等goroutine退出还要精准获取其结束状态,可以通过自定义类型的channel实现这一需求:

var OK = errors.New("ok")func worker(args ...interface{}) error {
    if len(args) == 0 {
        return errors.New("invalid args")
    }
    interval, ok := args[0].(int)
    if !ok {
        return errors.New("invalid interval arg")
    }
    
    time.Sleep(time.Second * (time.Duration(interval)))
    return OK
}func spawn(f func(args ...interface{}) error, args ...interface{}) chan error {
    c := make(chan error)
    go func() {
        c <- f(args...)
    }()
    return c
}func main() {
    done := spawn(worker, 5)
    println("spawn worker1")
    err := <-done
    fmt.Println("worker1 done:", err)
    done = spawn(worker)
    println("spawn worker2")
    err = <-done
    fmt.Println("worker2 done:", err)
}

将channel中承载的类型由struct{}改为了error,这样channel承载的信息就不只是一个信号了,还携带了有价值的信息:新goroutine的结束状态。运行上述示例:

Untitled

  • 等待多个goroutine退出

有时候必须等待全部新goroutine退出,可以通过Go语言提供的sync.WaitGroup实现等待多个goroutine退出的模式:

func worker(args ...interface{}) {
    if len(args) == 0 {
        return
    }
    
    interval, ok := args[0].(int)
    if !ok {
        return
    }
    
    time.Sleep(time.Second * (time.Duration(interval)))
}func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
    c := make(chan struct{})
    var wg sync.WaitGroup
    
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(i int) {
            name := fmt.Sprintf("worker-%d:", i)
            f(args...)
            println(name, "done")
            wg.Done() // worker done!
        }(i)
    }
    
    go func() {
        wg.Wait()
        c <- struct{}{}
    }()
    
    return c
}func main() {
    done := spawnGroup(5, worker, 3)
    println("spawn a group of workers")
    <-done
    println("group workers done")
}

通过sync.WaitGroup,spawnGroup每创建一个goroutine都会调用wg.Add(1),新创建的goroutine会在退出前调用wg.Done。

在spawnGroup中还创建了一个用于监视的goroutine,该goroutine调用sync.WaitGroup的Wait方法来等待所有goroutine退出。

在所有新创建的goroutine退出后,Wait方法返回,该监视goroutine会向done这个channel写入一个信号,这时main goroutine才会从阻塞在done channel上的状态中恢复,继续往下执行。

运行上述示例代码:

支持超时机制的等待

设置合理的退出时间,如若没有退出,则继续执行下一步:

func main() {
    done := spawnGroup(5, worker, 30)
    println("spawn a group of workers")
    
    timer := time.NewTimer(time.Second * 5)
    defer timer.Stop()
    select {
    case <-timer.C:
        println("wait group workers exit timeout!")
    case <-done:
        println("group workers done")
    }
}

notify-and-wait模式

main goroutine的停止代表着整个程序的停止,如果不事先通知退出,则容易导致业务数据损坏、不完整

我们可以通过notify-and-wait(通知并等待)模式来满足这一场景的要求。虽然这一模式也不能完全避免损失,但是它给了各个goroutine一个挽救数据的机会,从而尽可能减少损失。

  • 通知并等待一个goroutine的退出
func worker(j int) {
    time.Sleep(time.Second * (time.Duration(j)))
}func spawn(f func(int)) chan string {
    quit := make(chan string)
    go func() {
        var job chan int // 模拟job channel
        for {
            select {
            case j := <-job:
                f(j)
            case <-quit:
                quit <- "ok"
            }
        }
    }()
    return quit
}func main() {
    quit := spawn(worker)
    println("spawn a worker goroutine")
    
    time.Sleep(5 * time.Second)
    
    // 通知新创建的goroutine退出
    println("notify the worker to exit...")
    quit <- "exit"
    
    timer := time.NewTimer(time.Second * 10)
    defer timer.Stop()
    select {
    case status := <-quit:
        println("worker done:", status)
    case <-timer.C:
        println("wait worker exit timeout")
    }
}

执行

此时,spawn函数不仅发送退出信号给创建者还承载创建者发送的退出信号,形成了一个双向的数据通道

  • 通知并等待多个goroutine退出

channel存在一个特性:当使用close关闭channel时,所有阻塞到该channel上的goroutine都会得到通知,所以可以利用这一特性实现这一模式:

func worker(j int) {
    time.Sleep(time.Second * (time.Duration(j)))
}func spawnGroup(n int, f func(int)) chan struct{} {
    quit := make(chan struct{})
    job := make(chan int)
    var wg sync.WaitGroup
    
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done() // 保证wg.Done在goroutine退出前被执行
            name := fmt.Sprintf("worker-%d:", i)
            for {
                j, ok := <-job
                if !ok {
                    println(name, "done")
                    return
                }
                // 执行这个job
                worker(j)
            }
        }(i)
    }
    
    go func() {
        <-quit
        close(job) // 广播给所有新goroutine
        wg.Wait()
        quit <- struct{}{}
    }()
    
    return quit
}func main() {
    quit := spawnGroup(5, worker)
    println("spawn a group of workers")
    
    time.Sleep(5 * time.Second)
    // 通知 worker goroutine 组退出
    println("notify the worker group to exit...")
    quit <- struct{}{}
    
    timer := time.NewTimer(time.Second * 5)
    defer timer.Stop()
    select {
    case <-timer.C:
        println("wait group workers exit timeout!")
    case <-quit:
        println("group workers done")
    }
}

创建者直接利用了worker goroutine接收任务(job)的channel来广播退出通知,而实现这一广播的代码就是close(job)。此时各个worker goroutine监听job channel,当创建者关闭job channel时,通过“comma ok”模式获取的ok值为false,也就表明该channel已经被关闭,于是worker goroutine执行退出逻辑(退出前wg.Done()被执行)。

执行:

退出模式的应用

由于goroutine的运行状态不同,因此很难用同种框架全面管理,所以我们可以只实现一个“超时等待退出”框架,以统一解决各种运行状态

一组goroutine的退出有两种情况,第一种情况是并发退出,当goroutine的退出先后数据对数据处理无影响时可使用;另一种则是串行退出,也就是次序错误可能导致程序状态混乱和错误

  • 并发退出
  • 串行退出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/165449.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于Netty实现的简单聊天服务组件

目录 基于Netty实现的简单聊天服务组件效果展示技术选型&#xff1a;功能分析聊天服务基础设施配置&#xff08;基于Netty&#xff09;定义组件基础的配置&#xff08;ChatProperties&#xff09;定义聊天服务类&#xff08;ChatServer&#xff09;定义聊天服务配置初始化类&am…

闭眼检测实现

引言 这段代码是一个实时眼睛状态监测程序&#xff0c;可以用于监测摄像头捕获的人脸图像中的眼睛状态&#xff0c;判断眼睛是否闭合。具体应用实现作用说明如下&#xff1a; 1. 实时监测眼睛状态 通过摄像头捕获的实时视频流&#xff0c;检测人脸关键点并计算眼睛的 EAR&a…

如何在CSDN植入广告

如何在CSDN植入广告 概述 如果你的博客访问量很大&#xff0c;你可以通过如下方式在博客上放置广告而通过博客赚钱 广告联盟 google adsense 链接&#xff1a;Adsense 比较主流的应该是Google Adsense&#xff0c;可以配置自动广告&#xff08;包含 业内广告、锚定广告、侧…

Web安全研究(五)

Automated WebAssembly Function Purpose Identification With Semantics-Aware Analysis WWW23 文章结构 introbackgroundsystem design abstraction genapplying abstractionsclassifier data collection and handling data acquisitionstatistics of collected datamodule-…

SpringBoot框架简介

SpringBoot框架简介 简单介绍 前言&#xff1a; 我们大家都知道Spring&#xff0c;Boot是启动的意思&#xff0c;所以SpringBoot其实是一个启动Spring项目的一个工具&#xff0c;从根本上讲&#xff0c;SpringBoot就是一些库的集合&#xff0c;它能够被任意项目的构建系统所使…

YOLOv8 加持 MobileNetv3,目标检测新篇章

🗝️YOLOv8实战宝典--星级指南:从入门到精通,您不可错过的技巧   -- 聚焦于YOLO的 最新版本, 对颈部网络改进、添加局部注意力、增加检测头部,实测涨点 💡 深入浅出YOLOv8:我的专业笔记与技术总结   -- YOLOv8轻松上手, 适用技术小白,文章代码齐全,仅需 …

qsort使用举例和qsort函数的模拟实现

qsort使用举例 qsort是C语言中的一个标准库函数&#xff0c;用于对数组或者其他数据结构中的元素进行排序。它的原型如下&#xff1a; void qsort(void *base, size_t nmemb, size_t size, int (*compar)(const void *, const void *)); 我们可以去官网搜来看一看&#xff1a;…

基于Vue+SpringBoot的大病保险管理系统 开源项目

项目编号&#xff1a; S 031 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S031&#xff0c;文末获取源码。} 项目编号&#xff1a;S031&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 系统配置维护2.2 系统参保管理2.3 大…

基于灰狼算法(GWO)优化的VMD参数(GWO-VMD)

代码的使用说明 基于灰狼算法优化的VMD参数 代码的原理 基于灰狼算法&#xff08;Grey Wolf Optimizer, GWO&#xff09;优化的VMD参数&#xff08;GWO-VMD&#xff09;是一种结合了GWO和VMD算法的优化方法&#xff0c;用于信号分解和特征提取。 GWO是一种基于群体智能的优化…

Transformer中WordPiece/BPE等不同编码方式详解以及优缺点

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

C语言 字符函数汇总,模拟实现各字符函数(炒鸡详细)

目录 求字符串长度 strlen 示例 模拟实现strlen 长度不受限制的字符串函数 strcpy 示例 模拟实现strcpy strcat 模拟实现strcat strcmp 示例 模拟实现strcmp 长度受限制的字符串函数介绍 strncpy 示例 模拟实现strncpy strncat 示例 模拟实现strncat s…

MySQL数据库索引以及使用唯一索引实现幂等性

&#x1f4d1;前言 本文主要是MySQL数据库索引以及使用唯一索引实现幂等性的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;CSDN主页放风讲故事 &#x1f30…

数据结构:红黑树讲解(C++)

红黑树 1.前言2.红黑树简述2.1概念2.2性质 3.红黑树的插入3.1关于新插入节点的颜色3.2节点的定义3.3插入新节点3.4判断插入后是否需要调整3.5插入后维持红黑树结构&#xff08;重点&#xff09;3.5.1cur、p、u为红&#xff0c;g为黑3.5.2cur、p为红&#xff0c;g为黑&#xff0…

MISRA 2012学习笔记(5)-Rules 8.10

文章目录 Rules8.10 基本类型模型(The essential type model)8.10.1 原理8.10.2 基本类型(Essential type)Rule 10.1 操作数不得具有不适当的基本类型Rule 10.2 在加减法运算中&#xff0c;不得不当使用本质为字符类型的表达式Rule 10.3 表达式的值不得赋值给具有较窄基本类型或…

【数据结构(二)】单链表(3)

文章目录 1. 链表介绍2. 单链表应用实例2.1. 顺序添加方式2.1.1. 思路分析2.1.2. 代码实现 2.2. 按照编号顺序添加方式2.2.1. 思路分析2.2.2. 代码实现 3. 单链表节点的修改3.1. 思路分析3.2. 代码实现 4. 单链表节点的删除4.1. 思路分析4.2. 代码实现 5. 单链表常见面试题5.1.…

影刀sqlite的插入方法

影刀sqlite的插入方法 变量外面不用加‘’

YOLO免费数据集网站收集

目录 Roboflow Universe: Open Source Computer Vision Community Find Open Datasets and Machine Learning Projects | Kaggle ​编辑 【火焰和烟雾图像数据集】-计算机视觉数据集-极市开发者平台 (cvmart.net) 开放数据集- 飞桨AI Studio星河社区 - 人工智能学习与实训社…

【iOS】——知乎日报第五周总结

文章目录 一、评论区展开与收缩二、FMDB库实现本地持久化FMDB常用类&#xff1a;FMDB的简单使用&#xff1a; 三、点赞和收藏的持久化 一、评论区展开与收缩 有的评论没有被回复评论或者被回复评论过短&#xff0c;这时就不需要展开全文的按钮&#xff0c;所以首先计算被回复评…

【LeetCode刷题-树】-- 572.另一棵树的子树

572.另一棵树的子树 方法&#xff1a;深度优先搜索暴力匹配 深度优先搜索枚举root中的每一个节点&#xff0c;判断这个点的子树是否与subroot相等 /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right…