workflow笔记

workflow 介绍

搜狗公司C++服务器引擎,编程范式。支撑搜狗几乎所有后端C++在线服务,包括所有搜索服务,云输入法,在线广告等,每
日处理数百亿请求。这是一个设计轻盈优雅的企业级程序引擎,可以满足大多数后端与嵌入式开发需求。

特征:
快速搭建 http 服务器。
可异步访问常见第三方服务:http,redis,mysql 和kafka。
构建异步任务流,支持常用的串并联,也支持更加复杂的DAG结构。
作为并行计算工具使用。除了网络任务,我们也包含计算任务的调度。所有类型的任务都可以放入同一个流中。
在Linux系统下作为文件异步IO工具使用,性能超过任何标准调用。磁盘IO也是一种任务。
实现任何计算与通讯关系非常复杂的高性能高并发的后端服务。

workflow 编译安装

git clone https://github.com/sogou/workflow # From
gitee: git clone https://gitee.com/sogou/workflow
cd workflow
make
cd tutorial
make

workflow 编程范式

抽象出来的模型: 程序 = 协议 + 算法 + 任务流

协议

大多数情况下,用户使用的是内置的通用网络协议,例如http,redis或各种rpc。
用户可以方便的自定义网络协议,只需提供序列化和反序列化函数,就可以定义出自己的client/server。

算法

在我们的设计里,算法是与协议对称的概念。如果说协议的调用是rpc,算法的调用就是一次apc(Async Procedure Call)。
我们提供了一些通用算法,例如sort,merge,psort,reduce,可以直接使用。
与自定义协议相比,自定义算法的使用要常见得多。任何一次边界清晰的复杂计算,都应该包装成算法

任务流

在workflow当中所有的任务都是异步处理的
任务隐藏了若干个异步执行流

任务流就是实际的业务逻辑,就是把开发好的协议与算法放在流程图里使用起来。
典型的任务流是一个闭合的串并联图。复杂的业务逻辑,可能是一个非闭合的DAG。
任务流图可以直接构建,也可以根据每一步的结果动态生成。所有任务都是异步执行的。

请求回应模式

workflow是一种请求回应的模式,服务端不会主动给客户端推数据,必须要发起一个请求,而且这个请求是一个一个的发送,如果没有得到回应不要继续发请求。
比如 http

结构化并发与任务隐藏

  • 我们系统中包含五种基础任务:通讯,计算,文件IO,定时器,计数器
  • 一切任务都由任务工厂产生,用户通过调用接口组织并发结构。例如串联并联,DAG等。
  • 大多数情况下,用户通过任务工厂产生的任务,都隐藏了多个异

例如,一次http请求,可能包含许多次异步过程(DNS,重定向),但对用户来讲,就是一次通信任务。
文件排序,看起来就是一个算法,但其实包括复杂的文件IO与CPU计算的交互过程。
如果把业务逻辑想象成用设计好的电子元件搭建电路,那么每个电子元件内部可能又是一个复杂电路。
任务隐藏机制大幅减少了用户需要创建的任务数量和回调深度。

任何任务都运行在某个串行流(series)里,共享series上下文,让异步任务之间数据传递变得简单

workflow的串联执行

使用tutorial里面hello world的例子

#include <stdio.h>
#include "workflow/WFHttpServer.h"
void mul(int a , int b , int& res) {
    res = a * b;
}
int main()
{
    WFHttpServer server([](WFHttpTask *task) {
        task->get_resp()->append_output_body("<html>Hello World!</html>");
        int a = 533;
        int b = 7;
        int res;
        WFGoTask* go = WFTaskFactory::create_go_task("test" , mul , a , b , res);
        go->set_callback([&](WFGoTask* t){
            protocol::HttpResponse* resp = (protocol::HttpResponse*)t->user_data;
            char buf[1024] = {0};
            sprintf(buf , "<br> 7 * 533 = %d<br/>" , res);
            resp->append_output_body(buf);
        });
        go->user_data = task->get_resp();
        series_of(task)->push_back(go); // 串联执行
    });
    if (server.start(8889) == 0) { // start server on port 8888
        getchar(); // press "Enter" to end.
        server.stop();
    }
    return 0;
}

