Golang项目:实现生产者消费者模式

one-one

先创建out.go目录与文件夹

// 定义了一个名为out的包,用于处理输出相关的功能。
package out

import "fmt"

// Out结构体定义了一个channel,用于存储需要输出的数据。
type Out struct {
	data chan interface{} // data字段是一个interface{}类型的channel,用于存储任意类型的数据。
}

// out是一个全局变量,指向Out类型的指针,用于全局访问。
var out *Out

// Newout函数用于创建一个新的Out实例,如果全局变量out为nil,则创建一个新的Out实例并初始化其data字段。
// 如果out已经初始化,则直接返回现有的out实例。
func Newout() *Out {
	if out == nil {
		out = &Out{
			make(chan interface{}, 65535), // 初始化data字段为一个容量为65535的channel。
		}
	}
	return out
}

// Println函数用于向Out实例的data channel发送数据。
// 它接受一个interface{}类型的参数i,并将i发送到out的data channel中。
func Println(i interface{}) {
	out.data <- i // 向channel发送数据。
}

// OutPut方法是Out结构体的方法,用于从data channel中接收数据并打印。
// 它是一个无限循环,使用select语句监听data channel。
// 当channel中有数据时,使用fmt.Println打印接收到的数据。
func (o *Out) OutPut() {
	for {
		select {
		case i := <-o.data: // 从channel中接收数据。
			fmt.Println(i) // 打印接收到的数据。
		}
	}
}

同时创建one-one.go文件,来实现单生产者单消费者模型

// 定义了一个名为one_one的包,用于实现生产者-消费者模式。
package one_one

import (
	"producer-consumer/out" // 导入自定义的out包,用于输出功能。
	"sync"                 // 导入sync包,用于同步goroutine。
)

// Task结构体定义了一个任务,包含一个ID字段。
type Task struct {
	ID int64
}

// Task的run方法用于执行任务,这里只是简单地打印任务的ID。
func (t *Task) run() {
	out.Println(t.ID) // 使用out包的Println函数打印任务ID。
}

// taskCh是一个缓冲channel,用于在生产者和消费者之间传递Task对象。
var taskCh = make(chan Task, 10)

// taskNum定义了要生成的任务数量。
const taskNum int64 = 1000

// producer函数是一个生产者goroutine,它生成taskNum个任务并发送到channel。
func producer(wo chan<- Task) {
	var i int64
	for i = 1; i < taskNum; i++ {
		t := Task{i} // 创建一个新的Task对象。
		wo <- t       // 将Task对象发送到channel。
	}
	close(wo) // 生产结束后关闭channel。
}

// consumer函数是一个消费者goroutine,它从channel接收任务并执行。
func consumer(ro <-chan Task) {
	for t := range ro {
		if t.ID != 0 {
			t.run() // 执行任务。
		}
	}
}

// Exec函数用于启动生产者和消费者goroutine,并等待它们完成。
func Exec() {
	wg := &sync.WaitGroup{} // 创建一个WaitGroup对象用于同步。
	wg.Add(2)               // 增加两个计数,一个用于生产者,一个用于消费者。

	// 启动生产者goroutine。
	go func(wg *sync.WaitGroup) {
		defer wg.Done() // 确保在goroutine结束时减少WaitGroup计数。
		go producer(taskCh) // 调用producer函数启动生产者。
	}(wg)

	// 启动消费者goroutine。
	go func(wg *sync.WaitGroup) {
		defer wg.Done() // 确保在goroutine结束时减少WaitGroup计数。
		consumer(taskCh) // 调用consumer函数启动消费者。
	}(wg)

	wg.Wait() // 等待所有goroutine完成。
	out.Println("执行成功") // 打印执行成功的消息。
}

最后在main函数中测试

// 定义了main包,这是程序的入口点。
package main

import (
	one_one "producer-consumer/one-one" // 导入one-one包,它实现了生产者-消费者模式。
	"producer-consumer/out"            // 导入out包,它提供了输出功能。
	"time"                           // 导入time包,用于处理时间相关的功能。
)

