workflow源码解析:ThreadTask

1、使用程序,一个简单的加法运算程序

#include <iostream>
#include <workflow/WFTaskFactory.h>
#include <errno.h>

// 直接定义thread_task三要素
// 一个典型的后端程序由三个部分组成,并且完全独立开发。即:程序=协议+算法+任务流。

// 定义INPUT
struct AddInput
{
    int x;
    int y;
};

// 定义OUTPUT
struct AddOutput
{
    int res;
};

// 加法流程
void add_routine(const AddInput *input, AddOutput *output)
{
    output->res = input->x + input->y;
}

using AddTask = WFThreadTask<AddInput, AddOutput>;

void callback(AddTask *task)
{
	auto *input = task->get_input();
	auto *output = task->get_output();

	assert(task->get_state() == WFT_STATE_SUCCESS);

    fprintf(stderr, "%d + %d = %d\n", input->x, input->y, output->res);
}

int main()
{
    using AddFactory = WFThreadTaskFactory<AddInput, AddOutput>;
	AddTask *task = AddFactory::create_thread_task("add_task",
												add_routine,
												callback);
	AddInput *input = task->get_input();

	input->x = 1;
	input->y = 2;

	task->start();

	getchar();
	return 0;
}

2、类继承关系

WFThreadTaskFactory代码

// src/factory/WFTaskFactory.h
template<class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
private:
	using T = WFThreadTask<INPUT, OUTPUT>;
    ...
public:
	static T *create_thread_task(const std::string& queue_name,
								 std::function<void (INPUT *, OUTPUT *)> routine,
								 std::function<void (T *)> callback);

    ...
};
// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
WFThreadTask<INPUT, OUTPUT> *
WFThreadTaskFactory<INPUT, OUTPUT>::create_thread_task(const std::string& queue_name,
						std::function<void (INPUT *, OUTPUT *)> routine,
						std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback)
{
	return new __WFThreadTask<INPUT, OUTPUT>(WFGlobal::get_exec_queue(queue_name),
											 WFGlobal::get_compute_executor(),
											 std::move(routine),
											 std::move(callback));
}

__WFThreadTask代码

// src/factory/WFTaskFactory.inl
template<class INPUT, class OUTPUT>
class __WFThreadTask : public WFThreadTask<INPUT, OUTPUT>
{
protected:
	virtual void execute()  //实现ExecSession的纯虚函数
	{
		this->routine(&this->input, &this->output); //执行用户程序的routine
	}

protected:
	std::function<void (INPUT *, OUTPUT *)> routine;

public:
	__WFThreadTask(ExecQueue *queue, Executor *executor,
				   std::function<void (INPUT *, OUTPUT *)>&& rt,
				   std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :
		WFThreadTask<INPUT, OUTPUT>(queue, executor, std::move(cb)),
		routine(std::move(rt))
	{
	}
};

WFThreadTask代码

// src/factory/WFTask.h
template<class INPUT, class OUTPUT>
class WFThreadTask : public ExecRequest
{
public:
	void start();
	void dismiss();

	INPUT *get_input() { return &this->input; }
	OUTPUT *get_output() { return &this->output; }

	void *user_data;

	int get_state() const { return this->state; }
	int get_error() const { return this->error; }

	void set_callback(std::function<void (WFThreadTask<INPUT, OUTPUT> *)> cb);
protected:
	virtual SubTask *done();

protected:
	INPUT input;
	OUTPUT output;
	std::function<void (WFThreadTask<INPUT, OUTPUT> *)> callback;

public:
	WFThreadTask(ExecQueue *queue, Executor *executor,
				 std::function<void (WFThreadTask<INPUT, OUTPUT> *)>&& cb) :
		ExecRequest(queue, executor),
		callback(std::move(cb))
	{
        // 初始化
	}

protected:
	virtual ~WFThreadTask() { }
};

ExecRequest代码

// src/kernel/ExecRequest.h
class ExecRequest : public SubTask, public ExecSession
{
public:
	ExecRequest(ExecQueue *queue, Executor *executor);
	ExecQueue *get_request_queue() const { return this->queue; }
	void set_request_queue(ExecQueue *queue) { this->queue = queue; }
	virtual void dispatch()  // 实现SubTask的纯虚函数,这个纯虚函数主要是任务的开始执行接口
	{
		this->executor->request(this, this->queue);
		...
	}

protected:
	int state;
	int error;