当我们再次请求的时候发现 , 响应中追加了一行乘法结果
在这里插入图片描述

workflow的并行执行

tutorial-06-parallel_wget.cc

workflow的DAG执行

tutorial-11-graph_task.cc
在这里插入图片描述
内部重载了–>

抽象层次

  • 流 (添加任务,添加上下文,设置回调)
  • 任务 WFTaskFactory 中定义的基础任务
  • 任务流 SubTask
class Workflow
{
public:
	static SeriesWork *
	create_series_work(SubTask *first, series_callback_t callback);  // 串行流

	static void
	start_series_work(SubTask *first, series_callback_t callback); // 这个任务流里面所有的任务执行完之后会调用这个回调函数

	static ParallelWork *
	create_parallel_work(parallel_callback_t callback);   // 并行流

	static ParallelWork *
	create_parallel_work(SeriesWork *const all_series[], size_t n,
						 parallel_callback_t callback);

	static void
	start_parallel_work(SeriesWork *const all_series[], size_t n,
						parallel_callback_t callback);

public:
	static SeriesWork *
	create_series_work(SubTask *first, SubTask *last,
					   series_callback_t callback);

	static void
	start_series_work(SubTask *first, SubTask *last,
					  series_callback_t callback);
};

在这里插入图片描述
在这里插入图片描述

回调与内存回收机制

  • 一切调用都是异步执行,几乎不存在占着线程等待的操作。
  • 显式的回调机制。用户清楚自己在写异步程序。
  • 通过一套对象生命周期机制,大幅简化异步程序的内存管理

任何框架创建的任务,生命周期都是从创建到callback函数运行结束为止。没有泄漏风险。
如果创建了任务之后不想运行,则需要通过dismiss()接口删除。
任务中的数据,例如网络请求的resp,也会随着任务被回收。此时用户可通过std::move()把需要的数据移走。
项目中不使用任何智能指针来管理内存。代码观感清新。

  • 尽量避免用户级别派生,以std::function封装用户行为,包括:

任何任务的callback。
任何server的process。符合FaaS(Function as aService)思想。
一个算法的实现,简单来讲也是一个std::function。
如果深入使用,又会发现一切皆可派生。

线程模型

在这里插入图片描述

主线程

4个网络线程

poller 做那么几件事情
1 检测条件是否满足,如果是网络的话,组装完整的数据,然后抛出队列

20个工作线程池

从任务队列中取出任务,执行server注册的回调函数 ,如果这个过程中新生成了net/timer/file io的时间需要去监听注册,则直接调用epoll_ctl 加入到某个poller线程里面去 , 如果有计算密集的任务则将任务加入到go线程的任务队列里面去给go线程处理

8个计算线程池

专门处理cpu类型的任务

应用场景

高扇出场景

搜索服务一般都是高扇出场景;它需要调用许多下游模块多,考验网络通信框架的调度能力。
高扇出解释:一个节点与许多其他节点存在大量连接的情况。
高扇出的痛点:吞吐与长尾;提升吞吐需要提高单个请求的响应速度,所以需要尽量少切换网络收发线程,但是不切换容易导致处理的慢的资源堆积,长尾问题就很明显。

多 client 混用场景

同时访问 redis/mysql/kafka 的数据管理需求。workflow 实现了常见的网络协议,不同协议的任务可以在底层调度层无缝打通。

SRPC

业务层协议 IDL 使用了 protobuf,需要对里边定义的service 支持简单的计算功能,同时支持可以异步执行,如何能够快速搭建这样的服务。基于Workflow做底层调度的生态项目,拥有Workflow的网络性能优势,且自生成service接口,因此可以让 service 接口底层打通 Workflow 底层的异步 server 功能。除此之外使 用案例 2 中的其他协议或者计算任务都很方便。