// main函数是程序的入口点。
func main() {
	// 创建out包的Out实例,用于后续的输出操作。
	o := out.Newout()

	// 启动一个goroutine来运行Out实例的OutPut方法,这个方法会不断地从channel中读取数据并打印。
	go o.OutPut()

	// 调用one_one包的Exec函数,启动生产者和消费者逻辑。
	one_one.Exec()

	// 让main函数暂停4秒钟,确保有足够的时间让生产者和消费者完成它们的任务。
	// 这是为了在程序结束前给goroutine足够的时间来处理和打印所有的输出。
	time.Sleep(time.Second * 4)
}

one-many

将原先one-one文件稍做改变
Chanel的线程安全的,所以可以直接执行并发操作

package one_many

import (
	"producer-consumer/out"
	"sync"
)

type Task struct {
	ID int64
}

func (t *Task) run() {
	out.Println(t.ID)
}

var taskCh = make(chan Task, 10)

const taskNum int64 = 10000

func producer(wo chan<- Task) {
	var i int64
	for i = 1; i < taskNum; i++ {
		t := Task{i}
		wo <- t
	}
	close(wo)
}

func consumer(ro <-chan Task) {
	for t := range ro {
		if t.ID != 0 {
			t.run()
		}
	}
}

func Exec() {
	wg := &sync.WaitGroup{}
	wg.Add(1)

	go func(wg *sync.WaitGroup) {
		defer wg.Done()
		go producer(taskCh)
	}(wg)

	var i int64
	for i = 0; i < taskNum; i++ {
		if i%100 == 0 {
			wg.Add(1)
			go func(wg *sync.WaitGroup) {
				defer wg.Done()
				consumer(taskCh)
			}(wg)
		}
	}
	wg.Wait()
	out.Println("执行成功")
}

many-one

package many_one

import (
	"producer-consumer/out" 
	"sync"                   
)

// Task 结构体表示一个任务,每个任务有一个唯一的 ID
type Task struct {
	ID int64 // 任务的 ID,用于标识任务
}

// run 方法用来执行任务(在这里是打印任务 ID)
func (t *Task) run() {
	out.Println(t.ID) // 输出任务 ID
}

// taskCh 是一个缓冲区为 10 的 channel,用于在生产者和消费者之间传递 Task
var taskCh = make(chan Task, 10)

// taskNum 代表总共有多少个任务需要处理
const taskNum int64 = 10000

// nums 代表每次生产者生产的任务数量(批量生产)
const nums int64 = 100

// producer 函数模拟任务的生产者。它会将任务(Task)发送到任务 channel 中
func producer(wo chan<- Task, startNum int64, nums int64) {
	// 循环生成任务并将任务发送到任务通道
	var i int64
	for i = startNum; i < startNum+nums; i++ {
		t := Task{ID: i} // 创建一个任务
		wo <- t           // 将任务发送到任务通道
	}
}

// consumer 函数模拟任务的消费者。它从任务通道接收任务并处理
func consumer(ro <-chan Task) {
	// 从任务通道读取任务并执行
	for t := range ro {
		if t.ID != 0 { // 如果任务 ID 不为 0,则执行任务
			t.run()
		}
	}
}

// Exec 是主执行函数,负责启动生产者和消费者 goroutine,并进行协程同步
func Exec() {
	// 创建两个 WaitGroup 用于同步生产者和消费者的执行
	wg := &sync.WaitGroup{} // 用于等待消费者 goroutine 完成
	pwg := &sync.WaitGroup{} // 用于等待所有生产者 goroutine 完成

	// 启动消费者 goroutine,开始从 taskCh 中消费任务
	wg.Add(1) // 增加一个计数,表示一个 goroutine(消费者)需要等待
	go func() {
		defer wg.Done() // 在消费者完成时调用 Done(),减少计数
		consumer(taskCh) // 启动消费者并处理任务
	}()

	// 启动多个生产者 goroutine,每个生产者负责生成一定数量的任务
	for i := int64(0); i < taskNum; i += nums {
		if i >= taskNum {
			break
		}

		pwg.Add(1) // 增加一个计数,表示一个 goroutine(生产者)需要等待
		// 启动生产者 goroutine,批量生成任务并将其发送到 taskCh
		go func(i int64) {
			defer pwg.Done()  // 在生产者完成时调用 Done(),减少计数
			producer(taskCh, i, nums) // 启动生产者并生成任务
		}(i)
	}

	// 输出执行成功的提示
	out.Println("执行成功")

	// 等待所有生产者完成(所有任务已发送完)
	pwg.Wait()

	// 在所有生产者完成后关闭任务通道
	// 使用 go 是为了防止与消费者的执行顺序冲突
	// 一旦任务通道关闭,消费者会停止接收任务
	go close(taskCh)

	// 等待消费者完成(所有任务已被处理)
	wg.Wait()
}

