如何通过 Apache Airflow 将数据导入 Elasticsearch

作者:来自 Elastic Andre Luiz

了解如何通过 Apache Airflow 将数据导入 Elasticsearch。

Apache Airflow

Apache Airflow 是一个旨在创建、安排(schedule)和监控工作流的平台。它用于编排 ETL(Extract-Transform-Load) 流程、数据管道和其他复杂工作流,提供灵活性和可扩展性。它的可视化界面和实时监控功能使管道管理更易于访问和高效,让你可以跟踪执行的进度和结果。以下是它的四个主要支柱:

  • 动态:管道以 Python 定义,允许动态灵活地生成工作流。
  • 可扩展:Airflow 可以与各种环境集成,可以创建自定义运算符,并可以根据需要执行特定代码。
  • 优雅:管道以干净明确的方式编写。
  • 可扩展:其模块化架构使用消息队列来编排任意数量的工作器。

在实践中,Airflow 可用于以下场景:

  • 数据导入:编排将数据每日提取到 Elasticsearch 等数据库中。
  • 日志监控:管理日志文件的收集和处理,然后在 Elasticsearch 中进行分析以识别错误或异常。
  • 多种数据源集成:将来自不同系统(API、数据库、文件)的信息合并到 Elasticsearch 中的单个层中,简化搜索和报告。

DAG:Directed Acyclic Graphs - 有向无环图

在 Airflow 中,工作流由 DAG(有向无环图)表示。DAG 是一种定义任务执行顺序的结构。DAG 的主要特征是:

  • 由独立任务组成:每个任务代表一个工作单元,旨在独立执行。
  • 排序:任务的执行顺序在 DAG 中明确定义。
  • 可重用性:DAG 旨在重复执行,促进流程自动化。

Airflow 的主要组件

Airflow 生态系统由多个组件组成,它们共同协作以协调任务:

  • 调度程序 - scheduler:负责调度 DAG 并发送任务以供工作人员执行。
  • 执行器 - Exectutor:管理任务的执行,将其委托给工作人员。
  • Web 服务器 - Webserver:提供与 DAG 和任务交互的图形界面。
  • Dags 文件夹 - Dags folder:我们存储用 Python 编写的 DAG 的文件夹。
  • 元数据 - Metadata:作为工具存储库的数据库,由调度程序和执行器用于存储执行状态。

Apache Airflow 和 Elasticsearch

我们将演示如何使用 Apache Airflow 和 Elasticsearch 来协调任务并在 Elasticsearch 中索引结果。此演示的目标是创建一个任务管道来更新 Elasticsearch 索引中的记录。此索引包含电影数据库,用户可以在其中进行评分和分配评级。想象一个每天有数百个评级的场景,有必要保持评级记录更新。为此,将开发一个 DAG,它将每天执行,负责检索新的合并评级并更新索引中的记录。

在 DAG 流程中,我们将有一个获取评级的任务,然后是一个验证结果的任务。如果数据不存在,DAG 将被定向到失败任务。否则,数据将在 Elasticsearch 中编入索引。目标是通过一种带有负责计算分数的机制的方法检索评级,以更新索引中电影的评级字段。

使用 Apache Airflow 和 Elasticsearch 以及 Docker

要创建容器化环境,我们将使用 Apache Airflow 和 Docker。按照 “在 Docker 中运行 Airflow” 指南中的说明实际设置 Airflow。

至于 Elasticsearch,我将使用 Elastic Cloud 上的集群,但如果你愿意,也可以使用 Docker 配置 Elasticsearch。已经创建了一个包含电影目录的索引,其中电影数据已编入索引。这些电影的 “rating” 字段将被更新。

创建 DAG

通过 Docker 安装后,将创建一个文件夹结构,其中包括 dags 文件夹,我们必须将 DAG 文件放在该文件夹中,以便 Airflow 识别它们。

在此之前,我们需要确保安装了必要的依赖项。以下是此项目的依赖项:

pip install apache-airflow apache-airflow-providers-elasticsearch

我们将创建文件 update_ratings_movies.py 并开始编写任务代码。

现在,让我们导入必要的库:

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook

我们将使用 ElasticsearchPythonHook,这是一个通过抽象连接和使用外部 API 来简化 Airflow 和 Elasticsearch 集群之间集成的组件。

