Go channel关闭方法

channel关闭原则

1、不能在消费端关闭channel(基础原则,单生产者或多生产者均不能在消费端关闭);

2、多个生产者时,不能对channel执行关闭;

3、只有在唯一或最后唯一剩下的生产者协程中关闭channel,之后通知消费者
   没有值可以读,才能确保向一个已经关闭的channel中不再发送数据;

暴力关闭channel

强行在消费端或多个生产者端关闭channel会产生pannic,可以使用recover机制接收异常避免崩溃;

生产端:
func SafeSend(ch chan T, value T) (closed bool) {
    defer func() {
        if recover() != nil {
            // The return result can be altered 
            // in a defer function call.
            closed = true
        }
    }()
    
    ch <- value // panic if ch is closed
    return false // <=> closed = false; return
}

消费端:
func SafeClose(ch chan T) (justClosed bool) {
    defer func() {
        if recover() != nil {
            justClosed = false
        }
    }()
    
    // assume ch != nil here.
    close(ch) // panic if ch is closed
    return true // <=> justClosed = true; return
}

不同生产者消费者关闭channel情况

1.单生产者单(多)消费者

直接在生产者端close();

2.多生产者单消费者

不能在生产者端直接close(),需要新建一个信号channel,通知发送端停止发送数据;channel在没有go协程引用时会自动关闭,不用显式关闭;

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)
	
	// ...
	const MaxRandomNumber = 100000
	const NumSenders = 1000
	
	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(1)
	
	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
	// stopCh is an additional signal channel.
	// Its sender is the receiver of channel dataCh.
	// Its reveivers are the senders of channel dataCh.
    // stopCh为添加的信号channel,它的数据来源是dataCh的接受端发出的数据,它的数据    
    // 是在dataCh的生产端进行消费;
	
	// senders
	for i := 0; i < NumSenders; i++ {
		go func() {
			for {
				// The first select here is to try to exit the goroutine
				// as early as possible. In fact, it is not essential
				// for this example, so it can be omitted.
				select {
				case <- stopCh:
					return
				default:
				}
				
				// Even if stopCh is closed, the first branch in the
				// second select may be still not selected for some
				// loops if the send to dataCh is also unblocked.
				// But this is acceptable, so the first select
				// can be omitted.
                
                // ?对于某些loop,dataCh的生产端塞入数据,即使stopCh已经关闭,第二个select的           
                // 第一个分支仍然可能不能被选择(select如果多个条件同时满足条件,会随机选择);
				select {
				case <- stopCh:
					return
				case dataCh <- rand.Intn(MaxRandomNumber):
				}
			}
		}()
	}
	
	// the receiver
	go func() {
		defer wgReceivers.Done()
		
		for value := range dataCh {
			if value == MaxRandomNumber-1 {
				// The receiver of the dataCh channel is
				// also the sender of the stopCh cahnnel.
				// It is safe to close the stop channel here.
				close(stopCh)
				return
			}
			
			log.Println(value)
		}
	}()
	
	// ...
	wgReceivers.Wait()
}
3.多生产者多消费者

不能让接受端或发送端关闭channel,甚至都不能让接受者关闭一个退出信号来通知生产者停止生产,因为多消费者会导致接受者多次执行close(),相当于多个生产端关闭channel,违反了channel关闭原则,但可以引入一个额外的协调者来关闭附加的退出信号channel。

package main