在这里插入图片描述

many-many

多对多模式,消费者和生产者都不主动退出,我们通过一个第三方信号来控制退出

// many_many包实现了多个生产者和多个消费者的场景。
package many_many

import "producer-consumer/out" // 导入out包,用于输出任务ID。

// Task结构体定义了一个任务,包含一个ID字段。
type Task struct {
	ID int64
}

// Task的run方法用于执行任务,这里只是简单地打印任务的ID。
func (t *Task) run() {
	out.Println(t.ID)
}

// taskChan是一个缓冲channel,用于在生产者和消费者之间传递Task对象。
var taskChan = make(chan Task, 10)

// done是一个用于通知生产者和消费者退出的channel。
var done = make(chan struct{})

// taskNum定义了生产者要生成的任务数量。
const taskNum int64 = 10000

// producer函数是一个生产者goroutine,它生成任务并发送到taskChan。
func producer(wo chan<- Task, done chan struct{}) {
	var i int64
	for {
		if i >= taskNum {
			i = 0 // 如果达到任务数量上限,重置i。
		}
		i++
		t := Task{
			ID: i,
		}
		// 使用select语句来处理两个case:发送任务到channel或者接收done信号退出。
		select {
		case wo <- t:
		case <-done:
			out.Println("生产者退出")
			return
		}
	}
}

// consumer函数是一个消费者goroutine,它从taskChan接收任务并执行。
func consumer(ro <-chan Task, done chan struct{}) {
	for {
		select {
		case t := <-ro:
			if t.ID != 0 {
				t.run() // 执行任务。
			}
		case <-done:
			// 如果接收到done信号,处理channel中剩余的所有任务然后退出。
			for t := range ro {
				if t.ID != 0 {
					t.run()
				}
			}
			return
		}
	}
}

// Exec函数用于启动多个生产者和消费者goroutine。
func Exec() {
	// 启动多个生产者goroutine。
	for i := 0; i < 8; i++ {
		go producer(taskChan, done)
	}
	// 启动多个消费者goroutine。
	for i := 0; i < 4; i++ {
		go consumer(taskChan, done)
	}
}

此时会无限的生产和消费

这时候我们关闭channel
如果先关闭数据channel,在关闭控制channel


	time.Sleep(5 * time.Second)
	close(taskChan)
	close(done)
	time.Sleep(5 * time.Second)

	fmt.Println(len(taskChan))

在这里插入图片描述
报错,说我们往关闭了的channel里写数据

因为,如果我们没有先关闭控制channel,那么消费者和生产者就都还没有收到停止的消息,在两行语句的时间差中,会发生很多写入channel的操作。

所以我们要先关闭控制channel
在这里插入图片描述
正常退出!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/925903.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

说说Elasticsearch拼写纠错是如何实现的?

大家好&#xff0c;我是锋哥。今天分享关于【说说Elasticsearch拼写纠错是如何实现的&#xff1f;】面试题。希望对大家有帮助&#xff1b; 说说Elasticsearch拼写纠错是如何实现的&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 在 Elasticsearch 中&…

【Leecode】Leecode刷题之路第62天之不同路径

