ACK One Argo工作流:实现动态 Fan-out/Fan-in 任务编排

作者:庄宇

什么是 Fan-out Fan-in

在工作流编排过程中,为了加快大任务处理的效率,可以使用 Fan-out Fan-in 任务编排,将大任务分解成小任务,然后并行运行小任务,最后聚合结果。

图片

由上图,可以使用 DAG(有向无环图)编排 Fan-out Fan-in 任务,子任务的拆分方式分为静态和动态,分别对应静态 DAG 和动态 DAG。动态 DAG Fan-out Fan-in 也可以理解为 MapReduce。每个子任务为 Map,最后聚合结果为 Reduce。

静态 DAG: 拆分的子任务分类是固定的,例如:在数据收集场景中,同时收集数据库 1 和数据库 2 中的数据,最后聚合结果。

动态 DAG: 拆分的子任务分类是动态的,取决于前一个任务的输出结果,例如:在数据处理场景中,任务 A 可以扫描待处理的数据集,为每个子数据集(例如:一个子目录)启动子任务 Bn 处理,当所有子任务 Bn 运行结束后,在子任务 C 中聚合结果,具体启动多少个子任务 B 取决由任务 A 的输出结果。根据实际的业务场景,可以在任务 A 中自定义子任务的拆分规则。

ACK One 分布式工作流 Argo 集群

在实际的业务场景中,为了加快大任务的执行,提升效率,往往需要将一个大任务分解成数千个子任务,为了保证数千个子任务的同时运行,需要调度数万核的 CPU 资源,叠加多任务需要竞争资源,一般 IDC 的离线任务集群难以满足需求。例如:自动驾驶仿真任务,修改算法后的回归测试,需要对所有驾驶场景仿真,每个小驾驶场景的仿真可以由一个子任务运行,开发团队为加快迭代速度,要求所有子场景测试并行执行。

如果您在数据处理,仿真计算和科学计算等场景中,需要使用动态 DAG 的方式编排任务,或者同时需要调度数万核的 CPU 资源加快任务运行,您可以使用阿里云 ACK One 分布式工作流 Argo 集群 [ 1]

ACK One 分布式工作流 Argo 集群,产品化托管 Argo Workflow [ 2] ,提供售后支持,支持动态 DAG Fan-out Fan-in 任务编排,支持按需调度云上算力,利用云上弹性,调度数万核 CPU 资源并行运行大规模子任务,减少运行时间,运行完成后及时回收资源节省成本。支持数据处理,机器学习,仿真计算,科学计算,CICD 等业务场景。

Argo Workflow 是开源 CNCF 毕业项目,聚焦云原生领域下的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,并使用 Kubernetes Pod 在集群中调度运行。

本文介绍使用 Argo Workflow 编排动态 DAG Fan-out Fan-in 任务。

Argo Workflow 编排 Fan-out Fan-in 任务