嵌入式领域

服务治理场景(服务发现)

自定义协议接入

场景:公网接入网需要 http 协议,但是后端服务是自定义私有协议,接入层需要自己开发。
常见的办法是使用 nginx,开发 ngx_module_t,但 nginx的网络有 11 个阶段,内部资源纯自行管理,模块代码与框架代码完全耦合到一起,开发起来非常困难,出了问题也很难排查。
workflow 可以派生基本网络层,实现消息的序列化/反序列化接口,即得到自定义协议的任务。然后启动 http server,创建自定义任务,再利用案例 4 中提到的转发功能,即可得到一个自定义协议接入层,转发性能无损耗。

workflow三板斧

抽象粒度合适的任务

通过任务流组织任务, 串联,并联,DAG

协调任务

counter 递减的“信号量”
conditional 条件变量
resource pool 类似信号量
message queue 类似消息队列
不占用线程的方式,条件触发

网络模块的设计

多线程网络

workflow会创建4个poller线程 , 将listen fd % poller线程数,落入其中的一个poller线程里面
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
accept一个新的连接之后 ,又会将clientfd % poller线程数量,去均衡的加入到一个poller线程当中去处理后续的网络事件

epoll三个系统调用是否线程安全

都是线程安全的

线程状态

1 epoll_wait
2 正在处理事件

工作线程调用epoll_ctl 是否安全

增加 : 一个epoll里面本来没有这个fd , 增加一个是没问题的
修改 :没有问题,epoll本身有这个fd , 把读事件改为写事件如connect ,或者把写事件改为读事件。
删除 : 有问题。 其中一个线程正在处理epoll_wait返回的事件循环,在这个过程中也可能执行删除的操作,另外一个线程同样执行删除的操作,内核就会报错。事件循环的那个线程要下一次epoll_wait的时候才能感知到另外一个线程的删除操作

删除操作通过管道pipe去解决。
在这里插入图片描述
工作线程把删除的操作通过管道传递给epoll_wait线程,让删除的操作串行执行。

队列设计

workflow使用有锁队列

workflow每天要处理成千上万的高并发请求,竞争相当的激烈,且任务的执行时间也比较长。
在这里插入图片描述
如果get队列为空,get队列会和put队里做一个转换,生产者只往put队列添加任务,消费者只从get队列拿任务。

好处:生产者线程和消费者线程碰撞的概率很低,把多对多的问题转换成了多对一的问题。只有当消费者线程发现get队列为空的时候,会锁定put队列,这时候才会发生碰撞,然后把put队列里面的任务拷贝到get队列里面去。
设计的相当优雅,提升并发性

队列元素通用性设计

在这里插入图片描述
1 不限制节点的类型
2 对节点只有一个约束: 节点偏移linkoff 字节之后有一个指针类型用于链接下一个节点
在这里插入图片描述

关于无锁队列

什么时候使用无锁队列:
1 资源竞争不那么激烈的时候
2 任务执行时间比较短

线程池设计

在这里插入图片描述
网络线程当中添加一个线程任务给线程池调度,context是Communicator的this指针
在这里插入图片描述
线程任务中执行一个死循环
在这里插入图片描述
如果在swap的时候发现put队列里面没有任务,就会阻塞线程池里面的线程
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/935133.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Vulkan入门】09-CreateFrameBuffer

目录 先叨叨git信息关键代码VulkanEnv::FindHostVisitbaleMemoryTypeIndex()TestPipeLine::CreateFramebuffers() 与网上大多数文章不同&#xff0c;其他文章基本上都使用窗口框架&#xff08;X11、GLFW、WSL等&#xff09;提供的surface来显示Vulkan渲染出的图像。我认为那样会…

【人工智能】5G-A技术及应用