题目出处 62-不同路径-题目出处 题目描述 个人解法 思路&#xff1a; todo代码示例&#xff1a;&#xff08;Java&#xff09; todo复杂度分析 todo官方解法 62-不同路径-官方解法 方法1&#xff1a;动态规划 思路&#xff1a; 代码示例&#xff1a;&#xff08;Java&…

Windows修复SSL/TLS协议信息泄露漏洞(CVE-2016-2183) --亲测

漏洞说明&#xff1a; 打开链接&#xff1a;https://docs.microsoft.com/zh-cn/troubleshoot/windows-server/windows-security/restrict-cryptographic-algorithms-protocols-schannel 可以看到&#xff1a; 找到&#xff1a;应通过配置密码套件顺序来控制 TLS/SSL 密码 我们…

第六届国际科技创新(IAECST 2024)暨第四届物流系统与交通运输(LSTT 2024)

重要信息 会议官网&#xff1a;www.lstt.org 大会时间&#xff1a;2024年12月6-8日 大会地点&#xff1a;中国-广州 简介 第六届国际科技创新暨第四届物流系统与交通运输国际&#xff08;LSTT 2024&#xff09;将于2024年12月6-8日在广州举办&#xff0c;这是一个集中探讨…

ArcGIS 软件中路网数据的制作

内容导读 路网数据是进行网络分析的基础&#xff0c;它是建立网络数据集的数据来源。 本文我们以OSM路网数据为例&#xff0c;详细介绍OSM路网数据从下载&#xff0c;到数据处理&#xff0c;添加属性&#xff0c;完成符合网络分析的网络数据集的全部过程。 01 数据获取 比较…

【0346】Postgres内核 Startup Process 通过 signal 与 postmaster 交互实现 (5)

1. Startup Process 进程 postmaster 初始化过程中, 在进入 ServerLoop() 函数之前,会先通过调用 StartChildProcess() 函数来开启辅助进程,这些进程的目的主要用来完成数据库的 XLOG 相关处理。 如: 核实 pg_wal 和 pg_wal/archive_status 文件是否存在Postgres先前是否发…

PYNQ 框架 - OV5640驱动 + Linux 驱动分析

目录 1. 简介 1.1 博文要点 1.2 V4L2 2. 极简 Char 驱动 2.1 源码 2.2 Makefile 2.3 加载驱动 2.4 设备文件 2.5 测试驱动程序 2.6 卸载驱动程序 2.7 自动创建设备文件 2.8 日志等级 3. 极简 V4L2 驱动 3.1 源码 3.2 Makefile 3.3 设备节点类型 3.4 测试 V4L2…

RNN详解及其实现

目录 概述为什么需要 RNN&#xff1f;RNN 理解及其简单实现RNN 完成文本分类任务RNN 存在的问题 概述 提及 RNN&#xff0c;绝大部分人都知道他是一个用于序列任务的神经网络&#xff0c;会提及他保存了时序信息&#xff0c;但是&#xff0c;为什么需要考虑时序的信息&#xf…

Redis开发03:常见的Redis命令

1.输入以下命令&#xff0c;启动redis。 sudo service redis-server start 如果你是直接安装在WSL的&#xff0c;搜索栏搜索Ubuntu或者点击左下角Windows图表找到U那一栏&#xff0c;直接打开Ubentu&#xff0c;输入账密后&#xff0c;输入“sudo service redis-server start”…

【webrtc】 mediasoup中m77的IntervalBudget及其在AlrDetector的应用

IntervalBudget 用于带宽控制和流量整形 mediasoup中m77 代码的IntervalBudget ,版本比较老IntervalBudget 在特定时间间隔内的比特预算管理,从而实现带宽控制和流量整形。 一。 pacedsender 执行周期: 下一次执行的时间的动态可变的 int64_t PacedSender::TimeUntilNextPr…

Docker for Everyone Plus——No Enough Privilege

直接告诉我们flag在/flag中&#xff0c;访问第一小题&#xff1a; sudo -l查看允许提权执行的命令&#xff1a; 发现有image load命令 题目指明了有rz命令&#xff0c;可以用ZMODEM接收文件&#xff0c;看到一些write up说可以用XShell、MobaXterm、Tabby Terminal等软件连接上…

