Apache Zeppelin结合Apache Airflow使用1

Apache Zeppelin结合Apache Airflow使用1

文章目录

  • Apache Zeppelin结合Apache Airflow使用1
  • 前言
  • 一、安装Airflow
  • 二、使用步骤
    • 1.目标
    • 2.编写DAG
    • 2.加载、执行DAG
  • 总结


前言

之前学了Zeppelin的使用,今天开始结合Airflow串任务。

Apache Airflow和Apache Zeppelin是两个不同的工具,各自用于不同的目的。Airflow用于编排和调度工作流,而Zeppelin是一个交互式数据分析和可视化的笔记本工具。虽然它们有不同的主要用途,但可以结合使用以满足一些复杂的数据处理和分析需求。

下面是一些结合使用Airflow和Zeppelin的方式:

  1. Airflow调度Zeppelin Notebooks:

    • 使用Airflow编写调度任务,以便在特定时间或事件触发时运行Zeppelin笔记本。
    • 在Airflow中使用Zeppelin的REST API或CLI命令来触发Zeppelin笔记本的执行。
  2. 数据流管道:

    • 使用Airflow编排数据处理和转换任务,例如从数据源提取数据、清理和转换数据。
    • 在Zeppelin中创建笔记本,用于进一步的数据分析、可视化和报告生成。
    • Airflow任务完成后,触发Zeppelin笔记本执行以基于最新数据执行分析。
  3. 参数传递:

    • 通过Airflow参数传递,将一些参数值传递给Zeppelin笔记本,以便在不同任务之间共享信息。
    • Zeppelin笔记本可以从Airflow任务中获取参数值,以适应特定的数据分析需求。
  4. 日志和监控:

    • 使用Airflow监控工作流的运行情况,查看任务的日志和执行状态。
    • 在Zeppelin中记录和可视化Airflow工作流的关键指标,以获得更全面的工作流性能洞察。
  5. 整合数据存储:

    • Airflow可以用于从不同数据源中提取数据,然后将数据传递给Zeppelin进行进一步的分析。
    • Zeppelin可以使用Airflow任务生成的数据,进行更深入的数据挖掘和分析。

结合使用Airflow和Zeppelin能够充分发挥它们各自的优势,实现更全面、可控和可视化的数据处理和分析工作流。


一、安装Airflow

安装参考:
https://airflow.apache.org/docs/apache-airflow/stable/start.html

CentOS 7.9安装后启动会报错,还需要配置下sqlite,参考:https://airflow.apache.org/docs/apache-airflow/2.8.0/howto/set-up-database.html#setting-up-a-sqlite-database