文章目录 前言一、5G-A基本概念及产业进展1、5G-A概述2、移动通信发展历史&#xff1a;不断扩大联结规模&#xff0c;扩展业务边界的过程3、标准Ready:首版本R18将于2024年H1冻结4、标准Ready:IMT2020完成5G-A技术测试5、频谱Ready:超大带宽是实现万兆体验的基础6、5G-A全球商用…

与 Cursor AI 对话编程:2小时开发报修维修微信小程序

本文记录了如何通过与 Cursor AI 对话&#xff0c;全程不写一行代码的情况下&#xff0c;完成一个完整的报修小程序。整个过程展示了 AI 如何帮助我们&#xff1a; 生成代码 、解决问题、优化实现、完善细节。 先看一下效果图&#xff1a; 一、项目配置 首先我是这样和 AI 对…

多模态大语言模型 MLLM 部署微调实践

1 MLLM 1.1 什么是 MLLM 多模态大语言模型&#xff08;MultimodalLargeLanguageModel&#xff09;是指能够处理和融合多种不同类型数据&#xff08;如文本、图像、音频、视频等&#xff09;的大型人工智能模型。这些模型通常基于深度学习技术&#xff0c;能够理解和生成多种模…

机器学习:全面学习路径指南

摘要&#xff1a; 本文精心规划了一条从入门到精通机器学习的学习路线&#xff0c;详细涵盖了基础理论构建、核心技术栈掌握、主流算法学习、实践项目锻炼以及前沿领域探索等多个关键阶段。通过逐步深入各个层面&#xff0c;介绍必备的数学知识、编程工具、经典与现代机器学习算…

Kingbase V8R6 数据库自动(逻辑)备份、删除脚本-Linux

脚本说明 1.该脚本为Linux环境下自动备份、删除Kingbase数据库备份脚本&#xff08;逻辑备份&#xff09;&#xff1b; 2.执行脚本前&#xff0c;请先对脚本进行修改后&#xff0c;再使用。脚本效果 1.执行脚本时&#xff0c;若备份目录不存在&#xff0c;则自动创建备份目录…

网络应用技术 实验六:通过 DHCP 管理园区网 IP 地址(华为ensp)

一、实验简介 构建园区网&#xff0c;通过 DHCP 服务器为全网的用户主机提供 IP 地址。 二、实验目的 1 、理解 DHCP 的工作原理&#xff1b; 2 、掌握 DHCP 服务器的创建和配置方法&#xff1b; 3 、掌握将 VirtualBox 虚拟机引入 eNSP 的方法&#xff1b; …

Elasticsearch使用(2):docker安装es、基础操作、mapping映射

1 安装es 1.1 拉取镜像 docker pull swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/library/elasticsearch:7.17.3 1.2 运行容器 运行elasticsearch容器&#xff0c;挂载的目录给更高的权限&#xff0c;否则可能会因为目录权限问题导致启动失败&#xff1a; docker r…

Flink 核心知识总结:窗口操作、TopN 案例及架构体系详解

目录 一、FlinkSQL 的窗口操作 &#xff08;一&#xff09;窗口类型概述 &#xff08;二&#xff09;不同时间语义下窗口实践 EventTime&#xff08;事件时间&#xff09; ProcessTime&#xff08;处理时间&#xff09; 二、窗口 TopN 案例解析 三、Flink架构体系 &…

Vscode配置自动切换node版本

Vscode配置自动切换node版本 问题描述 开发环境安装了很多Node JS版本&#xff0c;项目经常切换也常常忘记了使用了什么版本&#xff0c;所以最好在打开项目terminal&#xff0c;安装依赖&#xff0c;启动项目前自动设置好版本 具体配置 .vscode/settings.json中,添加如下代…

【Linux 篇】Docker 的容器之海与镜像之岛:于 Linux 系统内探索容器化的奇妙航行

文章目录&#xff1a; 【Linux 篇】Docker 的容器之海与镜像之岛&#xff1a;于 Linux 系统内探索容器化的奇妙航行前言安装docker-centos7 【Linux 篇】Docker 的容器之海与镜像之岛&#xff1a;于 Linux 系统内探索容器化的奇妙航行 &#x1f4ac;欢迎交流&#xff1a;在学习…