接下来,我们定义 DAG,并指定其主要参数:

  • dag_id:DAG 的名称。
  • start_date:DAG 的启动时间。
  • schedule:定义周期(在我们的例子中是每日)。
  • doc_md:将导入并显示在 Airflow 界面中的文档。

定义任务

现在,让我们定义 DAG 的任务。第一个任务将负责检索电影评级数据。我们将使用 PythonOperator,并将 task_id 设置为“get_movie_ratings”。python_callable 参数将调用负责获取 ratings 的函数。

get_ratings_operator = PythonOperator(
   task_id='get_movie_ratings',
   python_callable=get_movie_ratings_task
)

接下来,我们需要验证结果是否有效。为此,我们将使用带有 BranchPythonOperator 的条件。task_id 将为 “validate_result”,python_callable 将调用验证函数。op_args 参数将用于将上一个任务 “get_movie_ratings” 的结果传递给验证函数。

validate_result = BranchPythonOperator(
   task_id='validate_result',
   python_callable=validate_result,
   op_args=["{
  
  { task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)

如果验证成功,我们将从 “get_movie_ratings” 任务中获取数据并将其索引到 Elasticsearch 中。为此,我们将创建一个新任务 “index_movie_ratings”,它将使用 PythonOperator。op_args 参数将 “get_movie_ratings” 任务的结果传递给索引函数。

index_ratings_operator = PythonOperator(
   task_id='index_movie_ratings',
   python_callable=index_movie_ratings_task,
   op_args=["{
  
  { task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
)

如果验证表明失败,DAG 将继续执行失败通知任务。在此示例中,我们只是打印一条消息,但在实际场景中,我们可以配置警报来通知失败。

failed_get_rating_operator = PythonOperator(
   task_id='failed_get_rating_operator',
   python_callable=lambda: print('Ratings were False, skipping indexing.')
)

最后,我们定义任务依赖关系,确保它们以正确的顺序执行:

get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]

以下是我们 DAG 的完整代码:

"""
DAG update Rating Movies
"""
import ast
import random

from airflow import DAG
from datetime import datetime

from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.elasticsearch.hooks.elasticsearch import ElasticsearchPythonHook


def index_movie_ratings_task(movies):
   es_hook = ElasticsearchPythonHook(hosts=None,
                                     es_conn_args={
                                         "cloud_id": "cloud_id"
                                         "api_key": "api-key"
                                     })
   es_client = es_hook.get_conn
   actions = []
   for movie in ast.literal_eval(movies):
       actions.append(
           {
               "update": {
                   "_id": movie["id"],
                   "_index": "movies"
               }
           }
       )
       actions.append(
           {
               "doc": {
                   "rating": movie["rating"]
               },
               "doc_as_upsert": True
           }
       )
   result = es_client.bulk(operations=actions)
   print(f"Ingestion completed.")
   print(result)
   return True


def get_movie_ratings_task():
   movies = [
       {"id": i, "rating": round(random.uniform(1, 10), 1)}
       for i in range(1, 100)
   ]
   return movies

def validate_result(result):
   if not result:
       return 'failed_get_rating_operator'
   else:
       return 'index_movie_ratings'


with DAG(
       dag_id="update_ratings_movies_2024",
       start_date=datetime(2024, 12, 29),
       schedule="@daily",
       doc_md=__doc__,
):
   get_ratings_operator = PythonOperator(
       task_id='get_movie_ratings',
       python_callable=get_movie_ratings_task
   )

   validate_result = BranchPythonOperator(
       task_id='validate_result',
       python_callable=validate_result,
       op_args=["{
  
  { task_instance.xcom_pull(task_ids='get_movie_ratings') }}"],
       provide_context=True
   )

   index_ratings_operator = PythonOperator(
       task_id='index_movie_ratings',
       python_callable=index_movie_ratings_task,
       op_args=["{
  
  { task_instance.xcom_pull(task_ids='get_movie_ratings') }}"]
   )

   failed_get_rating_operator = PythonOperator(
       task_id='failed_get_rating_operator',
       python_callable=lambda: print('Ratings were False, skipping indexing.')
   )

get_ratings_operator >> validate_result >> [index_ratings_operator, failed_get_rating_operator]

可视化 DAG 执行

在 Apache Airflow 界面中,我们可以可视化 DAG 的执行。只需转到 “DAG” 选项卡并找到你创建的 DAG 即可。

下面,我们可以直观地看到任务的执行情况及其各自的状态。通过选择特定日期的执行,我们可以访问每个任务的日志。请注意,在 index_movie_ratings 任务中,我们可以在索引中看到索引结果,并且它已成功完成。

在其他选项卡中,可以访问有关任务和 DAG 的其他信息,以协助分析和解决潜在问题。

结论

在本文中,我们演示了如何将 Apache Airflow 与 Elasticsearch 集成以创建数据提取解决方案。我们展示了如何配置 DAG、定义负责检索、验证和索引电影数据的任务,以及如何在 Airflow 界面中监控和可视化这些任务的执行。

这种方法可以轻松适应不同类型的数据和工作流,使 Airflow 成为在各种场景中编排数据管道的有用工具。

参考资料:

Apache AirFlow

  • https://airflow.apache.org/

使用 Docker 安装 Apache Airflow

  • https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html

Elasticsearch Python Hook

  • https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/stable/hooks/elasticsearch_python_hook.html

Python 运算符

  • https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html

想要获得 Elastic 认证?了解下一期 Elasticsearch 工程师培训何时开始!

Elasticsearch 包含许多新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在吗的本地机器上试用 Elastic。

原文:How to ingest data to Elasticsearch through Apache Airflow - Elasticsearch Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/955892.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

电脑风扇声音大怎么办? 原因及解决方法

电脑风扇是电脑的重要组件之一,它的作用是为电脑的各个部件提供冷却,防止电脑过热。然而,有时候我们会发现电脑风扇的声音特别大,不仅影响我们的使用体验,也可能是电脑出现了一些问题。那么,电脑风扇声音大…

Oracle报错ORA-01078、LRM-00109

虚拟机异常关机后,rac数据库备机无法启动数据库,报错如下 解决方法: 找到如下路径文件 执行: cp init.ora.016202516818 /u01/app/oracle/product/19.3.0/db/dbs/ mv init.ora.016202516818 initplm2.ora 再次进入命令行sqlpl…

.Net Core微服务入门系列(一)——项目搭建

系列文章目录 1、.Net Core微服务入门系列(一)——项目搭建 2、.Net Core微服务入门全纪录(二)——Consul-服务注册与发现(上) 3、.Net Core微服务入门全纪录(三)——Consul-服务注…

Ability Kit-程序框架服务(类似Android Activity)

文章目录 Ability Kit(程序框架服务)简介Stage模型开发概述Stage模型应用组件应用/组件级配置UIAbility组件概述概述声明配置 生命周期概述生命周期状态说明Create状态WindowStageCreate**和**WindowStageDestroy状态WindowStageWillDestroy状态Foregrou…

Harmony OS 5.0.1 模拟器报未开启 Hyper-V解决方法

程序员Feri一名12年的程序员,做过开发带过团队创过业,擅长Java、嵌入式、鸿蒙、人工智能等,专注于程序员成长那点儿事,希望在成长的路上有你相伴!君志所向,一往无前! 今天在写Harmony NEXT版本的元服务的时候,突然模拟器无法启动了&#xff0…

WPS数据分析000004

目录 一、表格阅读技巧 冻结窗格 拆分窗口 新建窗口 阅读模式 护眼模式 二、表格打印技巧 打印预览 打印缩放 打印区域 打印标题 分页打印 打印位置 页眉页脚 逐份打印 三、表格保护技巧 锁定单元格 隐藏公式 文档权限 文件加密 一、表格阅读技巧 冻结窗…

LabVIEW桥接传感器数据采集与校准程序

该程序设计用于采集来自桥接传感器的数据,执行必要的设置(如桥接配置、信号采集参数、时间与触发设置),并进行适当的标定和偏移校正,最终通过图表呈现采集到的数据信息。程序包括多个模块,用于配置通道、触…

2025西湖论剑-babytrace

前言 就做了下题目,pwn1/3 都是签到,pwn2 后面绕 ptrace 有点意思,简单记录一下 漏洞分析 子进程中的读/写功能没有检查负数的情况,存在越界读写: void __fastcall get_value(__int64 *int64_arr) {__int64 ll; //…

【统计的思想】假设检验(一)

假设检验是统计学里的重要方法,同时也是一种“在理想与现实之间观察求索”的测试活动。假设检验从概率的角度去考察理想与现实之间的关系,籍此来缓解测试可信性问题。 我们先来看一个例子。民航旅客服务系统,简称PSS系统,有一种业…

GPT-5 传言:一场正在幕后发生的 AI 变革

新的一年,让我们从一个引人入胜的话题开始:如果我告诉你,GPT-5 并非虚构,而是真实存在呢?它不仅真实存在,而且正在你看不见的地方悄然塑造着世界。我的基本假设是:OpenAI 已经秘密开发出 GPT-5&…

【20】Word:小许-质量管理-论文❗

目录 题目​ NO1.2.3.4.5 NO6.7 NO8 NO9 NO10.11 题目 NO1.2.3.4.5 另存为“Word.docx”文件在考生文件夹下,F12Fn是另存为的作用布局→页面设置对话框→纸张:大小A4→页边距:上下左右不连续ctrl选择除表格外的所有内容→开始→字体对…

Leetcode - 周赛432

目录 一、3417. 跳过交替单元格的之字形遍历二、3418. 机器人可以获得的最大金币数三、3419. 图的最大边权的最小值四、3420. 统计 K 次操作以内得到非递减子数组的数目 一、3417. 跳过交替单元格的之字形遍历 题目链接 本题是一道模拟题,第一行走0,2&…

ASP.NET Core - 配置系统之配置提供程序

ASP.NET Core - 配置系统之配置提供程序 3. 配置提供程序3.1 文件配置提供程序3.1.1 JSON配置提供程序3.1.2 XML配置提供程序3.1.3 INI配置提供程序 3.2 环境变量配置提供程序3.3 命令行配置提供程序3.4 内存配置提供程序3.5 配置加载顺序 3.6 默认配置来源 3. 配置提供程序 前…

探索与创作:2024年CSDN平台上的成长与突破

文章目录 我与CSDN的初次邂逅初学阶段的阅读CSDN:编程新手的避风港初学者的福音:细致入微的知识讲解考试复习神器:技术总结的“救命指南”曾经的自己:为何迟迟不迈出写博客的第一步兴趣萌芽:从“读”到“想写”的初体验…

CSS中样式继承+优先级

继承属性和非继承属性 一、定义及分类 1、继承属性是指在父元素上设置了这些属性后,子元素会自动继承这些属性的值,除非子元素显式地设置了不同的值。 常见的继承属性: 字体 font 系列文本text-align text-ident line-height letter-spacing颜色 col…

macOS 安装JDK17

文章目录 前言介绍新特性下载安装1.下载完成后打开downloads 双击进行安装2.配置环境变量3.测试快速切换JDK 小结 前言 近期找开源软件,发现很多都已经使用JDK17springboot3 了,之前的JDK8已经被替换下场,所以今天就在本机安装了JDK17&#…

ChatGPT大模型极简应用开发-CH1-初识 GPT-4 和 ChatGPT

文章目录 1.1 LLM 概述1.1.1 语言模型和NLP基础1.1.2 Transformer及在LLM中的作用1.1.3 解密 GPT 模型的标记化和预测步骤 1.2 GPT 模型简史:从 GPT-1 到 GPT-41.2.1 GPT11.2.2 GPT21.2.3 GPT-31.2.4 从 GPT-3 到 InstructGPT1.2.5 GPT-3.5、Codex 和 ChatGPT1.2.6 …

vector迭代器的使用以及迭代器失效

一、iterator的使用注意 begin与end 遵循左闭右开的原则,begin 指向vector的第一个元素,end 指向vector的最后一个元素的往下一个位置。 rbegin 与 rend rbegin指向最后一个元素的位置,rend指向第一个元素的往前一个位置。 二、vector的常…

【Linux】15.Linux进程概念(4)

文章目录 程序地址空间前景回顾C语言空间布局图:代码1代码2代码3代码4代码5代码6代码7 程序地址空间前景回顾 历史核心问题: pid_t id fork(); if(id 0) else if(id>0) 为什么一个id可以放两个值呢?之前没有仔细讲。 C语言空间布局图&am…

【无法下载github文件】虚拟机下ubuntu无法拉取github文件

修改hosts来进行解决。 步骤一:打开hosts文件 sudo vim /etc/hosts步骤二:查询 github.com的ip地址 https://sites.ipaddress.com/github.com/#ipinfo将github.com的ip地址添加到hosts文件末尾,如下所示。 140.82.114.3 github.com步骤三…