大数据-252 离线数仓 - Airflow 任务调度 Crontab简介 任务集成部署 入门案例

点一下关注吧!!!非常感谢!!持续更新!!!

Java篇开始了!

目前开始更新 MyBatis,一起深入浅出!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(已更完)
  • Kylin(已更完)
  • Elasticsearch(已更完)
  • DataX(已更完)
  • Tez(已更完)
  • 数据挖掘(已更完)
  • Prometheus(已更完)
  • Grafana(已更完)
  • 离线数仓(正在更新…)

章节内容

  • Airflow 任务调度系统
  • 安装部署测试

在这里插入图片描述

Crontab简介

基础介绍

Linux系统是由cron(crond)系统服务来控制的,Linux系统上原本那就有非常多的计划性工作,因此这个系统服务是默认启动的。
Linux系统也提供了Linux用户控制计划任务的命令:crontab命令

在这里插入图片描述

  • 日志文件:ll /var/log/cron*
  • 编辑文件: vim /etc/crontab
  • 进程: ps -ef | grep crond => /etc/init.d/crond restart
  • 作用:任务(命令)定时调度 定时备份等

格式说明

在这里插入图片描述
以上各个字段中,还可以使用以下特殊字符:

  • 代表所有的取值范围内的数字,如月份字段为,则表示1到12个月
  • /代表每一定时间间隔的意思,如分钟字段为*/10,表示每10分钟执行1次
  • -代表从某个区间范围,是闭区间,如2-5表示2,3,4,5,小时字段中0-23/2表示在0~23点范围内每两小时执行一次
  • ,分散的数字(不连续),如1,2,3,6,8,9
  • 由于各个地方每周的第一天不一样,因此Sunday=0(第一天)或Sunday=7(最后一天)

配置实例

# 每一分钟执行一次command(因cron默认每1分钟扫描一次,因此全为*即可)
* * * * * command

# 每小时的第3和第15分钟执行command
3,15 * * * * command

# 每天上午8-11点的第3和15分钟执行command
3,15 8-11 * * * command

# 每隔2天的上午8-11点的第3和15分钟执行command
3,15 8-11 */2 * * command

# 每个星期一的上午8点到11点的第3和第15分钟执行command
3,15 8-11 * * 1 command

# 每晚的21:30执行command
30 21 * * * command

# 每月1、10、22日的4:45执行command
45 4 1,10,22 * * command

# 每周六、周日的1 : 10执行command
10 1 * * 6,0 command

# 每小时执行command
0 */1 * * * command

# 晚上11点到早上7点之间,每隔一小时执行command
* 23-7/1 * * * command

任务集成部署

Airflow核心概念

DAGs

有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行的顺序

Operators

Airflow内置了很多Operators

  • BashOperator 执行一个Bash命令
  • PythonOperator 调用任意的Python函数
  • EmailOperator 用于发送邮件
  • HTTPOperator 用于发送HTTP请求
  • SqlOperator 用于执行SQL命令
  • 自定义 Operator

Task

Task:Task是Operator的一个实例

Task Instance

Task Instance:由于Task会被重复调度,每次Tasks的运行就是不同的Task Instance,Task Instance 有自己的状态,包括 success、running、failed、skipped、up_for_rechedule、up_for_retry、queued、no_status等

Task Relationships

Task Relationships:DAGs中的不同Tasks之间可以有依赖关系

Executor

Executor,在Airflow中支持的执行器就有四种:

  • SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试
  • LocalExecutor:多进程本地执行任务
  • CeleryExecutor:分布式调度,生产常用,Celery是一个分布式调度框架,其本身没有队列功能,需要使用第三方组件,如RabbitMQ
  • DaskExecutor:动态任务调度,主要用于数据分析
  • 执行器的修改:修改 $AIRFLOW_HOME/airflow.cfg 中:executor = LocalExecutor
    这里关于执行器的修改,修改如下所示:

在这里插入图片描述

入门案例

编写脚本

mkdir $AIRFLOW_HOME/dags
vim $AIRFLOW_HOME/dags/helloworld.py

我们需要写入的内容如下:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils import dates
from airflow.utils.helpers import chain
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# 定义默认参数
def default_options():
    default_args = {
        'owner': 'airflow',  # 拥有者名称
        'start_date': dates.days_ago(1),  # 第一次开始执行的时间
        'retries': 1,  # 失败重试次数
        'retry_delay': timedelta(seconds=5)  # 失败重试间隔
    }
    return default_args

