Apache Airflow (十四) :Airflow分布式集群搭建及测试

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


目录

1. 节点规划

2. airflow集群搭建步骤

3. 初始化Airflow

4. 创建管理员用户信息

​​​​​​​5. 配置Scheduler HA

​​​​​​​6. 启动Airflow集群

​​​​​​​7. 访问Airflow 集群WebUI

8. 测试Airflow HA


1. 节点规划

节点IP

节点名称

节点角色

运行服务

192.168.179.4

node1

Master1

webserver,scheduler

192.168.179.5

node2

Master2

websever,scheduler

192.168.179.6

node3

Worker1

worker

192.168.179.7

node4

Worker2

worker

2. airflow集群搭建步骤

1) 在所有节点安装python3.7

参照单节点安装Airflow中安装anconda及python3.7。

2) 在所有节点上安装airflow

  • 每台节点安装airflow需要的系统依赖
yum -y install mysql-devel gcc gcc-devel python-devel gcc-c++ cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib 
  • 每台节点配置airflow环境变量
vim /etc/profile

export AIRFLOW_HOME=/root/airflow



#使配置的环境变量生效

source /etc/profile
  • 每台节点切换airflow环境,安装airflow,指定版本为2.1.3
(python37)   conda activate python37

(python37) pip install apache-airflow==2.1.3 -i https://pypi.tuna.tsinghua.edu.cn/simple

默认Airflow安装在$ANCONDA_HOME/envs/python37/lib/python3.7/site-packages/airflow目录下。配置了AIRFLOW_HOME,Airflow安装后文件存储目录在AIRFLOW_HOME目录下。可以每台节点查看安装Airflow版本信息:

(python37)  airflow version

2.1.3
  • 在Mysql中创建对应的库并设置参数

aiflow使用的Metadata database我们这里使用mysql,在node2节点的mysql中创建airflow使用的库及表信息。

CREATE DATABASE airflow CHARACTER SET utf8;

create user 'airflow'@'%' identified by '123456';

grant all privileges on airflow.* to 'airflow'@'%';

flush privileges;

在mysql安装节点node2上修改”/etc/my.cnf”,在[mysqld]下添加如下内容:

[mysqld]

explicit_defaults_for_timestamp=1

以上修改完成“my.cnf”值后,重启Mysql即可,重启之后,可以查询对应的参数是否生效:

#重启mysql

[root@node2 ~]# service mysqld restart



#重新登录mysql查询

mysql> show variables like 'explicit_defaults_for_timestamp';

  • 每台节点配置Airflow airflow.cfg文件

修改AIRFLOW_HOME/airflow.cfg文件,确保所有机器使用同一份配置文件,在node1节点上配置airflow.cfg,配置如下:

[core]

dags_folder = /root/airflow/dags



#修改时区

default_timezone = Asia/Shanghai



#配置Executor类型,集群建议配置CeleryExecutor

executor = CeleryExecutor



# 配置数据库

sql_alchemy_conn=mysql+mysqldb://airflow:123456@node2:3306/airflow?use_unicode=true&charset=utf8



[webserver]

#设置时区

default_ui_timezone = Asia/Shanghai



[celery]

#配置Celery broker使用的消息队列

broker_url = redis://node4:6379/0

#配置Celery broker任务完成后状态更新使用库

result_backend = db+mysql://root:123456@node2:3306/airflow

将node1节点配置好的airflow.cfg发送到node2、node3、node4节点上

3. 初始化Airflow

1) 每台节点安装需要的python依赖包

初始化Airflow数据库时需要使用到连接mysql的包,执行如下命令来安装mysql对应的python包。

​
(python37) #  pip install mysqlclient -i Simple Index

​

2) 在node1上初始化Airflow 数据库

(python37) [root@node1 airflow]# airflow db init

初始化之后在MySQL airflow库下会生成对应的表。

4. 创建管理员用户信息

在node1节点上执行如下命令,创建操作Airflow的用户信息:

airflow users create \

    --username airflow \

    --firstname airflow \

    --lastname airflow \

    --role Admin \

    --email xx@qq.com

执行完成之后,设置密码为“123456”并确认,完成Airflow管理员信息创建。

​​​​​​​5. 配置Scheduler HA

1) 下载failover组件

登录https://github.com/teamclairvoyant/airflow-scheduler-failover-controller下载 airflow-scheduler-failover-controller 第三方组件,将下载好的zip包上传到node1 “/software”目录下。

