airflow2.7.3 + celery + redis + mysql 安装部署测试

集群环境:

​ 3台 centos 7.9 (dp95、dp96、dp97) + python3.8

​ dp96:mysql8.0.36(mysql8.0离线安装)

​ dp95\dp96\dp97:celery 集群(Celery安装测试)

安装目标:
airflow2.7.3 + mysql + celery + redis

WebserverSchedulerworker
dp951
dp96111
dp971

Requirements

requirementsStable version (2.7.3)
Python3.8, 3.9, 3.10, 3.11
PlatformAMD64/ARM64(*)
Kubernetes1.24, 1.25, 1.26, 1.27
PostgreSQL11, 12, 13, 14, 15
MySQL5.7, 8.0
SQLite3.15.0+
MSSQL2017(*), 2019(*)

文章目录

    • 1、创建python环境
    • 2、安装 airflow 及其扩展包
    • 3、配置
      • 3.1 mysql 相关设置
      • 3.2 airflow配置
      • 3.3 airflow.cfg配置文件修改
      • 3.4 分发配置
      • 3.5 初始化airflow数据库
    • 4、启动
      • 4.1 配置 airflow 用户
      • 4.2 启动webserver、Scheduler
      • 4.3 启动 worker
    • 5、案例测试
    • 5.2 手动调度任务
    • 6、备注

1、创建python环境

# 1.1 创建Python3.8 环境
[zyp@dp96 ~]$ conda create --name airflow python=3.8
[zyp@dp96 ~]$ conda activate airflow   

# 报错:CommandNotFoundError: Your shell has not been properly configured to use 'conda activate'. 
# 原因:安装anaconda最后一步时,不想看见 (base) 取消了conda init 操作
# 处理 > conda init bash      
#     > source ~/.bashrc
# 运行以下命令以禁用环境提示符 (base):conda config --set changeps1 False

2、安装 airflow 及其扩展包

## 安装 airflow2.7
# 2.1 离线情况下:
#   a.可以使用公司pypi镜像源
#   b.在有网机器pip后,复制./lib/site-packages相应包或整个虚拟环境,粘贴到离线机器对应补录下(注意粘贴全,有时候包含./bin/xx文件)

(airflow)[zyp@dp96 ~]$ pip install apache-airflow==2.7.3  -i https://pypi.tuna.tsinghua.edu.cn/simple
(airflow)[zyp@dp96 ~]$ pip install apache-airflow-providers-celery   # celery[异步分布式任务队列] 连接器插件
(airflow)[zyp@dp96 ~]$ pip install apache-airflow-providers-redis    # redis 连接器插件
(airflow)[zyp@dp96 ~]$ pip install apache-airflow-providers-mysql    # mysql连接器插件(本机检测不到mysql,会报错)


# 2.2 直接复制整个airflow环境到其他机器(dp96 节点已安装mysql,分发整个Python环境,避免providers-mysql安装失败问题 )
(base)[zyp@dp96 ~]$ scp ~/anaconda3/envs/airflow zyp@dp95:~/anaconda3/envs/
(base)[zyp@dp96 ~]$ scp ~/anaconda3/envs/airflow zyp@dp97:~/anaconda3/envs/

3、配置

3.1 mysql 相关设置

官方配置

## mysql 相关设置
# /etc/my.cnf
[mysqld]
explicit_defaults_for_timestamp=1

# 创建MeatDATA database库
(airflow)[zyp@dp96 ~]$ mysql -u root -p
mysql> CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
mysql> CREATE USER 'airflow' IDENTIFIED BY '123456';
mysql> GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow';
mysql> flush privileges;

3.2 airflow配置

# 3.1 配置airflow工作目录和命令全局调用
(airflow)[zyp@dp96 ~]$ vim ~/.bash_profile
'''
export PATH=$PATH:$HOME/anaconda3/envs/airflow/bin
export AIRFLOW_HOME=/tmp/airflow2.7.3
'''
(airflow)[zyp@dp96 ~]$ source .bash_profile
(airflow)[zyp@dp96 ~]$ mkdir /tmp/airflow2.7.3