# 定义Bash任务
def task1(dag):
    t = "pwd"
    task = BashOperator(
        task_id='MyTask1',  # task_id
        bash_command=t,  # 指定要执行的命令
        dag=dag  # 指定归属的dag
    )
    return task

# Python任务函数
def hello_world():
    current_time = str(datetime.today())
    print('hello world at {}'.format(current_time))

# 定义Python任务
def task2(dag):
    task = PythonOperator(
        task_id='MyTask2',
        python_callable=hello_world,  # 指定要执行的函数
        dag=dag
    )
    return task

# 定义另一个Bash任务
def task3(dag):
    t = "date"
    task = BashOperator(
        task_id='MyTask3',
        bash_command=t,
        dag=dag
    )
    return task

# 定义DAG
with DAG(
    'HelloWorldDag',  # dag_id
    default_args=default_options(),  # 指定默认参数
    schedule_interval="*/2 * * * *"  # 执行周期,每分钟2次
) as d:
    task1 = task1(d)
    task2 = task2(d)
    task3 = task3(d)
    chain(task1, task2, task3)  # 指定执行顺序

写入的内容如下所示:
在这里插入图片描述

测试运行

# 执行命令检查脚本是否有错误。如果命令行没有报错,就表示没问题
python $AIRFLOW_HOME/dags/helloworld.py

执行的结果如下图所示:
在这里插入图片描述
查看生效的 dags

# 查看生效的 dags
airflow dags list --subdir $AIRFLOW_HOME/dags

执行结果如下图所示:
在这里插入图片描述
查看指定dag中的task

airflow tasks list HelloWorldDag

执行的结果如下图所示:
在这里插入图片描述

测试dag中的task

airflow tasks test HelloWorldDag MyTask2 2020-08-01

执行的结果如下所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/939124.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙快速切换签名配置

鸿蒙快速切换签名配置 参考文档 根据官方签名文档完成签名之后。会在Signing Configs里边生成一个签名项目。 但是因为发布打包这个配置项目是需要手动配置的。那不能开发的时候用自动测试签名,上线的时候还需要手动配置一遍这个吧。想想这么弄就很麻烦。 这个时…

解决git clone时报错“authentication failed for huggingface repository”

问题1: 已经获取了模型的授权,但是git clone时,弹出弹窗 输入huggingface的用户名和密码后,报错如下 解决方式1: 阅读红框标注的说明,“password authentication in git is no longer supported.”&#…

python小课堂(一)

基础语法 1 常量和表达式2 变量和类型2.1 变量是什么2.2 变量语法 3 变量的类型3.1 动态类型特性 4 注释4.1注释是什么 5 输入输出5.1 print的介绍5.2 input 6 运算符6.1 算术运算符在这里插入图片描述6.2 关系运算符6.3 逻辑运算符6.4赋值运算符 1 常量和表达式 在print()中可…

php面对对象的基础知识

php面对对象的基础知识 程序开发:面向过程vs面向对象 面向过程面向过程是一种以“整体事件”为中心的编程思想,编程的时候把解决问题的步骤分析出来,然后用函数把这些步骤实现,在一步一步的具体步骤中再按顺序调用函数。 面向对…

es 开启slowlog