我们将构建一个动态 DAG Fan-out Fan-in 工作流,读取阿里云 OSS 对象存储中的一个大日志文件,并将其拆分为多个小文件(split),启动多个子任务分别计算每个小文件中的关键词数量(count),最后聚合结果(merge)。

  1. 创建分布式工作流 Argo 集群 [ 3]

  2. 挂载阿里云 OSS 存储卷,工作流可以像操作本地文件一样,操作阿里云 OSS 上的文件。参考:工作流使用存储卷 [ 4]

  3. 使用以下工作流 YAML 创建一个工作流,参考:创建工作流 [ 5] 。具体说明参见注释。

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dynamic-dag-map-reduce-
spec:
  entrypoint: main
  # claim a OSS PVC, workflow can read/write file in OSS through PVC. 
  volumes:
    - name: workdir
      persistentVolumeClaim:
        claimName: pvc-oss
  # how many tasks to split, default is 5.
  arguments:
    parameters:
      - name: numParts
        value: "5"
  templates:
    - name: main
      # DAG definition.
      dag:
        tasks:
          # split log files to several small files, based on numParts.
          - name: split
            template: split
            arguments:
              parameters:
                - name: numParts
                  value: "{{workflow.parameters.numParts}}"
          # multiple map task to count words in each small file.
          - name: map
            template: map
            arguments:
              parameters:
                - name: partId
                  value: '{{item}}'
            depends: "split"
            # run as a loop, partId from split task json outputs.
            withParam: '{{tasks.split.outputs.result}}'
          - name: reduce
            template: reduce
            arguments:
              parameters:
                - name: numParts
                  value: "{{workflow.parameters.numParts}}"
            depends: "map"
    # The `split` task split the big log file to several small files. Each file has a unique ID (partId).
    # Finally, it dumps a list of partId to stdout as output parameters
    - name: split
      inputs:
        parameters:
          - name: numParts
      container:
        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
        command: [python]
        args: ["split.py"]
        env:
        - name: NUM_PARTS
          value: "{{inputs.parameters.numParts}}"
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
    # One `map` per partID is started. Finds its own "part file" and processes it.
    - name: map
      inputs:
        parameters:
          - name: partId
      container:
        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
        command: [python]
        args: ["count.py"]
        env:
        - name: PART_ID
          value: "{{inputs.parameters.partId}}"
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
    # The `reduce` task takes the "results directory" and returns a single result.
    - name: reduce
      inputs:
        parameters:
          - name: numParts
      container:
        image: acr-multiple-clusters-registry.cn-hangzhou.cr.aliyuncs.com/ack-multiple-clusters/python-log-count
        command: [python]
        args: ["merge.py"]
        env:
        - name: NUM_PARTS
          value: "{{inputs.parameters.numParts}}"
        volumeMounts:
        - name: workdir
          mountPath: /mnt/vol
      outputs:
        artifacts:
          - name: result
            path: /mnt/vol/result.json
  1. 动态 DAG 实现

1)split 任务在拆分大文件后,会在标准输出中输出一个 json 字符串,包含:子任务要处理的 partId,例如:

["0", "1", "2", "3", "4"]

2)map 任务使用 withParam 引用 split 任务的输出,并解析 json 字符串获得所有 {{item}},并使用每个 {{item}} 作为输入参数启动多个 map 任务。

          - name: map
            template: map
            arguments:
              parameters:
                - name: partId
                  value: '{{item}}'
            depends: "split"
            withParam: '{{tasks.split.outputs.result}}'

更多定义方式,请参考开源 Argo Workflow 文档 [ 6]

  1. 工作流运行后,通过分布式工作流 Argo 集群控制台 [ 7] 查看任务 DAG 流程与运行结果。

图片

  1. 阿里云 OSS 文件列表,log-count-data.txt 为输入日志文件,split-output,cout-output 中间结果目录,result.json 为最终结果文件。

图片

  1. 示例中的源代码可以参考:AliyunContainerService GitHub argo-workflow-examples [ 8]

总结

Argo Workflow 是开源 CNCF 毕业项目,聚焦云原生领域下的工作流编排,使用 Kubernetes CRD 编排离线任务和 DAG 工作流,并使用 Kubernetes Pod 在集群中调度运行。

阿里云 ACK One 分布式工作流 Argo 集群,产品化托管 Argo Workflow,提供售后支持,加固控制面实现数万子任务(Pod)稳定高效调度运行,数据面支持无服务器方式调度云上大规模算力,无需运维集群或者节点,支持按需调度云上算力,利用云上弹性,调度数万核 CPU 资源并行运行大规模子任务,减少运行时间,支持数据处理,机器学习,仿真计算,科学计算,CICD 等业务场景。