# 3.2 生成配置文件
(airflow)[zyp@dp96 ~]$ airflow info
(airflow)[zyp@dp96 ~]$ ls /tmp/airflow2.7.3
'''
airflow.cfg  logs
'''

# 3.3 修改配置文件
(airflow)[zyp@dp96 ~]$ vim airflow.cfg

3.3 airflow.cfg配置文件修改

## airflow.cfg
[core]
# dags文件地址
dags_folder = /tmp/airflow2.7.3/dags
# 时区
default_timezone = Asia/Shanghai                           # 默认utc
# 每个调度器可以并发运行的任务实例的最大数量
parallelism = 32                                           # 默认32,提升调度性能上可以修改
# 允许在每个DAG中并发运行的任务实例的最大数量
max_active_tasks_per_dag = 16                              # 默认16
# executor
executor =  CeleryExecutor                                 # 默认SequentialExecutor

[database]
# 元数据库设置
sql_alchemy_conn = mysql+mysqldb://airflow:123456@dp96:3306/airflow_db  # 默认sqlite:tmp/airflow2.7.3/airflow.db
# 数据库编码
sql_engine_encoding = utf-8
# 数据库,可以不用写
sql_alchemy_schema = airflow_db

[webserver]
# web端配置文件
config_file = /tmp/airflow2.7.3/webserver_config.py
# web 界面时区 
default_ui_timezone = Asia/Shanghai                           # 默认utc
# 服务地址
web_server_host = 0.0.0.0
web_server_port = 8080


[celery]
# 启动worker时将使用的并发量
worker_concurrency = 16
# broker 消息队列
broker_url = redis://dp96:6379/0
# 任务执行状态结果
broker_url = redis://dp96:6379/1

3.4 分发配置

# airflow工作目录分发
(airflow)[zyp@dp96 ~]$ scp .bash_profile zyp@dp95:~
(airflow)[zyp@dp96 ~]$ scp .bash_profile zyp@dp97:~

# 配置文件分发
(airflow)[zyp@dp96 /tmp/airflow2.7.2]$ scp airflow.cfg zyp@dp95:/tmp/airflow2.7.3/
(airflow)[zyp@dp96 /tmp/airflow2.7.2]$ scp airflow.cfg zyp@dp97:/tmp/airflow2.7.3/

3.5 初始化airflow数据库

(airflow)[zyp@dp96 ~]$ airflow db migrate

报错:TypeError: init() missing 6 required positional arguments: ‘sequence’, ‘schema’, ‘bind_key’, ‘use_signer’, ‘permanent’, and ‘sid_length’

# 解决
(airflow)[zyp@dp96 ~]$ pip install Flask-Session==0.5.0

4、启动

4.1 配置 airflow 用户

# 创建 airflow 用户
(airflow)[zyp@dp96 ~]$airflow users create \
    --username airflow \                        # 用户名
    --firstname airflow \
    --lastname airflow \
    --role Admin \                              # 角色
    --email xx@qq.com\
    --password 123456                           # 密码

4.2 启动webserver、Scheduler

(airflow)[zyp@dp96 ~]$ airflow webserver -D
(airflow)[zyp@dp96 ~]$ airflow scheduler -D

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4.3 启动 worker

(airflow)[zyp@dp95 ~]$ airflow celery worker -D
(airflow)[zyp@dp96 ~]$ airflow celery worker -D
(airflow)[zyp@dp97 ~]$ airflow celery worker -D

报错:ImportError: libmysqlclient.so.21: cannot open shared object file: No such file or directory

解决:

# dp96 节点已安装mysql 
(airflow)[root@dp96 ~]$ scp /usr/lib64/mysql/libmysqlclient.so.21 root@dp97:/usr/lib64/

在这里插入图片描述

如图,每个执行节点开启16个celery 进程,与airflow.cfg配置文件中参数 “worker_concurrency = 16” 相对应

5、案例测试

5.1 创建任务文件 python_operator.py,分别放到dp95、dp96、dp97节点 $AIRFLOW_HOME/dags文件下

#  python_operator.py
import sys
import logging   # 日志记录
import pprint    # 美化打印
import time     
import pendulum     # 处理日期和时间