[root@slas bin]# airflow standalone
Traceback (most recent call last):
  File "/root/.pyenv/versions/3.9.10/bin/airflow", line 5, in <module>
    from airflow.__main__ import main
  File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/__init__.py", line 52, in <module>
    from airflow import configuration, settings
  File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 2326, in <module>
    conf.validate()
  File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 718, in validate
    self._validate_sqlite3_version()
  File "/root/.pyenv/versions/3.9.10/lib/python3.9/site-packages/airflow/configuration.py", line 824, in _validate_sqlite3_version
    raise AirflowConfigException(
airflow.exceptions.AirflowConfigException: error: SQLite C library too old (< 3.15.0). See https://airflow.apache.org/docs/apache-airflow/2.8.0/howto/set-up-database.html#setting-up-a-sqlite-database

二、使用步骤

1.目标

我想做个简单的demo,包括两个节点,实现如图所示功能,读取csv,去重:
在这里插入图片描述
csv文件输入在airflow上实现,去重在zeppelin上实现。

2.编写DAG

先实现extract_data_script.py,做个简单的读取csv指定列数据写入新的csv文件。

import argparse
import pandas as pd

def extract_and_write_data(date, output_csv, columns_to_extract):
    # 读取指定列的数据
    csv_file_path = f"/home/works/datasets/data_{date}.csv"
    df = pd.read_csv(csv_file_path, usecols=columns_to_extract)

    # 将数据写入新的 CSV 文件
    df.to_csv(output_csv, index=False)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--date", type=str, required=True, help="Date parameter passed by Airflow")
    args = parser.parse_args()

    # 输出 CSV 文件路径(替换为实际的路径)
    output_csv_path = "/home/works/output/extracted_data.csv"

    # 指定要提取的列
    columns_to_extract = ['column1', 'column2', 'column3']

    # 调用函数进行数据提取和写入
    extract_and_write_data(args.date, output_csv_path, columns_to_extract)

然后在 Zeppelin 中创建一个 Python 笔记本(Notebook),其中包含被 Airflow DAG 调用的代码。加载先前从 output/extracted_data.csv 文件中提取的数据:

%python

# 导入必要的库
import pandas as pd

# 加载先前从 CSV 文件中提取的数据
csv_file_path = "/home/works/output/extracted_data.csv"
# 读取 CSV 文件
df = pd.read_csv(csv_file_path)

# 过滤掉 column1 为空的行
df = df[df['column1'].notnull()]

# 去重,以 column2、column3 字段为联合去重依据
deduplicated_df = df.drop_duplicates(subset=["column2", "column3"])

# 保存去重后的结果到新的 CSV 文件
deduplicated_df.to_csv("/home/works/output/dd_data.csv", index=False)

将这个 Zeppelin 笔记本保存,并记住笔记本的paragraph ID, Airflow DAG 需要使用这个 ID 来调用 Zeppelin 笔记本。

接下来,用VSCode编写zeppelin_integration.py代码如下,上传到$AIRFLOW_HOME/dags目录下:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'zeppelin_integration',
    default_args=default_args,
    schedule=timedelta(days=1),
)

extract_data_task = BashOperator(
    task_id='extract_data',
    bash_command='python /home/works/z/extract_data_script.py --date {{ ds }}',
    dag=dag,
)


run_zeppelin_notebook_task = BashOperator(
    task_id='run_zeppelin_notebook',
    bash_command='curl -X POST -HContent-Type:application/json http://IP:PORT/api/notebook/run/2JND7T68E/paragraph_1705372327640_1111015359',
    dag=dag,
)


# Set the task dependencies
extract_data_task >> run_zeppelin_notebook_task

2.加载、执行DAG

如下命令进行测试,先执行下代码看看语法是否都正确,然后list出tasks,并逐一test:

# python zeppelin_integration.py 

# airflow tasks list zeppelin_integration
extract_data
run_zeppelin_notebook

# airflow tasks test zeppelin_integration extract_data 20240122
[2024-01-22T08:57:45.805+0800] {dagbag.py:538} INFO - Filling up the DagBag from /root/airflow/dags
[2024-01-22T08:57:47.853+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: zeppelin_integration.extract_data __airflow_temporary_run_2024-01-22T00:57:47.740537+00:00__ [None]>
[2024-01-22T08:57:47.860+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: zeppelin_integration.extract_data __airflow_temporary_run_2024-01-22T00:57:47.740537+00:00__ [None]>
[2024-01-22T08:57:47.861+0800] {taskinstance.py:2171} INFO - Starting attempt 1 of 2
[2024-01-22T08:57:47.861+0800] {taskinstance.py:2250} WARNING - cannot record queued_duration for task extract_data because previous state change time has not been saved
[2024-01-22T08:57:47.862+0800] {taskinstance.py:2192} INFO - Executing <Task(BashOperator): extract_data> on 2024-01-20T00:00:00+00:00
[2024-01-22T08:57:47.900+0800] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='zeppelin_integration' AIRFLOW_CTX_TASK_ID='extract_data' AIRFLOW_CTX_EXECUTION_DATE='2024-01-20T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2024-01-22T00:57:47.740537+00:00__'
[2024-01-22T08:57:47.904+0800] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-01-22T08:57:47.905+0800] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'python /home/works/z/extract_data_script.py --date 2024-01-20']
[2024-01-22T08:57:47.914+0800] {subprocess.py:86} INFO - Output:
[2024-01-22T08:57:48.553+0800] {subprocess.py:97} INFO - Command exited with return code 0
[2024-01-22T08:57:48.632+0800] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=zeppelin_integration, task_id=extract_data, execution_date=20240120T000000, start_date=, end_date=20240122T005748