import (
	"time"
	"math/rand"
	"sync"
	"log"
	"strconv"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)
	
	// ...
	const MaxRandomNumber = 100000
	const NumReceivers = 10
	const NumSenders = 1000
	
	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)
	
	// ...
	dataCh := make(chan int, 100)
	stopCh := make(chan struct{})
		// stopCh is an additional signal channel.
		// Its sender is the moderator goroutine shown below.
		// Its reveivers are all senders and receivers of dataCh.
	toStop := make(chan string, 1)
		// The channel toStop is used to notify the moderator
		// to close the additional signal channel (stopCh).
		// Its senders are any senders and receivers of dataCh.
		// Its reveiver is the moderator goroutine shown below.
	
	var stoppedBy string
	
	// moderator
	go func() {
		stoppedBy = <- toStop
		close(stopCh)
	}()
	
	// senders
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(MaxRandomNumber)
				if value == 0 {
					// Here, a trick is used to notify the moderator
					// to close the additional signal channel.
					select {
					case toStop <- "sender#" + id:
					default:
					}
					return
				}
				
				// The first select here is to try to exit the goroutine
				// as early as possible. This select blocks with one
				// receive operation case and one default branches will
				// be optimized as a try-receive operation by the
				// official Go compiler.
				select {
				case <- stopCh:
					return
				default:
				}
				
				// Even if stopCh is closed, the first branch in the
				// second select may be still not selected for some
				// loops (and for ever in theory) if the send to
				// dataCh is also unblocked.
				// This is why the first select block is needed.
				select {
				case <- stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}
	
	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()
			
			for {
				// Same as the sender goroutine, the first select here
				// is to try to exit the goroutine as early as possible.
				select {
				case <- stopCh:
					return
				default:
				}
				
				// Even if stopCh is closed, the first branch in the
				// second select may be still not selected for some
				// loops (and for ever in theory) if the receive from
				// dataCh is also unblocked.
				// This is why the first select block is needed.
				select {
				case <- stopCh:
					return
				case value := <-dataCh:
					if value == MaxRandomNumber-1 {
						// The same trick is used to notify
						// the moderator to close the
						// additional signal channel.
						select {
						case toStop <- "receiver#" + id:
						default:
						}
						return
					}
					
					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}
	
	// ...
	wgReceivers.Wait()
	log.Println("stopped by", stoppedBy)
}
Context结束多个协程
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func worker(ctx context.Context, id int, wg *sync.WaitGroup) {
	defer wg.Done()

	for {
		select {
		case <-ctx.Done():
			fmt.Printf("Worker %d canceled\n", id)
			return
		default:
			// 执行协程的工作任务
			fmt.Printf("Worker %d working\n", id)
			time.Sleep(time.Second)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	// 启动多个协程
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go worker(ctx, i, &wg)
	}

	// 主程序等待一段时间后取消所有协程
	time.Sleep(time.Second * 3)
	cancel()

	// 等待所有协程完成
	wg.Wait()

	fmt.Println("Main program finished")
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/957477.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

初探——【Linux】程序的翻译与动静态链接

我们所写的C/C程序计算机是看不懂的&#xff0c;它只认识0101这样的机器码。所以我们就需要借助编译器对这些源代码进行翻译&#xff0c;使之成为计算机能够执行的二进制指令。这个过程通常分为几个关键步骤&#xff1a;预处理、编译、汇编和链接。 一.预处理&#xff08;Prep…

亲测有效!如何快速实现 PostgreSQL 数据迁移到 时序数据库TDengine

小T导读&#xff1a;本篇文章是“2024&#xff0c;我想和 TDengine 谈谈”征文活动的优秀投稿之一&#xff0c;作者从数据库运维的角度出发&#xff0c;分享了利用 TDengine Cloud 提供的迁移工具&#xff0c;从 PostgreSQL 数据库到 TDengine 进行数据迁移的完整实践过程。文章…

matlab实现数据极坐标显示

%% % 读取文件数据 filename E:\ProjectWorkspace\866\866data\665hangji.txt;%代码 距离 方位相对正北 时间 地址 横滚角度 TRK importdata(filename); filename1 E:\ProjectWorkspace\866\866data\665dianji.txt;%代码 距离 方位相对正北 时间 地址 横滚角度 PLOT …

Jenkins 启动

废话 这一阵子感觉空虚&#xff0c;心里空捞捞的&#xff0c;总想找点事情做&#xff0c;即使这是一件微小的事情&#xff0c;空余时间除了骑车、打球&#xff0c;偶尔朋友聚会 … 还能干什么呢&#xff1f; 当独自一人时&#xff0c;究竟可以做点什么&#xff0c;填补这空虚…

人工智能之深度学习_[4]-神经网络入门

文章目录 神经网络基础1 神经网络1.1 神经网络概念1.1.1 什么是神经网络1.1.2 如何构建神经网络1.1.3 神经网络内部状态值和激活值 1.2 激活函数1.2.1 网络非线性因素理解1.2.2 常见激活函数1.2.2.1 Sigmoid 激活函数1.2.2.2 Tanh 激活函数1.2.2.3 ReLU 激活函数1.2.2.4 SoftMa…

卸载和安装Git小乌龟、git基本命令

卸载 Git 打开控制面板&#xff1a; 按 Win R 打开运行对话框&#xff0c;输入 control 并按回车键。或直接在功能搜索里搜索“控制面板”。在控制面板中&#xff0c;选择“程序”或“程序和功能”。 查找并卸载 Git&#xff1a; 在程序列表中找到“Git”或“Git for Windows…

群论学习笔记

什么是对称&#xff1f; 对称是一个保持对象结构不变的变换&#xff0c;对称是一个过程&#xff0c;而不是一个具体的事物&#xff0c;伽罗瓦的对称是对方程根的置换&#xff0c;而一个置换就是对一系列事物的重排方式&#xff0c;严格的说&#xff0c;它也并不是这个重排本身…

C语言自定义类型:结构体

结构体简介&#xff1a; c语言里int 、float、double、等等类型来表示一个对象&#xff0c;但有时也有未能表达的对象&#xff0c;比如表示一个人的类型&#xff0c;这个类型里有人的身高、体重、年龄等等&#xff0c;这就需要很多个类型来拼凑&#xff0c;这就很不方便。于是…

【整体介绍】

ODO&#xff1a;汽车总行驶里程 Chime: 例如安全带没系的报警声音 多屏交互就是中控屏的信息会同步到主驾驶的仪表盘上 面试问题&#xff1a;蓝牙电话协议HFP 音乐协议A2DP 三方通话测试的逻辑

线性规划:机器学习中的优化利器

一、线性规划的基本概念 线性规划&#xff08;Linear Programming, LP&#xff09;是运筹学中数学规划的一个重要分支&#xff0c;用于在一组线性不等式的约束条件下&#xff0c;找到线性目标函数的最大值或最小值。其问题可以表述为&#xff1a; 在一组线性约束条件 s.t.&am…

SurgiTrack:外科手术视频中的细粒度多类别多工具跟踪|文献速递-视觉大模型医疗图像应用|文献速递-视觉大模型医疗图像应用

Title 题目 SurgiTrack: Fine-grained multi-class multi-tool tracking in surgical videos SurgiTrack&#xff1a;外科手术视频中的细粒度多类别多工具跟踪 01 文献速递介绍 手术器械跟踪在计算机辅助手术系统中发挥着至关重要的作用&#xff0c;可为一系列应用提供有价…

亚博microros小车-原生ubuntu支持系列:1 键盘控制

背景&#xff1a;电脑配置不太行&#xff0c;我在ubuntu再运行vmware&#xff0c;里面运行亚博官方的虚拟机镜像ubuntu&#xff0c;系统很卡。基本上8G内存给打满了。还是想把亚博官方的代码迁移出来&#xff0c;之前售后就说除了官方镜像虚拟机&#xff0c;需要自己摸索迁移。…

总结5..

#include<stdio.h> struct nb {//结构体列队 int x, y;//x为横坐标&#xff0c;y为纵坐标 int s, f;//s为步数&#xff0c;//f为方向 }link[850100]; int n, m, x, y, p, q, f; int hard 1, tail 1; int a[52][52], b[52][52], book[52][52][91]; int main() { …

鸿蒙系统 将工程HarmonyOS变成OpenHarmony

DevEco Studio软件创建工程后需要修改两个地方&#xff1a; 修改第二个build-profile.json5文件 将原先内容&#xff1a; {"app": {"signingConfigs": [],"products": [{"name": "default","signingConfig": &q…

什么样的问题适合用递归

递归是一种通过函数调用自身来解决问题的方法。递归适用于那些可以被分解为相似子问题的问题&#xff0c;即原问题可以通过解决一个或多个更小规模的同类问题来解决。递归通常需要满足以下两个条件&#xff1a; 递归基&#xff08;Base Case&#xff09;&#xff1a;问题的最简…

C# 网络协议第三方库Protobuf的使用

为什么要使用二进制数据 通常我们写一个简单的网络通讯软件可能使用的最多的是字符串类型&#xff0c;比较简单&#xff0c;例如发送格式为(head)19|Msg:Heart|100,x,y,z…&#xff0c;在接收端会解析收到的socket数据。 这样通常是完全可行的&#xff0c;但是随着数据量变大&…

认识BOM

BOM 弹出层 可视窗口尺寸 屏幕宽高 浏览器内核和其操作系统的版本 剪贴板 是否允许使用cookie 语言 是否在线

国产编辑器EverEdit - 大纲视图

1 大纲视图 1.1 应用场景 在编辑较长代码文件时&#xff0c;使用大纲视图可以方便的检视当前文件的变量、函数等信息&#xff0c;方便在不同函数间跳转&#xff0c;对整个文档的全貌了然于胸。   在编辑XML文档时&#xff0c;通过展示XML文件的层次结构、节点布局&#xff0…

(2)STM32 USB设备开发-USB虚拟串口

例程&#xff1a;STM32USBdevice: 基于STM32的USB设备例子程序 - Gitee.com 本篇为USB虚拟串口教程&#xff0c;没有知识&#xff0c;全是实操&#xff0c;按照步骤就能获得一个STM32的USB虚拟串口。本例子是在野火F103MINI开发板上验证的&#xff0c;如果代码中出现一些外设的…

68,[8] BUUCTF WEB [RoarCTF 2019]Simple Upload(未写完)

<?php // 声明命名空间&#xff0c;遵循 PSR-4 自动加载规范&#xff0c;命名空间为 Home\Controller namespace Home\Controller;// 导入 Think\Controller 类&#xff0c;以便扩展该类 use Think\Controller;// 定义 IndexController 类&#xff0c;继承自 Think\Control…