欢迎加入 ACK One 客户交流钉钉群与我们进行交流。(钉钉群号:35688562

相关链接:

[1] 阿里云 ACK One 分布式工作流 Argo 集群

https://help.aliyun.com/zh/ack/overview-12

[2] Argo Workflow

https://argo-workflows.readthedocs.io/en/latest/

[3] 创建分布式工作流 Argo 集群

https://help.aliyun.com/zh/ack/create-a-workflow-cluster

[4] 工作流使用存储卷

https://help.aliyun.com/zh/ack/use-volumes

[5] 创建工作流

https://help.aliyun.com/zh/ack/create-a-workflow

[6] 开源 Argo Workflow 文档

https://argo-workflows.readthedocs.io/en/latest/walk-through/loops/

[7] 分布式工作流 Argo 集群控制台

https://account.aliyun.com/login/login.htm?oauth_callback=https%3A%2F%2Fcs.console.aliyun.com%2Fone%3Fspm%3Da2c4g.11186623.0.0.7e2f1428OwzMip#/argowf/cluster/detail

[8] AliyunContainerService GitHub argo-workflow-examples

https://github.com/AliyunContainerService/argo-workflow-examples/tree/main/log-count

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/375563.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MySQL组复制的介绍

前言 本文介绍关于MySQL组复制的背景信息和基本原理。包括,介绍MySQL传统复制方法的原理和隐患、介绍组复制的原理,单主模式和多主模式等等。通过结合原理图学习这些概念,可以很好的帮助我们理解组复制技术这一MySQL高可用方案,有…

面向对象的三大特征之一封装

封装 概念 封装就是通过 权限修饰符(private)将成员变量隐藏起来 本质:就是将数据私有化,其他类使用必须通过设置的 get 和 set 方法来获取和设置例子:假设你有一本书,你将其藏起来,别人想要看…

Java基于微信小程序的医院核酸检测服务系统,附源码

博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…

CSS伸缩盒模型

CSS伸缩盒模型 伸缩盒模型是CSS中的一种布局手段,可以使元素具有弹性,让元素可以跟随页面大小的改变而改变。 1. 伸缩容器 给元素设置display:flex 或 display:inline-flex ,就是伸缩容器。 2. 主轴与侧轴 主轴: 伸缩项目沿着…

2024年:用OKR管理你的生活

在科技高速发展的时代,越来越多的企业和团队开始采用OKR(Objectives and Key Results)管理方法来设定目标并跟踪进度。你是否想过,将OKR理念引入个人生活,以更有效地实现人生目标?本文将探讨如何在2024年运…

句子嵌入: 交叉编码和重排序

这个系列目的是揭开嵌入的神秘面纱,并展示如何在你的项目中使用它们。第一篇博客介绍了如何使用和扩展开源嵌入模型,选择现有的模型,当前的评价方法,以及生态系统的发展状态。第二篇博客将会更一步深入嵌入并解释双向编码和交叉编…

Java基于微信小程序的医院挂号系统

文章目录 1 简介2 技术栈3 系统目标3.2 系统功能需求分析3.2.1 功能需求分析 4 系统模块设计4.1 数据库模块设计 5 系统的实现5.1 微信小程序个人中心5.2 科**室内容查看的实现**5.3 预约挂号的实现5.4 后台管理界面实现5.5 医生预约管理5.6 医生信息管理 参考文献7 推荐阅读8 …

『 C++ - STL 』unordered_xxx系列关联式容器及其封装(万字)

文章目录 🎡 unordered系列关联式容器🎡 哈希表的改造🎢 节点的设置与总体框架🎢 迭代器的封装🎠 迭代器的框架🎠 operator()运算符重载🎠 其余成员函数/运算符重载 🎢 迭代器begin(…

ES监控方法以及核心指标

文章目录 1. 监控指标采集1.1 部署elasticsearch_exporter1.2 prometheus采集elasticsearch_exporter的暴露指标1.3 promethues配置告警规则或者配置grafana大盘 2. 核心告警指标2.1 es核心指标2.2 es容量模型建议 3. 参考文章 探讨es的监控数据采集方式以及需要关注的核心指标…

ArcGIS的UTM与高斯-克吕格投影分带要点总结

UTM(通用横轴墨卡托投影、等角横轴割椭圆柱投影)投影分带投影要点: 1)UTM投影采用6度分带 2)可根据公式计算,带数(经度整数位/6)的整数部分31 3)北半球地区&#xff0…

