集群环境:
3台 centos 7.9 (dp95、dp96、dp97) + python3.8
dp96:mysql8.0.36(mysql8.0离线安装)
dp95\dp96\dp97:celery 集群(Celery安装测试)
安装目标:
airflow2.7.3 + mysql + celery + redis
Webserver | Scheduler | worker | |
---|---|---|---|
dp95 | 1 | ||
dp96 | 1 | 1 | 1 |
dp97 | 1 |
Requirements
requirements | Stable version (2.7.3) |
---|---|
Python | 3.8, 3.9, 3.10, 3.11 |
Platform | AMD64/ARM64(*) |
Kubernetes | 1.24, 1.25, 1.26, 1.27 |
PostgreSQL | 11, 12, 13, 14, 15 |
MySQL | 5.7, 8.0 |
SQLite | 3.15.0+ |
MSSQL | 2017(*), 2019(*) |
文章目录
- 1、创建python环境
- 2、安装 airflow 及其扩展包
- 3、配置
- 3.1 mysql 相关设置
- 3.2 airflow配置
- 3.3 airflow.cfg配置文件修改
- 3.4 分发配置
- 3.5 初始化airflow数据库
- 4、启动
- 4.1 配置 airflow 用户
- 4.2 启动webserver、Scheduler
- 4.3 启动 worker
- 5、案例测试
- 5.2 手动调度任务
- 6、备注
1、创建python环境
# 1.1 创建Python3.8 环境
[zyp@dp96 ~]$ conda create --name airflow python=3.8
[zyp@dp96 ~]$ conda activate airflow
# 报错:CommandNotFoundError: Your shell has not been properly configured to use 'conda activate'.
# 原因:安装anaconda最后一步时,不想看见 (base) 取消了conda init 操作
# 处理 > conda init bash
# > source ~/.bashrc
# 运行以下命令以禁用环境提示符 (base):conda config --set changeps1 False
2、安装 airflow 及其扩展包
## 安装 airflow2.7
# 2.1 离线情况下:
# a.可以使用公司pypi镜像源
# b.在有网机器pip后,复制./lib/site-packages相应包或整个虚拟环境,粘贴到离线机器对应补录下(注意粘贴全,有时候包含./bin/xx文件)
(airflow)[zyp@dp96 ~]$ pip install apache-airflow==2.7.3 -i https://pypi.tuna.tsinghua.edu.cn/simple
(airflow)[zyp@dp96 ~]$ pip install apache-airflow-providers-celery # celery[异步分布式任务队列] 连接器插件
(airflow)[zyp@dp96 ~]$ pip install apache-airflow-providers-redis # redis 连接器插件
(airflow)[zyp@dp96 ~]$ pip install apache-airflow-providers-mysql # mysql连接器插件(本机检测不到mysql,会报错)
# 2.2 直接复制整个airflow环境到其他机器(dp96 节点已安装mysql,分发整个Python环境,避免providers-mysql安装失败问题 )
(base)[zyp@dp96 ~]$ scp ~/anaconda3/envs/airflow zyp@dp95:~/anaconda3/envs/
(base)[zyp@dp96 ~]$ scp ~/anaconda3/envs/airflow zyp@dp97:~/anaconda3/envs/
3、配置
3.1 mysql 相关设置
官方配置
## mysql 相关设置
# /etc/my.cnf
[mysqld]
explicit_defaults_for_timestamp=1
# 创建MeatDATA database库
(airflow)[zyp@dp96 ~]$ mysql -u root -p
mysql> CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
mysql> CREATE USER 'airflow' IDENTIFIED BY '123456';
mysql> GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow';
mysql> flush privileges;
3.2 airflow配置
# 3.1 配置airflow工作目录和命令全局调用
(airflow)[zyp@dp96 ~]$ vim ~/.bash_profile
'''
export PATH=$PATH:$HOME/anaconda3/envs/airflow/bin
export AIRFLOW_HOME=/tmp/airflow2.7.3
'''
(airflow)[zyp@dp96 ~]$ source .bash_profile
(airflow)[zyp@dp96 ~]$ mkdir /tmp/airflow2.7.3
# 3.2 生成配置文件
(airflow)[zyp@dp96 ~]$ airflow info
(airflow)[zyp@dp96 ~]$ ls /tmp/airflow2.7.3
'''
airflow.cfg logs
'''
# 3.3 修改配置文件
(airflow)[zyp@dp96 ~]$ vim airflow.cfg
3.3 airflow.cfg配置文件修改
## airflow.cfg
[core]
# dags文件地址
dags_folder = /tmp/airflow2.7.3/dags
# 时区
default_timezone = Asia/Shanghai # 默认utc
# 每个调度器可以并发运行的任务实例的最大数量
parallelism = 32 # 默认32,提升调度性能上可以修改
# 允许在每个DAG中并发运行的任务实例的最大数量
max_active_tasks_per_dag = 16 # 默认16
# executor
executor = CeleryExecutor # 默认SequentialExecutor
[database]
# 元数据库设置
sql_alchemy_conn = mysql+mysqldb://airflow:123456@dp96:3306/airflow_db # 默认sqlite:tmp/airflow2.7.3/airflow.db
# 数据库编码
sql_engine_encoding = utf-8
# 数据库,可以不用写
sql_alchemy_schema = airflow_db
[webserver]
# web端配置文件
config_file = /tmp/airflow2.7.3/webserver_config.py
# web 界面时区
default_ui_timezone = Asia/Shanghai # 默认utc
# 服务地址
web_server_host = 0.0.0.0
web_server_port = 8080
[celery]
# 启动worker时将使用的并发量
worker_concurrency = 16
# broker 消息队列
broker_url = redis://dp96:6379/0
# 任务执行状态结果
broker_url = redis://dp96:6379/1
3.4 分发配置
# airflow工作目录分发
(airflow)[zyp@dp96 ~]$ scp .bash_profile zyp@dp95:~
(airflow)[zyp@dp96 ~]$ scp .bash_profile zyp@dp97:~
# 配置文件分发
(airflow)[zyp@dp96 /tmp/airflow2.7.2]$ scp airflow.cfg zyp@dp95:/tmp/airflow2.7.3/
(airflow)[zyp@dp96 /tmp/airflow2.7.2]$ scp airflow.cfg zyp@dp97:/tmp/airflow2.7.3/
3.5 初始化airflow数据库
(airflow)[zyp@dp96 ~]$ airflow db migrate
报错:TypeError: init() missing 6 required positional arguments: ‘sequence’, ‘schema’, ‘bind_key’, ‘use_signer’, ‘permanent’, and ‘sid_length’
# 解决
(airflow)[zyp@dp96 ~]$ pip install Flask-Session==0.5.0
4、启动
4.1 配置 airflow 用户
# 创建 airflow 用户
(airflow)[zyp@dp96 ~]$airflow users create \
--username airflow \ # 用户名
--firstname airflow \
--lastname airflow \
--role Admin \ # 角色
--email xx@qq.com\
--password 123456 # 密码
4.2 启动webserver、Scheduler
(airflow)[zyp@dp96 ~]$ airflow webserver -D
(airflow)[zyp@dp96 ~]$ airflow scheduler -D
4.3 启动 worker
(airflow)[zyp@dp95 ~]$ airflow celery worker -D
(airflow)[zyp@dp96 ~]$ airflow celery worker -D
(airflow)[zyp@dp97 ~]$ airflow celery worker -D
报错:ImportError: libmysqlclient.so.21: cannot open shared object file: No such file or directory
解决:
# dp96 节点已安装mysql
(airflow)[root@dp96 ~]$ scp /usr/lib64/mysql/libmysqlclient.so.21 root@dp97:/usr/lib64/
如图,每个执行节点开启16个celery 进程,与airflow.cfg配置文件中参数 “worker_concurrency = 16” 相对应
5、案例测试
5.1 创建任务文件 python_operator.py,分别放到dp95、dp96、dp97节点 $AIRFLOW_HOME/dags文件下
# python_operator.py
import sys
import logging # 日志记录
import pprint # 美化打印
import time
import pendulum # 处理日期和时间
from airflow import DAG
from airflow.decorators import task,dag
# 构建DAG3种方式
# 第一种上下文管理器
with DAG(
dag_id='python_operator_test', # id
description='调度Python函数测试', # dag描述
schedule=None, # 调度规则,默认timedelta(days=1)
start_date=pendulum.datetime(2024,5,24), # 调度开始时间
end_date=None,
catchup=False, # 指示是否要回溯运行错过的任务实例
tags=['pyhon'] # 一个列表,用于给DAG添加标签。标签可以用于分类、过滤和搜索DAG
) as dag:
@task(task_id="print_the_context")
def print_context(ds=None,**kwargs):
pprint.pprint(kwargs)
print(ds)
logging.info(f'Pyhton解释器路径:{sys.executable}')
logging.info('任务执行完成')
run_this = print_context()
5.2 手动调度任务
6、备注
-
Worker不需要在任何进程注册即可执行任务,因此worker节点可以在不停机,不重启服务下的情况进行水平扩展。
-
在一个Airflow集群中我们只能一次运行一个Scheduler进程,如果有多个Scheduler运行,那么可能出现同一个任务被执行多次,导致任务流重复执行。
-
dag任务文件,需分别放置Celery集群每台机器$AIRFLOW_HOME/dags下.