在node1节点安装unzip,并解压failover组件:

(python37) [root@node1 software]# yum -y install unzip

(python37) [root@node1 software]# unzip ./airflow-scheduler-failover-controller-master.zip

2) 使用pip进行安装failover需要的依赖包

需要在node1节点上安装failover需要的依赖包。

(python37) [root@node1 software]# cd /software/airflow-scheduler-failover-controller-master

(python37) [root@node1 airflow-scheduler-failover-controller-master]# pip install -e .

3) node1节点初始化failover

(python37) [root@node1 ~]# scheduler_failover_controller init

Adding Scheduler Failover configs to Airflow config file...

Finished adding Scheduler Failover configs to Airflow config file.

Finished Initializing Configurations to allow Scheduler Failover Controller to run. Please update the airflow.cfg with your desired configurations.

注意:初始化airflow时,会向airflow.cfg配置中追加配置,因此需要先安装 airflow 并初始化。

4) 修改airflow.cfg

首先修改node1节点的AIRFLOW_HOME/airflow.cfg

[scheduler_failover]

# 配置airflow Master节点,这里配置为node1,node2,两节点需要免密

scheduler_nodes_in_cluster = node1,node2



#在1088行,特别注意,需要去掉一个分号,不然后期自动重启Scheduler不能正常启动

airflow_scheduler_start_command = export AIRFLOW_HOME=/root/airflow;nohup airflow scheduler >> ~/airflow/logs/scheduler.logs &

配置完成后,可以通过以下命令进行验证Airflow Master节点:

(python37) [root@node1 airflow]# scheduler_failover_controller test_connection

Testing Connection for host 'node1'

(True, ['Connection Succeeded', ''])

Testing Connection for host 'node2'

(True, ['Connection Succeeded\n'])

将node1节点配置好的airflow.cfg同步发送到node2、node3、node4节点上:

(python37) [root@node1 ~]# cd /root/airflow/

(python37) [root@node1 airflow]# scp airflow.cfg node2:`pwd`

(python37) [root@node1 airflow]# scp airflow.cfg node3:`pwd`

(python37) [root@node1 airflow]# scp airflow.cfg node4:`pwd`

​​​​​​​6. 启动Airflow集群

1) 在所有节点安装启动Airflow依赖的python包

(python37) [root@node1 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3

(python37) [root@node2 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3

(python37) [root@node3 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3

(python37) [root@node4 airflow]# pip install celery==4.4.7 flower==0.9.7 redis==3.5.3

2) 在Master1节点(node1)启动相应进程

#默认后台启动可以使用-D ,这里使用-D有时不能正常启动Airflow对应进程

airflow webserver

airflow scheduler

3) 在Master2节点(node2)启动相应进程

airflow webserver

4) 在Worker1(node3)、Worker2(node4)节点启动Worker

在node3、node4节点启动Worker:

(python37) [root@node3 ~]# airflow celery worker

(python37) [root@node4 ~]# airflow celery worker

5) 在node1启动Scheduler HA

(python37) [root@node1 airflow]# nohup scheduler_failover_controller start > /root/airflow/logs/scheduler_failover/scheduler_failover_run.log &

​​​​​​​

至此,Airflow高可用集群搭建完成。

​​​​​​​7. 访问Airflow 集群WebUI

浏览器输入node1:8080,查看Airflow WebUI:

8. 测试Airflow HA

1) 准备shell脚本

Airflow集群所有节点{AIRFLOW_HOME}目录下创建dags目录,准备如下两个shell脚本,将以下两个脚本放在$AIRFLOW_HOME/dags目录下,BashOperator默认执行脚本时,默认从/tmp/airflow**临时目录查找对应脚本,由于临时目录名称不定,这里建议执行脚本时,在“bash_command”中写上绝对路径。如果要写相对路径,可以将脚本放在/tmp目录下,在“bash_command”中执行命令写上“sh ../xxx.sh”也可以。

first_shell.sh

#!/bin/bash

dt=$1

echo "==== execute first shell ===="

echo "---- first : time is ${dt}"

second_shell.sh

#!/bin/bash

dt=$1

echo "==== execute second shell ===="

echo "---- second : time is ${dt}"

2) 编写airflow python 配置

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner':'zhangsan',
    'start_date':datetime(2021, 9, 23),
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'execute_shell_sh',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1)
)

first=BashOperator(
    task_id='first',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag = dag
)

second=BashOperator(
    task_id='second',
    #脚本路径建议写绝对路径
    bash_command='sh /root/airflow/dags/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d"),
    dag=dag
)