leetcode108.将有序数组转换为二叉搜索树

标签&#xff1a;二叉搜索树 给你一个整数数组 nums &#xff0c;其中元素已经按 升序 排列&#xff0c;请你将其转换为一棵平衡二叉搜索树。 示例 1&#xff1a; 输入&#xff1a;nums [-10,-3,0,5,9] 输出&#xff1a;[0,-3,9,-10,null,5] 解释&#xff1a;[0,-10,5,null,…

C# 探险之旅:第二节 - 定义变量与变量赋值

欢迎再次踏上我们的C#学习之旅。今天&#xff0c;我们要聊一个超级重要又好玩的话题——定义变量与变量赋值。想象一下&#xff0c;你正站在一个魔法森林里&#xff0c;手里拿着一本空白的魔法书&#xff08;其实就是你的代码编辑器&#xff09;&#xff0c;准备记录下各种神奇…

基于事件驱动的websocket简单实现

websocket的实现 什么是websocket&#xff1f; WebSocket 是一种网络通信协议&#xff0c;旨在为客户端和服务器之间提供全双工、实时的通信通道。它是在 HTML5 规范中引入的&#xff0c;可以让浏览器与服务器进行持久化连接&#xff0c;以便实现低延迟的数据交换。 WebSock…

libaom 源码分析:av1_rd_use_partition 函数

libaom libaom 是 AOMedia Video 1 (AV1) 视频编码格式的参考实现库,由 Alliance for Open Media (AOMedia) 开发和维护。AV1 是一个高效、开放、免专利授权的下一代视频编解码标准,设计目标是提供较高的视频压缩效率,同时适配各种分辨率、码率和平台。下载:git clone http…

如何恢复使用 Cursor 免费试用

当用户尝试创建过多免费试用账户时&#xff0c;会收到提示&#xff1a;“Too many free trial accounts used on this machine. Please upgrade to pro.” 这限制了用户的试用次数。AI大眼萌帮助大家绕过 Cursor 的设备指纹验证&#xff0c;以继续享受免费试用。 &#x1f6a8;…

【Excel学习记录】01-认识Excel

1.之前的优秀软件Lotus-1-2-3 默认公式以等号开头 兼容Lotus-1-2-3的公式写法&#xff0c;不用写等号 &#xff1a; 文件→选项→高级→勾选&#xff1a;“转换Lotus-1-2-3公式(U)” 备注&#xff1a;对于大范围手动输入公式可以使用该选项&#xff0c;否则请不要勾选&#x…

网络安全——防火墙

基本概念 防火墙是一个系统&#xff0c;通过过滤传输数据达到防止未经授权的网络传输侵入私有网络&#xff0c;阻止不必要流量的同时允许必要流量进入。防火墙旨在私有和共有网络间建立一道安全屏障&#xff0c;因为网上总有黑客和恶意攻击入侵私有网络来破坏&#xff0c;防火…

kafka进阶_4.kafka扩展

文章目录 一、Controller选举二、Kafka集成2.1、大数据应用场景2.1.1、Flume集成2.1.2、Spark集成2.1.3、Flink集成 2.2、Java应用场景(SpringBoot集成) 三、Kafka常见问题3.1、Kafka都有哪些组件&#xff1f;3.2、分区副本AR, ISR, OSR的含义&#xff1f;3.3、Producer 消息重…

OpenAI 12Days 第二天 强化微调(RFT):推动语言模型在科学研究中的应用

OpenAI 12Days 第二天 强化微调&#xff08;RFT&#xff09;&#xff1a;推动语言模型在科学研究中的应用 文章目录 OpenAI 12Days 第二天 强化微调&#xff08;RFT&#xff09;&#xff1a;推动语言模型在科学研究中的应用RFT的工作原理与应用领域案例研究&#xff1a;基因突变…