使用dbeaver导入Excel到mysql数据库

最近业务需要将Excel导入到mysql数据库中,之前一直用的heisql,但是heidisql的导入功能太弱了,后来用了dbeaver,功能很强大。 一、安装dbeaver 首先去官网下载dbeaver社区版,社区版免费:dbeaver.io/ dbea…

ISIS 特性验证(ATT置位、渗透、认证)

拓扑图 配置 sysname AR1 # isis 1is-level level-1cost-style widenetwork-entity 49.0001.0000.0000.0001.00 # interface GigabitEthernet0/0/0ip address 12.1.1.1 255.255.255.0 isis enable 1 # interface GigabitEthernet0/0/1ip address 13.1.1.1 255.255.255.0 isis e…

【Java安全】ysoserial-URLDNS链分析

前言 Java安全中经常会提到反序列化,一个将Java对象转换为字节序列传输(或保存)并在接收字节序列后反序列化为Java对象的机制,在传输(或保存)的过程中,恶意攻击者能够将传输的字节序列替换为恶…

ideal打包,如何访问项目根目录的libs中的jar包

参考&#xff1a;idea maven 导入lib中jar 并打包_maven引入lib中的jar包-CSDN博客 解决办法&#xff0c;只需要在pom文件中加入 <includeSystemScope>true</includeSystemScope> <build><!-- <includeSystemScope>true</includeSystemScope&g…

Matlab数据快速处理指南

文章目录 Excel文件转Mat或工作区从Excel文件读取数据并转换为.mat文件从Excel文件读取数据并加载到工作区 Mat文件转ExcelExcel快速实现万行级填充各种数据类型的操作创建结构体访问结构体字段修改结构体字段的值添加新字段删除字段遍历结构体字段 Excel文件转Mat或工作区 在…

Java中JVM常用参数配置(提供配置示例)

目录 前言一、内存参数配置二、垃圾收集器配置三、GC策略配置3.1、基础通用配置3.2、Parallel 和 Parallel Old 常用参数配置3.3、CMS 常用参数配置3.4、G1 常用参数配置 四、GC日志配置五、dump 日志参数配置5.1、OutOfMemory异常时生成dump文件5.2、发生Full GC时生成dump文件…

《Git 简易速速上手小册》第1章:Git 基础(2024 最新版)

文章目录 1.1 Git 简介&#xff1a;版本控制的演变1.1.1 基础知识讲解1.1.2 重点案例&#xff1a;协作开发流程优化案例&#xff1a;功能开发与分支策略 1.1.3 拓展案例 1&#xff1a;代码审查与合并1.1.4 拓展案例 2&#xff1a;冲突解决 1.2 安装和配置 Git&#xff1a;首次设…

WebSocket基础详解

文章目录 前言由来简介优缺点适用场景兼容性 API介绍构造函数实例方法send()close() 实例属性ws.readyState&#xff08;只读&#xff09;ws.bufferedAmount&#xff08;只读&#xff09;ws.binaryTypeextensions&#xff08;只读&#xff09;protocol&#xff08;只读&#xf…

React+Antd实现表格自动向上滚动

1、效果 2、环境 1、react18 2、antd 4 3、代码实现 原理&#xff1a;创建一个定时器&#xff0c;修改表格ant-table-body的scrollTop属性实现滚动&#xff0c;监听表层的元素div的鼠标移入和移出实现实现鼠标进入元素滚动暂停&#xff0c;移出元素的时候表格滚动继续。 一…

VXLAN网关技术及应用实例详解

1.特性概述 VXLAN是VLAN扩展方案草案&#xff0c;是NVo3中的一种网络虚拟化技术。采用MAC in UDP封装方式&#xff0c;将二层报文用三层协议进行封装&#xff0c;可对二层网络在三层范围进行扩展&#xff0c;同时支持24bits的VNIID ( 16M租户能力&#xff09;&#xff0c;满足…