SpringMVC工作原理【流程图+文字详解SpringMVC工作原理】

SpringMVC工作原理 前端控制器&#xff1a;DispactherServlet处理器映射器&#xff1a;HandlerMapping处理器适配器&#xff1a;HandlerAdapter处理器&#xff1a;Handler&#xff0c;视图解析器&#xff1a;ViewResolver视图&#xff1a;View 首先用户通过浏览器发起HTTP请求…

网络安全 社会工程学 敏感信息搜集 密码心理学攻击 密码字典生成

网络安全 社会工程学 敏感信息搜集 密码心理学攻击 理解社会工程学的概念掌握获取敏感信息的方法提高自我信息保护的意识和方法理解密码心理学的概念理解密码特征分析掌握黑客猜解密码的切入方法掌握如何提高密码强壮性 敏感信息搜集 「注」由于对实验环境的限制&#xff0c;…

【机器学习】机器学习的基本分类-监督学习-逻辑回归-对数似然损失函数(Log-Likelihood Loss Function)

对数似然损失函数&#xff08;Log-Likelihood Loss Function&#xff09; 对数似然损失函数是机器学习和统计学中广泛使用的一种损失函数&#xff0c;特别是在分类问题&#xff08;例如逻辑回归、神经网络&#xff09;中应用最为广泛。它基于最大似然估计原理&#xff0c;通过…

SQL基础入门 —— SQL概述

目录 1. 什么是SQL及其应用场景 SQL的应用场景 2. SQL数据库与NoSQL数据库的区别 2.1 数据模型 2.2 查询语言 2.3 扩展性 2.4 一致性与事务 2.5 使用场景 2.6 性能与扩展性 总结 3. 常见的SQL数据库管理系统&#xff08;MySQL, PostgreSQL, SQLite等&#xff09; 3.…

力扣--LCR 149.彩灯装饰记录I

题目 代码 /** Definition for a binary tree node. public class TreeNode { int val;TreeNode left;TreeNode right;TreeNode() {}TreeNode(int val) { this.val val; }TreeNode(int val, TreeNode left, TreeNode right) {this.val val;this.left left;this.right ri…

Admin.NET框架使用宝塔面板部署步骤

文章目录 Admin.NET框架使用宝塔面板部署步骤&#x1f381;框架介绍部署步骤1.Centos7 部署宝塔面板2.部署Admin.NET后端3.部署前端Web4.访问前端页面 Admin.NET框架使用宝塔面板部署步骤 &#x1f381;框架介绍 Admin.NET 是基于 .NET6 (Furion/SqlSugar) 实现的通用权限开发…

软通动力携子公司鸿湖万联、软通教育助阵首届鸿蒙生态大会成功举办

11月23日中国深圳&#xff0c;首届鸿蒙生态大会上&#xff0c;软通动力及软通动力子公司鸿湖万联作为全球智慧物联网联盟&#xff08;GIIC&#xff09;理事单位、鸿蒙生态服务&#xff08;深圳&#xff09;有限公司战略合作伙伴&#xff0c;联合软通教育深度参与了大会多项重磅…

利用若依代码生成器实现课程管理模块开发

目录 前言1. 环境准备1.1 数据库表设计与导入 2. 使用若依代码生成器生成模块代码2.1 导入数据库表2.2 配置生成规则2.2.1 基本信息配置2.2.2 字段信息配置2.2.3 生成信息配置 3. 下载与集成生成代码3.1 解压与集成3.2 启动项目并验证 4. 优化与扩展4.1 前端优化4.2 后端扩展 结…

MySQL Linux 离线安装

下载 进入官网&#xff0c;下载对应的需要MySQL版本&#xff0c;这里是历史版本。 官网 选择第一个MySQL Community Sever社区版&#xff0c;因为这个是免费的。 选择需要的对应版本&#xff1a; 安装 1.将下载好的安装包上传到服务器端 使用FinalShell 客户端连接服务器 …