在 Elasticsearch 中,slowlog(慢日志)是用来记录查询和索引操作的性能数据,帮助你诊断性能瓶颈。你可以为查询 (search slowlog) 和索引 (index slowlog) 配置慢日志。 数据准备 POST /products/_doc/1 {"product_name&quo…

CANape使用之新建工程

基本概念 CANape有两个基本概念:“工程”和“配置”,控制着CANape中进行的所有工作。 “工程”是指硬件设置,可能是连接到ECU或车辆总线上的Vector网络接口卡,或者连接到ECU或ADAS传感器(如雷达)上的高速ECU内存接口(VX1000)&am…

Spring Cloud Sleuth 分布式链路追踪入门

您好,我是今夜写代码,今天学习下分布式链路组件Spring Cloud Sleuth。 本文内容 介绍了分布式链路的思想 Sleuth 和 Zipkin 简单集成Demo,并不涉及 Sleuth原理。 为什么要用链路追踪? 微服务架构下,一个复杂的电商应用,完成下…

Chrome 132 版本开发者工具(DevTools)更新内容

Chrome 132 版本开发者工具(DevTools)更新内容 一、使用 Gemini 调试 Network、Source 和 Performance Chrome 131 可以使用 Gemini 调试 CSS,现在可以调试更多模块了 与元素面板中的右键菜单类似,要打开 AI 辅助面板并开始与 …

[白月黑羽]关于风机协议工具的解答

架构 python3.8pyqt5 先来看下原题: 视频中软件的效果 先来看下程序的效果如何,看上去大概相似 对应代码已经上传到了gitcode https://gitcode.com/m0_37662818/fan_protocol_tool/overview 实现中的难点是双悬浮可视化,同时要高亮悬浮对…

使用C#在目录层次结构中搜索文件以查找目标字符串

例程以递归方式搜索目录层次结构中的文件以查找目标字符串。它可以搜索几乎任何类型的文件,即使它不包含 Windows 理解的文本。例如,它可以搜索 DLL 和可执行文件以查看它们是否恰好包含字符串。 下面的代码中显示的ListFiles 方法完成了大部分工作。 …

JAVA爬虫获取1688关键词接口

以下是使用Java爬虫获取1688关键词接口的详细步骤和示例代码: 一、获取API接口访问权限 要使用1688关键词接口,首先需要获取API的使用权限,并了解接口规范。以下是获取API接口的详细步骤: 注册账号:在1688平台注册一…

微服务SpringCloud链路追踪之Micrometer+Zipkin

视频教程: https://www.bilibili.com/video/BV12LBFYjEvR 效果演示 当我们发送一个请求给 Gateway 的时候,由 Micrometer trace 进行链路追踪和数据收集,由 Zipkin 进行数据展示。可以清楚的看到微服务的调用过程,以及每个微服务…

Leetcode 插入区间

class Solution {public int[][] insert(int[][] intervals, int[] newInterval) {List<int[]> result new ArrayList<>();int i 0;// Step 1: 添加所有在 newInterval 之前的区间while(i < intervals.length && intervals[i][1] < newInterval[0]…

CSS|07 标准文档流

标准文档流 一、什么是标准文档流 在制作的 HTML 网页和 PS 画图软件画图时有本质上面的区别: HTML 网页在制作的时候都得遵循一个“流的规则:从左至右、从上至下。 使用 Ps 软件画图时可以在任意地方画图。 <!DOCTYPE html> <html lang"en"> <hea…

redis 缓存使用

工具类 package org.springblade.questionnaire.redis;import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.beans.factor…

【排序算法】——选择排序

前言 排序(Sorting) 是计算机程序设计中的一种重要操作&#xff0c;它的功能是将一个数据元素&#xff08;或记录&#xff09;的任意序列&#xff0c;重新排列成一个关键字有序的序列。所谓排序&#xff0c;就是使一串记录&#xff0c;按照其中的某个或某些关键字的大小&#x…

递归实现指数型枚举(递归)

92. 递归实现指数型枚举 - AcWing题库 每个数有选和不选两种情况 我们把每个数看成每层&#xff0c;可以画出一个递归搜索树 叶子节点就是我们的答案 很容易写出每dfs函数 dfs传入一个u表示层数 当层数大于我们n时&#xff0c;去判断每个数字的选择情况&#xff0c;输出被选…

无限次使用 cursor pro

github地址 cursor-vip 使用方式 在 MacOS/Linux 中&#xff0c;请打开终端&#xff1b; 在 Windows 中&#xff0c;请打开 Git Bash。 然后执行以下命令来安装&#xff1a; 部分电脑可能会误报毒&#xff0c;需要关闭杀毒软件/电脑管家/安全防护再进行 方式1&#xff1a;通过…

【AI热点】小型语言模型(SLM)的崛起:如何在AI时代中找到你的“左膀右臂”?

人工智能模型的演变 多年来&#xff0c;谷歌等科技巨头和OpenAI等初创公司&#xff0c;一直在不遗余力地利用海量在线数据&#xff0c;打造更大、更昂贵的人工智能&#xff08;AI&#xff09;模型。这些大型语言模型&#xff08;LLM&#xff09;被广泛应用于ChatGPT等聊天机器…

解决Nginx + Vue.js (ruoyi-vue) 单页应用(SPA) 404问题的指南

问题描述 在使用Vue.js构建的单页应用&#xff08;SPA&#xff09;中&#xff0c;特别是像ruoyi-vue这样的框架&#xff0c;如果启用了HTML5历史记录模式进行路由管理&#xff0c;那么用户直接访问子路径或刷新页面时可能会遇到404错误。这是因为当用户尝试访问一个非根路径时…