# airflow tasks test zeppelin_integration run_zeppelin_notebook 20240122
[2024-01-22T09:01:43.665+0800] {dagbag.py:538} INFO - Filling up the DagBag from /root/airflow/dags
[2024-01-22T09:01:45.835+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: zeppelin_integration.run_zeppelin_notebook __airflow_temporary_run_2024-01-22T01:01:45.733341+00:00__ [None]>
[2024-01-22T09:01:45.843+0800] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: zeppelin_integration.run_zeppelin_notebook __airflow_temporary_run_2024-01-22T01:01:45.733341+00:00__ [None]>
[2024-01-22T09:01:45.844+0800] {taskinstance.py:2171} INFO - Starting attempt 1 of 2
[2024-01-22T09:01:45.844+0800] {taskinstance.py:2250} WARNING - cannot record queued_duration for task run_zeppelin_notebook because previous state change time has not been saved
[2024-01-22T09:01:45.845+0800] {taskinstance.py:2192} INFO - Executing <Task(BashOperator): run_zeppelin_notebook> on 2024-01-22T00:00:00+00:00
[2024-01-22T09:01:45.904+0800] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='zeppelin_integration' AIRFLOW_CTX_TASK_ID='run_zeppelin_notebook' AIRFLOW_CTX_EXECUTION_DATE='2024-01-22T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='__airflow_temporary_run_2024-01-22T01:01:45.733341+00:00__'
[2024-01-22T09:01:45.909+0800] {subprocess.py:63} INFO - Tmp dir root location: /tmp
[2024-01-22T09:01:45.910+0800] {subprocess.py:75} INFO - Running command: ['/bin/bash', '-c', 'curl -X POST -HContent-Type:application/json http://100.100.30.220:8181/api/notebook/run/2JND7T68E/paragraph_1705372327640_1111015359']
[2024-01-22T09:01:45.921+0800] {subprocess.py:86} INFO - Output:
[2024-01-22T09:01:45.931+0800] {subprocess.py:93} INFO -   % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
[2024-01-22T09:01:45.931+0800] {subprocess.py:93} INFO -                                  Dload  Upload   Total   Spent    Left  Speed
100    50  100    50    0     0      8      0  0:00:06  0:00:06 --:--:--    12
[2024-01-22T09:01:52.003+0800] {subprocess.py:93} INFO - {"status":"OK","body":{"code":"SUCCESS","msg":[]}}
[2024-01-22T09:01:52.003+0800] {subprocess.py:97} INFO - Command exited with return code 0
[2024-01-22T09:01:52.098+0800] {taskinstance.py:1138} INFO - Marking task as SUCCESS. dag_id=zeppelin_integration, task_id=run_zeppelin_notebook, execution_date=20240122T000000, start_date=, end_date=20240122T010152

最后用命令airflow scheduler将它添加到airflow里。

# airflow scheduler
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2024-01-22T09:28:21.829+0800] {task_context_logger.py:63} INFO - Task context logging is enabled
[2024-01-22T09:28:21.831+0800] {executor_loader.py:115} INFO - Loaded executor: SequentialExecutor
[2024-01-22T09:28:21.868+0800] {scheduler_job_runner.py:808} INFO - Starting the scheduler
[2024-01-22T09:28:21.869+0800] {scheduler_job_runner.py:815} INFO - Processing each file at most -1 times
。。。

页面上会增加一个DAG,如图:
在这里插入图片描述
在Actions里可以点击执行。


总结

以上就是今天要讲的内容,总体来说集成两个工具还是很方便的,期待后面更多的应用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/341771.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何使用固定公网地址访问多个本地Nginx服务搭建的网站