first >> second

将以上内容写入execute_shell.py文件,上传到所有Airflow节点{AIRFLOW_HOME}/dags目录下。

3) 重启Airflow,进入Airflow WebUI查看对应的调度

重启Airflow之前首先在node1节点关闭webserver ,Scheduler进程,在node2节点关闭webserver ,Scheduler进程,在node3,node4节点上关闭worker进程。

如果各个进程是后台启动,查看后台进程方式:

(python37) [root@node1 dags]# ps aux |grep webserver

(python37) [root@node1 dags]# ps aux |grep scheduler

(python37) [root@node2 dags]# ps aux |grep webserver

(python37) [root@node2 dags]# ps aux |grep scheduler

(python37) [root@node3 ~]# ps aux|grep "celery worker"

(python37) [root@node4 ~]# ps aux|grep "celery worker"

找到对应的启动命令对应的进程号,进行kill。

重启后进入Airflow WebUI查看任务:

点击“success”任务后,可以看到脚本执行成功日志:

​​​​​​​4) 测试Airflow HA

当我们把node1节点的websever关闭后,可以直接通过node2节点访问airflow webui:

在node1节点上,查找“scheduler”进程并kill,测试scheduler HA 是否生效:

(python37) [root@node1 ~]# ps aux|grep scheduler

root      23744  0.9  3.3 326940 63028 pts/2    S    00:08   0:02 airflow scheduler -- DagFileProcessorManager

#kill 掉scheduler进程

(python37) [root@node1 ~]# kill -9 23744



访问webserver webui

在node1节点查看scheduler_failover_controller进程日志中有启动schudler动作,注意:这里是先从node1启动,启动不起来再从其他Master 节点启动Schduler。


 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/199262.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

物理层之三种数据交换方式(电路交换、报文交换、分组交换(数据报方式、虚电路方式))

学习的最大理由是想摆脱平庸,早一天就多一份人生的精彩;迟一天就多一天平庸的困扰。各位小伙伴,如果您: 想系统/深入学习某技术知识点… 一个人摸索学习很难坚持,想组团高效学习… 想写博客但无从下手,急需…

AI模特换装的前端实现

本文作者为 360 奇舞团前端开发工程师 随着AI的火热发展,涌现了一些AI模特换装的前端工具(比如weshop网站),他们是怎么实现的呢?使用了什么技术呢?下文我们就来探索一下其实现原理。 总体的实现流程如下&am…

NX二次开发UF_MTX3_mtx4 函数介绍

文章作者:里海 来源网站:https://blog.csdn.net/WangPaiFeiXingYuan UF_MTX3_mtx4 Defined in: uf_mtx.h void UF_MTX3_mtx4(const double mtx_3D [ 9 ] , double mtx_4D [ 16 ] ) overview 概述 Converts a 3D matrix to a 4D matrix with a scale …

Anaconda离线下载torch与安装包

一、下载离线安装包 命令: pip download 安装包名 -d 安装到文件夹名 -i https://pypi.tuna.tsinghua.edu.cn/simple执行这样的命令就会把安装包的离线文件下载到指定文件夹中。 操作: 打开cmd命令行,并进入相应的目录中。 如果是tor…

比尔盖茨:GPT-5不会比GPT-4好多少,生成式AI已达到极限

比尔盖茨一句爆料,成为机器学习社区热议焦点: “GPT-5不会比GPT-4好多少。” 虽然他已不再正式参与微软的日常运营,但仍在担任顾问,并且熟悉OpenAI领导团队的想法。 消息来自德国《商报》(Handelsblatt)对…

酷开系统 | 追求娱乐不止一种方式,酷开科技带你开启新体验!

在当今社会,娱乐方式多种多样,人们对于娱乐的需求和追求也在日益增长。然而,传统的娱乐方式已经无法满足大家对于多元化、个性化的体验需求。此时,酷开科技以其独特的视角和领先的技术,为消费者们带来了全新的娱乐体验…

蓝桥杯第229题 迷宫与陷阱 BFS C++ 模拟 带你理解迷宫的深奥

题目 迷宫与陷阱 - 蓝桥云课 (lanqiao.cn)https://www.lanqiao.cn/problems/229/learning/?page1&first_category_id1&name%E8%BF%B7%E5%AE%AB%E4%B8%8E%E9%99%B7%E9%98%B1 思路和解题方法 首先,定义了一个结构体node来表示迷宫中的每个节点,包…