	ExecQueue *queue;
	Executor *executor;

protected:
	virtual void handle(int state, int error); // 实现ExecSession的纯虚函数
};

SubTask代码

class SubTask
{
     // 子任务被调起的时机
     virtual void dispatch() = 0;
     // 子任务执行完成的时机
     virtual SubTask *done() = 0;
     // 内部实现,决定了任务流走向
     void subtask_done();
     ...
};

ExecSession代码

/src/kernel/Executor.h
class ExecSession
{
private:
	virtual void execute() = 0;
	virtual void handle(int state, int error) = 0;

protected:
	ExecQueue *get_queue() { return this->queue; }

private:
	ExecQueue *queue;
    ...
};

继承关系图

__WFThreadTask__目前还未用到,暂不清楚

在这里插入图片描述

3、两个重要成员: ExecQueue, Executor

ExecQueue代码

/src/kernel/Executor.h
class ExecQueue
{
    ...
private:
	struct list_head task_list;
	pthread_mutex_t mutex;
};

Executor代码

/src/kernel/Executor.h
class Executor
{
public:
    // 一次要执行的接口,对于线程执行器来说,就是把一个执行任务扔进某个队列中
    int request(ExecSession *session, ExecQueue *queue);

private:
    // 执行器和系统资源,是一个包含关系
    thrdpool_t *thrdpool;
};

request() 函数把任务扔进线程池队列等待执行,线程池会从队列拿到这个任务,然后执行executor_thread_routine

// src/kernel/Executor.cc
int Executor::request(ExecSession *session, ExecQueue *queue)
{
    ExecSessionEntry *entry = new ExecSessionEntry;

	session->queue = queue;
	entry->session = session;
	entry->thrdpool = this->thrdpool;
	queue->mutex.lock();
	list_add_tail(&entry->list, &queue->session_list);
	if (queue->session_list.next == &entry->list)
	{
		struct thrdpool_task task = {Executor::executor_thread_routine, queue};
		/*
		{
			.routine	=	Executor::executor_thread_routine,
			.context	=	queue
		};
		*/
		if (thrdpool_schedule(&task, this->thrdpool) < 0)
		{
			list_del(&entry->list);
			delete entry;
			entry = NULL;
		}
	}

	queue->mutex.unlock();
	return -!entry;
}
struct ExecSessionEntry
{
	struct list_head list;
	ExecSession *session;
	thrdpool_t *thrdpool;
};
// src/kernel/Executor.cc
void Executor::executor_thread_routine(void *context)
{
	ExecQueue *queue = (ExecQueue *)context;
	ExecSessionEntry *entry;
	ExecSession *session;

	queue->mutex.lock();
	entry = list_entry(queue->session_list.next, ExecSessionEntry, list);
	list_del(&entry->list);
	session = entry->session;
	if (!list_empty(&queue->session_list))
	{
		struct thrdpool_task task = {Executor::executor_thread_routine, queue};
		/*
		{
			.routine	=	Executor::executor_thread_routine,
			.context	=	queue
		};
		*/
		__thrdpool_schedule(&task, entry, entry->thrdpool);
	}
	else
		delete entry;

	queue->mutex.unlock();
	session->execute(); //这里会执行到用户routine
	session->handle(ES_STATE_FINISHED, 0);
}

4、参考链接

https://github.com/chanchann/workflow_annotation/blob/main/src_analysis/12_thread_task.md
https://blog.csdn.net/j497205974/article/details/135554164?spm=1001.2014.3001.5502

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/327447.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【机器学习入门】机器学习基础概念与原理

*&#xff08;本篇文章旨在帮助新手了解机器学习的基础概念和原理&#xff0c;不深入讨论算法及核心公式&#xff09; 目录 一、机器学习概念 1、什么是机器学习&#xff1f; 2、常见机器学习算法和模型 3、使用Python编程语言进行机器学习实践 4、机器学习的应用领域 二…

Dokerfile

阅读目录 什么是dockerfile?Dockerfile的基本结构Dockerfile文件说明 什么是dockerfile? Dockerfile是一个包含用于组合映像的命令的文本文档。可以使用在命令行中调用任何命令。 Docker通过读取Dockerfile中的指令自动生成映像。 docker build命令用于从Dockerfile构建映…

Android studio RecyclerView 应用设计

一、创建empty activity项目: 二、打开activity_main.xml布局文件: 添加RecyclerView控件 <?xml version="1.0" encoding="utf-8"?> <androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/…

Spring Boot - Application Events 同步 VS 异步 发布订阅事件实战

文章目录 PreCode基础工程启动类切入口事件 发布事件同步 Listener异步Listener增加EnableAsync增加 Async 测试 Pre Spring Boot - Application Events 的发布顺序_ApplicationStartingEvent Spring Boot - Application Events 的发布顺序_ApplicationEnvironmentPreparedEv…

手机与电脑更改IP地址怎么使用代理IP?

在现代互联网时代&#xff0c;代理IP已成为许多人日常生活和工作中不可或缺的一部分。通过代理IP&#xff0c;用户可以隐藏自己的真实IP地址&#xff0c;并获得更好的网络体验。本文将详细介绍如何在手机和电脑上更改IP地址并使用代理IP。 一、手机使用代理IP 1. 打开手机设置&…

目标检测DETR:End-to-End Object Detection with Transformers

NMS 对一个目标生成了多个检测窗口&#xff0c;但是事实上这些窗口中大部分内容都是重复的&#xff0c;找到目标检测最优的窗口 选取多个检测窗口中分数最高的窗口&#xff0c;剔除掉其他同类型的窗口 anchor generator 首先在该点生成scale512, aspect ratio{1:2&#xff…

第10章 通信业务

文章目录 10.1.1 通信行业1、通信行业的界定2、通信行业的特点 10.1.2 通信企业10.1.3 通信终端1、通信终端的分类2、终端发展趋势 10.2.1 通信业务的定义及分类10.2.2 基础电信业务1、第一类基础电信业务A11 固定通信业务A12 蜂窝移动通信业务A13 第一类卫星通信业务A14 第一类…

uni-app购物车页面详细代码

效果图&#xff1a; 这里的购物车加减用的是uni-app中的sku插件 代码附下&#xff08;全&#xff09;&#xff1a; <script setup lang"ts"> import {reqMemberCartList,reqMemberdelentCart,reqMemberPutCart,putMemberCartSelectedAPI, } from /services/…

go语言初探(一)

package mainimport ("fmt""time" )func main() {fmt.Print("hello go!")time.Sleep(1 * time.Second)}运行后&#xff0c;结果如下&#xff1a; 1、golang表达式中&#xff0c;加&#xff1b;和不加&#xff1b;都可以 2、函数的{和函数名一…

解决com.alibaba.fastjson.JSONException: default constructor not found的问题

1.问题描述 在进行JSON和对象互转时&#xff0c;发现有个报错&#xff1a; com.alibaba.fastjson.JSONException: default constructor not found. class com.hellobike.ph.match.service.taxi.model.message.DelayAddSkuMsg 2.原因和解决方案 通过其提示可以看出在利用fastJ…

【RTOS】快速体验FreeRTOS所有常用API(7)任务通知

目录 七、任务通知7.1 基本概念7.2 发出通知7.3 等待通知7.4 实例 七、任务通知 该部分在上份代码基础上修改得来&#xff0c;代码下载链接&#xff1a; https://wwzr.lanzout.com/i4Efu1la39wh 密码:cbvx 该代码尽量做到最简&#xff0c;不添加多余的、不规范的代码。 内容主要…

白码ERP快速实现库存不足时自动生成采购单功能

创建生产订单时&#xff0c;系统自动根据产品所需物料库存是否充足&#xff0c;如有物料库存不足&#xff0c;自动生成对应的采购订单&#xff1b; 前期准备&#xff1a; 需创建产品、物料、BOM、生产订单、生产订单明细、需求物料、采购订单、采购订单明细数据表&#xff0c…

如何查看centos7中dataease的安装位置

在 CentOS 7 中查找 DataEase 的安装位置&#xff0c;可以通过以下步骤进行&#xff1a; 检查服务状态&#xff1a; 如果 DataEase 作为服务运行&#xff0c;您可以使用 systemctl 命令来查看服务的状态&#xff0c;这通常会显示相关的路径信息。例如&#xff1a; systemctl st…

开发企业微信中的内嵌h5时如何开发与调试

前言&#xff1a; 在我们的项目中&#xff0c;开发企业微信内部的项目的话&#xff0c;分为两种&#xff0c;1种是直接开发企业微信的小程序&#xff0c;另一种则是企业微信内嵌我们的H5界面&#xff0c;我们这里讲一讲企业微信内嵌h5的方法与注意点。 1、开发h5项目 这点没有…

文件操作一(非常重要)

文件操作一&#xff08;非常重要&#xff09; 一、为什么使用文件&#xff1f;二、什么是文件&#xff1f;三、文件名(简单理解)四、二进制文件和文本文件&#xff08;重要&#xff09;五、流的概念&#xff08;非常重要&#xff09;六、文件的打开和关闭七、文件的顺序读写函数…

C++I/O流——(2)预定义格式的输入/输出(第二节)

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 含泪播种的人一定能含笑收获&#xff…

安达发|APS工序排程甘特图功能介绍

工序排程甘特图的主要功能 1. 显示工序时间安排&#xff1a;工序排程甘特图可以清晰地展示生产过程中各个工序的开始时间、结束时间和持续时间&#xff0c;从而帮助企业了解生产过程中各个环节的时间安排。 2. 显示工序进度情况&#xff1a;通过工序排程甘特图&#xff0c;企业…

用Python“自动连发消息”

自动连发消息&#xff0c;基本上C和Python的思路都是不停的模拟“击键”操作&#xff0c;还有一种VB的脚本写法&#xff0c;反成每种语言都能写&#xff0c;更厉害的可以用java做出个GUI界面&#xff0c;先上代码。 一 代码 import pyautogui # 鼠标 import p…

Springboot+vue的智能无人仓库管理(有报告),Javaee项目,springboot vue前后端分离项目

演示视频&#xff1a; Springbootvue的智能无人仓库管理&#xff08;有报告&#xff09;&#xff0c;Javaee项目&#xff0c;springboot vue前后端分离项目 项目介绍&#xff1a; 本文设计了一个基于Springbootvue的前后端分离的智能无人仓库管理&#xff0c;采用M&#xff08…

华为设备VRRP配置

核心代码&#xff1a; 需要对所有虚拟路由器设置&#xff08;要进入到对应的端口&#xff09; vrrp vrid 38 virtual-ip 192.168.10.254 vrrp vrid 38 priority 120 vrrp vrid 38 track int g0/0/1 reduced 30①mac由vrid生成 ②指定虚拟ip ③虚拟ip作为内部主机的网关&#x…