文章目录 1. 下载windows版Nginx2. 配置Nginx3. 测试局域网访问4. cpolar内网穿透5. 测试公网访问6. 配置固定二级子域名7. 测试访问公网固定二级子域名 本文主要介绍如何在Windows系统对Nginx进行配置&#xff0c;并结合cpolar内网穿透工具实现固定公网地址远程访问多个本地站…

学习笔记-李沐动手学深度学习(一)(01-07,概述、数据操作、tensor操作、数学基础、自动求导)

个人随笔 第三列是 jupyter记事本 官方github上啥都有&#xff08;代码、jupyter记事本、胶片&#xff09; https://github.com/d2l-ai 多体会 【梯度指向的是值变化最大的方向】 符号 维度 &#xff08;弹幕说&#xff09;2&#xff0c;3&#xff0c;4越后面维度越低 4…

dubbo:深入理解Apache Dubbo与实战

dubbo核心组件 层次名 作 用 Service 业务层。包括业务代码的接口与实现&#xff0c;即开发者实现的业务代码 config 配置层。主要围绕ServiceConfig &#xff08;暴露的服务配置&#xff09;和ReferenceConfig &#xff08;引用的服务配置&#xff09;两个实现类展开&#xf…

canvas绘制旋转的椭圆花

查看专栏目录 canvas实例应用100专栏&#xff0c;提供canvas的基础知识&#xff0c;高级动画&#xff0c;相关应用扩展等信息。canvas作为html的一部分&#xff0c;是图像图标地图可视化的一个重要的基础&#xff0c;学好了canvas&#xff0c;在其他的一些应用上将会起到非常重…

网络安全全栈培训笔记(57-服务攻防-应用协议RsyncSSHRDPFTP漏洞批量扫描口令拆解)

第57天 服务攻防-应用协议&Rsync&SSH&RDP&FTP&漏洞批量扫描&口令拆解 知识点&#xff1a; 1、服务攻防-远程控制&文件传输等 2、远程控制-RDP&RDP&弱口令&漏洞 3、文件传输-FTP&Rsyc&弱口令&漏洞 章节内容&#xff1a; …

【Java】--网络编程:基于TCP协议的网络通信

【Java】–网络编程&#xff1a;基于TCP协议的网络通信 文章目录 【Java】--网络编程&#xff1a;基于TCP协议的网络通信一、TCP协议1.1 概念1.2 三次握手1.2.1 文字描述1.2.2 画图演示 1.3 四次挥手1.3.1 文字描述1.3.2 画图演示 二、基于TCP的Socket网络编程2.1 概念2.2 服务…

c#中使用UTF-8编码处理多语言文本的有效策略

使用UTF-8编码处理多语言文本的有效策略 在当今的全球化时代&#xff0c;软件开发者常常需要处理包含多种语言的文本。这不仅涉及英文和其他西方语言&#xff0c;还包括中文、日文、韩文等多字节字符系统。在这篇博客中&#xff0c;我将探讨如何有效地使用UTF-8编码来处理混合语…

大模型实战营Day5笔记

大模型部署背景 大模型部署是指将训练好的模型在特定的软硬件环境中启动的过程&#xff0c;使模型能够接收输入并返回预测结果。大模型的内存开销巨大&#xff0c;7B模型仅权重需要14G内存。另外大模型是自回归生成&#xff0c;需要缓存Attention的 k/v。 LMDeploy 简…

学生宿舍人走断电管理系统的意义和功能

学生宿舍人走断电管理系统是石家庄光大远通电气公司一款智能化的电力管理设备&#xff0c;旨在解决学生宿舍安全用电问题。以下是一些该系统的功能特点: 1.智能控制:系统能够自动识别宿舍内是否有人&#xff0c;当无人时自动断电&#xff0c;避免能源浪费和安全事故的发生。 2.…

Prometheus插件安装kafka_exporter

下载地址 https://github.com/danielqsj/kafka_exporter/releases 解压 tar -zxvf kafka_exporter-1.7.0.linux-amd64.tar.gzmv kafka_exporter-1.7.0.linux-amd64 kafka_exporter服务配置 cd /usr/lib/systemd/systemvi kafka_exporter.service内容如下 [Unit] Descript…