苍穹外卖项目笔记(6)— Redis操作营业状态设置

1 在 Java 中操作 Redis 1.1 Redis 的 Java 客户端 Jedis(官方推荐,且命令语句同 redis 命令)Lettuce(底层基于 Netty 多线程框架实现,性能高效)Spring Data Redis(对 Jedis 和 Lettuce 进行了…

解密Long型数据传递:Spring Boot后台如何避免精度丢失问题

前端和后端之间的数据传递至关重要。然而,当涉及到Long类型数据时,可能会出现精度丢失问题,这会影响数据的准确性。本文将为你介绍两种解决方案,帮助你确保Long类型数据在前端和后端之间的精确传递。 精度丢失测试 访问:http://l…

基于微信小程序的爱心捐赠平台的设计与实现-计算机毕业设计源码64923

摘 要 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱, 小程序的爱心捐赠平台被用户普遍使用,为方便…

【计算机网络笔记】以太网

系列文章目录 什么是计算机网络? 什么是网络协议? 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能(1)——速率、带宽、延迟 计算机网络性能(2)…

数学建模-基于BL回归模型和决策树模型对早产危险因素的探究和预测

整体求解过程概述(摘要) 近年来,全球早产率总体呈上升趋势,在我国,早产儿以每年 20 万的数目逐年递增,目前早产已经成为重大的公共卫生问题之一。据研究,早产是威胁胎儿及新生儿健康的重要因素,可能会造成死亡或智力体…

面试必须要知道的MySQL知识--索引

10 索引 10.1 数据页存储结构 10.1.1 数据页的各个部分 在讲索引之前,让我们看看一个单独的数据页是什么样子的 去除掉一些我们不太需要那么关注的部分后,简化如下: 也就是说平时我们在一个表里插入的一行一行的数据会存储在数据页里&#…

MySQL企业版之Firewall(SQL防火墙)

​​​1. 关于Firewall插件 2. Firewall插件的工作方式 3. Firewall插件测试 4. 总结延伸阅读 1. 关于Firewall插件 Friewall是MySQL企业版非常不错的功能插件之一,启用Firewall功能后,SQL的执行流程见下图示意: 2. Firewall插件的工作方式 Firewall插件的工作机制大概是…

FL Studio水果软件21.1新版!新增Hyper Chorus插件及自动更新功能

我们很高兴地宣布在去年12月发布重大版本更新后,FL Studio在2023年8月正式更新到21.1版。本次更新虽然只是维护性质,但我们还是为大家带来了一些全新的功能,包括通过钢琴卷中的音阶捕捉和自定义音符工具,引入更快、更有创意的音符…

4/150:寻找两个正序数组的中位数⭐

题目:寻找两个正序数组的中位数 给定两个大小分别为 m 和 n 的正序(从小到大)数组 nums1 和 nums2。请你找出并返回这两个正序数组的 中位数 。 算法的时间复杂度应该为 O(log (mn)) 。 题解1:暴力 暴力思路简介,…

实测有效的 8 个顶级Android 数据恢复工具

由于我们现在生活在一个依赖数字数据的时代,当重要文件从我们的 Android 手机中消失时,这将是一场数字噩梦。如果您没有预先备份Android手机上的数据或未能通过备份找到已删除的数据,那么选择最好的Android数据恢复软件是最佳选择。 因此&am…

uniapp中进行地图定位

目录 一、创建map 二、data中声明变量 三、获取当前位置信息&#xff0c;进行定位 四、在methods中写移动图标获取地名地址的方法 五、最终展示效果 一、创建map <!-- 地图展示 --><view class"mymap"><!-- <view class"mymap__map"…

大数据存储技术期中考点梳理

1.CAP理论 分布式系统的CAP理论: 首先将分布式系统中的三个特性进行如下归纳: 口(一致性(C):在分布式系统中的所有数据备份&#xff0c;在同一时刻是否有同样的值。(等于所有节点访问同一份最新的数据副本) 口可用性(A):在集群中一部分节点故障后&#xff0c;集群整体是否还能…

再探Java集合系列—ArrayList

适用于什么场景&#xff1f; 检索比较多的场景&#xff0c;例如学生成绩管理系统&#xff0c;老师对学生的成绩进行排名或查询操作 ArrayList有哪些特点&#xff1f; 1、ArrayList集合底层采用了数组数据结构&#xff0c;是Object类型 2、动态数组。ArrayList的默认初始容量…