from airflow import DAG
from airflow.decorators import task,dag

# 构建DAG3种方式
# 第一种上下文管理器
with DAG(
     dag_id='python_operator_test',                          # id
     description='调度Python函数测试',                       # dag描述
     schedule=None,                                          # 调度规则,默认timedelta(days=1) 
     start_date=pendulum.datetime(2024,5,24),                # 调度开始时间
     end_date=None,
     catchup=False,                                          # 指示是否要回溯运行错过的任务实例
     tags=['pyhon']                                          # 一个列表,用于给DAG添加标签。标签可以用于分类、过滤和搜索DAG
     ) as dag:
    
    @task(task_id="print_the_context")
    def print_context(ds=None,**kwargs):
        pprint.pprint(kwargs)
        print(ds)
        logging.info(f'Pyhton解释器路径:{sys.executable}')
        logging.info('任务执行完成')
    
    run_this = print_context()

5.2 手动调度任务

在这里插入图片描述
在这里插入图片描述

6、备注

  • Worker不需要在任何进程注册即可执行任务,因此worker节点可以在不停机,不重启服务下的情况进行水平扩展。

  • 在一个Airflow集群中我们只能一次运行一个Scheduler进程,如果有多个Scheduler运行,那么可能出现同一个任务被执行多次,导致任务流重复执行。

  • dag任务文件,需分别放置Celery集群每台机器$AIRFLOW_HOME/dags下.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/638801.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java 商品入库系统 案例