容器技术2-镜像与容器储存

目录 一、镜像制作 1、ddocker build 2、docker commit 二、镜像存储 1、公共仓库 2、私有仓库 三、镜像使用 四、容器存储 1、镜像元数据 2、存储驱动 3、数据卷 一、镜像制作 1、ddocker build 基于 Dockerfile 自动构建镜像 其机制为&#xff1a;每一行都会基于…

【webrtc】neteq测试工程

设置git代理 $ git config --global http.https://github.com.proxy socks5://127.0.0.1:7890 git config --global https.https://github.com.proxy socks5://127.0.0.1:7890导入cmake直接构建 win32 debug v143 编译opus Build started...

【零基础入门TypeScript】数组

目录 数组的特点 声明和初始化数组 句法 访问数组元素 示例&#xff1a;简单数组 示例&#xff1a;单语句声明和初始化 数组对象 例子 示例&#xff1a;数组构造函数接受逗号分隔值 数组方法 数组解构 例子 使用 for…in 循环遍历数组 TypeScript 中的数组 使用变…

vue:element-ui表单动态验证规则

一、需求&#xff1a; 实现当是否发送消息选择是时&#xff0c;业务类型字段必填。 二、实现&#xff1a; 当你在一个表单中使用 el-form 和 el-form-item 来创建表单项时&#xff0c;el-form-item 的 :rules 属性可以用来设置该表单项的验证规则。我们希望根据用户在 "…

前端JS加密与Buspsuite的坦诚相待

前端JS加密测试场景下的困惑 在渗透测试过程中经常会遇到JS前端加密的场景&#xff0c;假如不借助任何工具的情况下&#xff0c;我们一般是把JS代码进行扣取&#xff0c;本地进行加解密生成Payload&#xff0c;然后在Burpsuite里进行Payload替换。这种方式就存在一个很明显的问…

机器学习:什么是监督学习和无监督学习

目录 一、监督学习 &#xff08;一&#xff09;回归 &#xff08;二&#xff09;分类 二、无监督学习 聚类 一、监督学习 介绍&#xff1a;监督学习是指学习输入到输出&#xff08;x->y&#xff09;映射的机器学习算法&#xff0c;监督即理解为&#xff1a;已知正确答案…

【Web前端开发基础】CSS的定位和装饰

CSS的定位和装饰 目录 CSS的定位和装饰一、学习目标二、文章内容2.1 定位2.1.1 定位的基本介绍2.1.2 定位的基本使用2.1.3 静态定位2.1.4 相对定位2.1.5 绝对定位2.1.6 子绝父相2.1.7 固定定位2.1.8元素的层级关系 2.2 装饰2.2.1 垂直对齐方式2.2.2 光标类型2.2.3 边框圆角2.2.…

Keepalived + Nginx双主架构

Keepalived Nginx双主架构 环境准备&#xff1a; keepalived_master1服务器nginx&#xff1a;172.20.26.167 keepalived_master2服务器nginx&#xff1a;172.20.26.198 各服务器关闭selinux、防火墙等服务。 开机安装部署nginx 在172.20.26.167服务器上 yum install ngi…

基于ADAS的车道线检测算法matlab仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 4.1 图像预处理 4.2 车道线特征提取 4.3 车道线跟踪 5.完整工程文件 1.课题概述 基于ADAS的车道线检测算法,通过hough变换和边缘检测方法提取视频样板中的车道线&#xff0c;然后根据车道线的弯曲情况…

在Excel中批量添加前后缀的三种方法,总有一种适合 你

你可以使用高级Excel函数将前缀和后缀快速应用于列。在使用大型电子表格时,为每个单元格添加后缀或前缀可能会花费很长时间,并使你疲惫不堪。 在这里,你可以通过几种快速简单的方式添加后缀或前缀,从而减少所需的手动操作。​我们将通过三种不同的方法向Excel电子表格添加…