测试类 package 练习.商品入库系统;import java.util.ArrayList; import java.util.Scanner; public class Test {public static final int Enrool 1;public static final int Search 2;public static final int Delect 3;public static final int Exit 4;public static…

交换机部分综合实验

实验要求 1.内网IP地址使用172.16.0.0/16 2.sw1和sW2之间互为备份; 3.VRRP/mstp/vlan/eth-trunk均使用; 4.所有pc均通过DHcP获取Ip地址; 5.ISP只配置IP地址; 6.所有电脑可以正常访问IsP路由器环回 实验拓扑 实验思路 1.给交换机创建vlan,并将接口划入vlan 2.在SW1和…

java实现List对象转geojson文本返回前端

1.业务需求 查询带有经纬度数据的list列表,将其转为geojson格式给前端。 2.GeoJson格式说明 GeoJSON是一种对各种地理数据结构进行编码的格式,基于Javascript对象表示法(JavaScript Object Notation, 简称JSON)的地理空间信息数据交换格式。GeoJSON对…

3D透视图模型转模型变形?---模大狮模型网

3D建模是数字艺术和设计领域中的重要技术,它可以为我们带来丰富多彩的视觉体验和创意表达。在本文中,我们将探讨一个引人注目的话题:3D透视图中模型转换是否会导致变形?通过深入探讨这个问题,我们希望能够帮助您更好地理解在3D建…

131. 面试中关于架构设计都需要了解哪些内容?

文章目录 一、社区系统架构组件概览1. 系统拆分2. CDN、Nginx静态缓存、JVM本地缓存3. Redis缓存4. MQ5. 分库分表6. 读写分离7. ElasticSearch 二、商城系统-亿级商品如何存储三、对账系统-分布式事务一致性四、统计系统-海量计数六、系统设计 - 微软1、需求收集2、顶层设计3、…

Zoho CRM怎么样?云衔科技为企业提供采购优惠!

企业对于客户关系管理(CRM)系统的需求日益增加,Zoho CRM作为一款备受赞誉的国际CRM服务提供商,凭借其全面的功能、出色的用户体验和卓越的性价比,成为了众多企业数字化转型的得力助手。 Zoho CRM是一款覆盖客户全生命…

【webrtc】m98:Call的创建及Call对音频接收处理

call中多個流共享相同的辅助组件 这几个是与外部共用的 线程传输send控制module 线程任务队列工厂call的辅助组件中各种统计以及接收测的cc是自己创建的 call自己的多个辅助组件是外部传递来的 call 创建多个接收流 这里用一个set 来保存所有指针,并没有要map的意思:

2024年贵州特岗教师招聘报名流程,速速查收哦!

2024年贵州特岗教师招聘报名流程,速速查收哦!

Thingsboard规则链:Alarm Status Filter节点详解

在物联网(IoT)平台的世界里,数据处理与自动化响应是核心功能之一。作为其中的佼佼者,Thingsboard提供了一套强大的规则引擎系统,允许用户基于设备上报的数据构建复杂的自动化逻辑。在这套规则引擎中,Alarm Status Filter节点扮演了…

RedisTemplateAPI:List

文章目录 ⛄介绍⛄List的常见命令有⛄RedisTemplate API❄️❄️添加缓存❄️❄️将List放入缓存❄️❄️设置过期时间(单独设置)❄️❄️获取List缓存全部内容(起始索引,结束索引)❄️❄️从左或从右弹出一个元素❄️❄️根据索引查询元素❄…

AI巨头争相与Reddit合作:为何一个古老的论坛成为AI训练的“宝藏”?

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…

RDDM论文阅读笔记

CVPR2024的残差去噪模型。把diffusion 模型的加噪过程分解为残差diffusion和noise diffusion,其中残差diffusion模拟从target image到degraded image的过程,而noise diffusion则是原来的diffusion过程,即从图片到高斯噪声的加噪过程。前者可以…

Error:(6, 43) java: 程序包org.springframework.data.redis.core不存在

目录 一、在做SpringBoot整合Redis的项目时,报错: 二、尝试 三、解决办法 一、在做SpringBoot整合Redis的项目时,报错: 二、尝试 给依赖加版本号,并且把版本换了个遍,也不行,也去update过ma…

hls.js实现分片播放视频

前言&#xff1a;hls.js官网&#xff1a;hls.js - npm 一、demo——在HTML中使用 <audio id"audio" controls></audio><script src"https://cdn.jsdelivr.net/npm/hls.jslatest"></script> <script>document.addEventList…

华为鸿蒙认证培训 | 讯方技术成为首批鸿蒙原生应用开发及培训服务商

5月20日&#xff0c;鸿蒙原生应用合作交流推介会-深圳站在深圳中洲万豪酒店隆重举行。讯方技术作为鸿蒙钻石服务商受邀参与此次活动&#xff0c;活动由讯方技术总裁刘国锋、执行副总裁刘铭皓、教学资源部部长张俊豪共同出席。 本次活动由深圳政府指导&#xff0c;鸿蒙生态官方…

AI边缘计算高效赋能,打造智慧社区安防管理解决方案

一、背景需求分析 随着信息技术的飞速发展&#xff0c;智慧社区建设已成为提升社区治理和服务水平的重要方向。通过深度整合大数据、云计算和人工智能等前沿技术&#xff0c;致力于构建信息化、智能化的新型社区治理体系。根据《关于深入推进智慧社区建设的意见》的指引&#…

泰克TBS2204B示波器如何设置存储时间?

示波器是电子测量领域中不可或缺的重要仪器之一。泰克公司生产的TBS2204B数字示波器是一款广受欢迎的中端市场产品&#xff0c;其中存储时间设置是用户需要掌握的关键操作之一。 TBS2204B示波器的存储时间设置涉及以下几个方面&#xff1a; 1. 存储时间基准 存储时间基准决定…

办公楼智慧公厕解决方案云平台,助力办公环境品质提升

在现代化的办公楼中&#xff0c;智慧公厕解决方案云平台正发挥着至关重要的作用&#xff0c;有力地助力办公环境品质的提升。 一、云平台优势 智慧公厕云平台具有高效的集成性&#xff0c;将各种设备和信息整合在一起&#xff0c;实现了统一管理和调度。云平台还可以和海量的设…

Threes 特效 炫酷传送门HTML5动画特效

基于Three.js的HTML5 3D动画&#xff0c;这个动画模拟了游戏中的一个炫酷的3D场景&#xff0c;支持360度视角查看&#xff0c;也支持鼠标滚轮进行缩放。画面中主要展现了一个游戏中传送门的效果&#xff0c;同时还有路两边的围栏、灯笼、石头&#xff0c;以及星光闪闪的萤火虫&…