如何通过 Kafka 将数据导入 Elasticsearch

作者:来自 Elastic Andre Luiz

将 Apache Kafka 与 Elasticsearch 集成的分步指南,以便使用 Python、Docker Compose 和 Kafka Connect 实现高效的数据提取、索引和可视化。

在本文中,我们将展示如何将 Apache Kafka 与 Elasticsearch 集成以进行数据提取和索引。我们将概述 Kafka、其生产者(producers)和消费者(consumers)的概念,并创建一个日志索引,其中将通过 Apache Kafka 接收和索引消息。该项目以 Python 实现,代码可在 GitHub 上找到。

先决条件

  • Docker 和 Docker Compose:确保你的机器上安装了 Docker 和 Docker Compose。
  • Python 3.x:运行生产者和消费者脚本。

Apache Kafka 简介

Apache Kafka 是一个分布式流媒体平台,具有高可扩展性和可用性以及容错能力。在 Kafka 中,数据管理通过主要组件进行:

  • Broker/代理:负责在生产者和消费者之间存储和分发消息。
  • Zookeeper:管理和协调 Kafka 代理,控制集群的状态、分区领导者和消费者信息。
  • Topics/主题:发布和存储数据以供使用的渠道。
  • Consumers 及 Producers/消费者和生产者:生产者向主题发送数据,而消费者则检索该数据。

这些组件共同构成了 Kafka 生态系统,为数据流提供了强大的框架。

项目结构

为了理解数据提取过程,我们将其分为几个阶段:

  • 基础设施配置/Infrastructure Provisioning:设置 Docker 环境以支持 Kafka、Elasticsearch 和 Kibana。
  • 创建生产者/Producer Creation:实现 Kafka 生产者,将数据发送到日志主题。
  • 创建消费者/Consumer Creation:开发 Kafka 消费者以读取和索引 Elasticsearch 中的消息。
  • 提取验证/Ingestion Validation:验证和确认已发送和已使用的数据。

使用 Docker Compose 进行基础设施配置

我们利用 Docker Compose 来配置和管理必要的服务。下面,你将找到 Docker Compose 代码,它设置了 Apache Kafka、Elasticsearch 和 Kibana 集成所需的每项服务,确保数据提取过程。

docker-compose.yml

version: "3"

services:

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST:${HOST_IP}:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.15.1
    container_name: elasticsearch-8.15.1
    environment:
      - node.name=elasticsearch
      - xpack.security.enabled=false
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - ./elasticsearch:/usr/share/elasticsearch/data
    ports:
      - 9200:9200

  kibana:
    image: docker.elastic.co/kibana/kibana:8.15.1
    container_name: kibana-8.15.1
    ports:
      - 5601:5601
    environment:
      ELASTICSEARCH_URL: http://elasticsearch:9200
      ELASTICSEARCH_HOSTS: '["http://elasticsearch:9200"]'

你可以直接从 Elasticsearch Labs GitHub repo 访问该文件。

使用 Kafka 生产器发送数据

生产器负责将消息发送到日志主题。通过批量发送消息,可以提高网络使用效率,允许使用 batch_size 和 linger_ms 设置进行优化,这两个设置分别控制批次的数量和延迟。配置 acks='all' 可确保消息持久存储,这对于重要的日志数据至关重要。

producer = KafkaProducer(
   bootstrap_servers=['localhost:9092'],  # Specifies the Kafka server to connect
   value_serializer=lambda x: json.dumps(x).encode('utf-8'),  # Serializes data as JSON and encodes it to UTF-8 before sending
   batch_size=16384,     # Sets the maximum batch size in bytes (here, 16 KB) for buffered messages before sending
   linger_ms=10,         # Sets the maximum delay (in milliseconds) before sending the batch
   acks='all'            # Specifies acknowledgment level; 'all' ensures message durability by waiting for all replicas to acknowledge
)


def generate_log_message():
   levels = ["INFO", "WARNING", "ERROR", "DEBUG"]
   messages = [
       "User login successful",
       "User login failed",
       "Database connection established",
       "Database connection failed",
       "Service started",
       "Service stopped",
       "Payment processed",
       "Payment failed"
   ]
   log_entry = {
       "level": random.choice(levels),
       "message": random.choice(messages),
       "timestamp": time.time()
   }
   return log_entry

def send_log_batches(topic, num_batches=5, batch_size=10):
   for i in range(num_batches):
       logger.info(f"Sending batch {i + 1}/{num_batches}")
       for _ in range(batch_size):
           log_message = generate_log_message()
           producer.send(topic, value=log_message)
       producer.flush()


if __name__ == "__main__":
   topic = "logs"
   send_log_batches(topic)
   producer.close()

当启动 producer 的时候,会批量的向 topic 发送消息,如下图:

INFO:kafka.conn:Set configuration …
INFO:log_producer:Sending batch 1/5 
INFO:log_producer:Sending batch 2/5
INFO:log_producer:Sending batch 3/5
INFO:log_producer:Sending batch 4/5

使用 Kafka Consumer 消费和索引数据

Consumer 旨在高效处理消息,消费来自日志主题的批次并将其索引到 Elasticsearch 中。使用 auto_offset_reset='latest',可确保 Consumer 开始处理最新消息,忽略较旧的消息,max_poll_records=10 将批次限制为 10 条消息。使用 fetch_max_wait_ms=2000,Consumer 最多等待 2 秒以积累足够的消息,然后再处理批次。

在其主循环中,Consumer 消费日志消息、处理并将每个批次索引到 Elasticsearch 中,确保持续的数据摄取。

consumer = KafkaConsumer(
   'logs',                               
   bootstrap_servers=['localhost:9092'],
   auto_offset_reset='latest',            # Ensures reading from the latest offset if the group has no offset stored
   enable_auto_commit=True,               # Automatically commits the offset after processing
   group_id='log_consumer_group',         # Specifies the consumer group to manage offset tracking
   max_poll_records=10,                   # Maximum number of messages per batch
   fetch_max_wait_ms=2000                 # Maximum wait time to form a batch (in ms)
)

def create_bulk_actions(logs):
   for log in logs:
       yield {
           "_index": "logs",
           "_source": {
               'level': log['level'],
               'message': log['message'],
               'timestamp': log['timestamp']
           }
       }

if __name__ == "__main__":
   try:
       print("Starting message processing…")
       while True:

           messages = consumer.poll(timeout_ms=1000)  # Poll receive messages

           # process each batch messages
           for _, records in messages.items():
               logs = [json.loads(record.value) for record in records]
               bulk_actions = create_bulk_actions(logs)
               response = helpers.bulk(es, bulk_actions)
               print(f"Indexed {response[0]} logs.")
   except Exception as e:
       print(f"Erro: {e}")
   finally:
       consumer.close()
       print(f"Finish")

在 Kibana 中可视化数据

借助 Kibana,我们可以探索和验证从 Kafka 提取并在 Elasticsearch 中编入索引的数据。通过访问 Kibana 中的开发工具,你可以查看已编入索引的消息并确认数据符合预期。例如,如果我们的 Kafka 生产者发送了 5 个批次,每个批次 10 条消息,我们应该在索引中看到总共 50 条记录。

要验证数据,你可以在 Dev Tools 部分使用以下查询:

GET /logs/_search
{
  "query": {
    "match_all": {}
  }
}

相应:

此外,Kibana 还提供了创建可视化和仪表板的功能,可帮助使分析更加直观和具有交互性。下面,你可以看到我们创建的一些仪表板和可视化示例,它们以各种格式展示了数据,增强了我们对所处理信息的理解。

使用 Kafka Connect 进行数据提取

Kafka Connect 是一种旨在促进数据源和目标(接收器)之间的集成的服务,例如数据库或文件系统。它使用预定义的连接器来自动处理数据移动。在我们的例子中,Elasticsearch 充当数据接收器。

使用 Kafka Connect,我们可以简化数据提取过程,无需手动将数据提取工作流实施到 Elasticsearch 中。借助适当的连接器,Kafka Connect 允许将发送到 Kafka 主题的数据直接在 Elasticsearch 中编入索引,只需进行最少的设置,无需额外编码。

使用 Kafka Connect

要实现 Kafka Connect,我们将 kafka-connect 服务添加到我们的 Docker Compose 设置中。此配置的一个关键部分是安装 Elasticsearch 连接器,它将处理数据索引。

配置服务并创建 Kafka Connect 容器后,将需要一个 Elasticsearch 连接器的配置文件。此文件定义基本参数,例如:

  • connection.url:Elasticsearch 的连接 URL。
  • topics:连接器将监视的 Kafka 主题(在本例中为 “logs”)。
  • type.name:Elasticsearch 中的文档类型(通常为 _doc)。
  • value.converter:将 Kafka 消息转换为 JSON 格式。
  • value.converter.schemas.enable:指定是否应包含架构。
  • schema.ignorekey.ignore:在索引期间忽略 Kafka 架构和键的设置。

以下是在 Kafka Connect 中创建 Elasticsearch 连接器的 curl 命令:

curl --location '{{url}}/connectors' \
--header 'Content-Type: application/json' \
--data '{
    "name": "elasticsearch-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "logs",
        "connection.url": "http://elasticsearch:9200",
        "type.name": "_doc",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "schema.ignore": "true",
        "key.ignore": "true"
    }
}'

通过此配置,Kafka Connect 将自动开始提取发送到 “logs” 主题的数据并在 Elasticsearch 中对其进行索引。这种方法允许完全自动化的数据提取和索引,而无需额外的编码,从而简化整个集成过程。

结论

集成 Kafka 和 Elasticsearch 为实时数据提取和分析创建了一个强大的管道。本指南提供了一种构建强大数据提取架构的基础方法,并在 Kibana 中实现无缝可视化和分析,以适应未来更复杂的要求。

此外,使用 Kafka Connect 使 Kafka 和 Elasticsearch 之间的集成更加简化,无需额外的代码来处理和索引数据。Kafka Connect 使发送到特定主题的数据能够以最少的配置自动在 Elasticsearch 中编入索引。

想要获得 Elastic 认证?了解下一次 Elasticsearch 工程师培训的时间!

Elasticsearch 包含许多新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在本地机器上试用 Elastic。

原文:How to ingest data to Elasticsearch through Kafka - Elasticsearch Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/943123.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深入浅出:AWT的基本组件及其应用

目录 前言 1. AWT简介 2. AWT基本组件 2.1 Button:按钮 2.2 Label:标签 ​编辑 2.3 TextField:文本框 2.4 Checkbox:复选框 2.5 Choice:下拉菜单 2.6 List:列表 综合案例 注意 3. AWT事件处理 …

Go Energy 跨平台框架 v2.5.1 发布

Energy 框架 是Go语言基于CEF 和 LCL 开发的跨平台 GUI 框架, 具体丰富的系统原生 UI 控件集, 丰富的 CEF 功能 API,简化且不失功能的 CEF 功能 API 使用。 特性? 特性描述跨平台支持 Windows, macOS, Linux简单Go语言的简单特性,使用简单…

JS 异步 ( 一、异步概念、Web worker 基本使用 )

文章目录 异步代码异步执行概念ES6 之前的异步 Web worker 异步 代码异步执行概念 通常代码是自上而下同步执行的,既后面的代码必须等待前面的代码执行完才会执行,而异步执行则是将主线程中的某段代码交由子线程去执行,当交给子线程后&…

机器学习(二)-简单线性回归

文章目录 1. 简单线性回归理论2. python通过简单线性回归预测房价2.1 预测数据2.2导入标准库2.3 导入数据2.4 划分数据集2.5 导入线性回归模块2.6 对测试集进行预测2.7 计算均方误差 J2.8 计算参数 w0、w12.9 可视化训练集拟合结果2.10 可视化测试集拟合结果2.11 保存模型2.12 …

Java字符串操作利器:StringBuffer与StringBuilder类详解

在处理字符串变更时,StringBuffer和StringBuilder类是优选工具。与String类不同,StringBuffer和StringBuilder允许对象被多次修改,而不会生成新的未使用对象。 StringBuilder类自Java 5起引入,其与StringBuffer的主要区别在于Stri…

软件确认测试报告的内容和作用简析

软件确认测试报告是对软件确认测试过程及结果的正式记录,是评估软件质量的重要依据。它不仅对开发团队起到反馈作用,更是决策层判断软件是否可以交付的重要参考。 一、软件确认测试报告包括的内容   1、测试目的:明确此次测试的目的和所要…

结构体(初阶)

结构体: 结构体类型的声明 结构体初始化 结构成员访问 结构体传参 1.结构体的声明 1.1结构的基础知识 结构是一些值的集合,这些值称为成员变量。结构的每个成员可以是不同类型的变量。 1.2结构的声明 struct tag { member - list; }variable-lis…

详解VHDL如何编写Testbench

1.概述 仿真测试平台文件(Testbench)是可以用来验证所设计的硬件模型正确性的 VHDL模型,它为所测试的元件提供了激励信号,可以以波形的方式显示仿真结果或把测试结果存储到文件中。这里所说的激励信号可以直接集成在测试平台文件中,也可以从…

React 第二十节 useRef 用途使用技巧注意事项详解

简述 useRef 用于操作不需要在视图上渲染的属性数据,用于访问真实的DOM节点,或者React组件的实例对象,允许直接操作DOM元素或者是组件; 写法 const inpRef useRef(params)参数: useRef(params),接收的 …

SQL子查询和having实例

有2个表如下;一个是站点信息,一个是站点不同时间的访问量, 现在要获取总访问量大于200的网站; 先执行如下sql,不包括having子句看一下,获得的是所有站点的总访问量; 这应是一个子查询&#xf…

【seatunnel】数据同步软件安装

【seatunnel】数据同步软件安装 下载 wget https://dlcdn.apache.org/seatunnel/2.3.8/apache-seatunnel-2.3.8-bin.tar.gz wget https://dlcdn.apache.org/seatunnel/seatunnel-web/1.0.2/apache-seatunnel-web-1.0.2-bin.tar.gz1、安装seatunnel Server 解压 tar zxvf ap…

散斑/横向剪切/迈克尔逊/干涉条纹仿真技术分析

摘要 本博文提供了多种数据类型的干涉条纹仿真,并展示了它们对应的散斑干涉条纹。还分别给出了横向剪切干涉以及剪切散斑干涉条纹的仿真。 一、迈克尔逊干涉与散斑干涉仿真 下图为干涉条纹与对应的散斑干涉条纹的仿真示意图。其中,干涉条纹可认为是源…

如何通过采购管理系统实现智能化采购?

随着人工智能、大数据等技术的快速发展,采购管理逐步迈入智能化时代。智能化采购不仅提升了效率,还为企业提供了更精准的采购决策支持。本文将从智能化采购的优势出发,探讨采购管理系统如何助力企业实现这一目标。 文中用到的采购管理系统&a…

【论文阅读笔记】IC-Light

SCALING IN-THE-WILD TRAINING FOR DIFFUSION-BASED ILLUMINATION HARMONIZATION AND EDITING BY IMPOSING CONSISTENT LIGHT TRANSPORT 通过施加一致的光线传输来扩展基于扩散模型的真实场景光照协调与编辑训练 前言摘要引言相关工作基于学习的基于扩散模型的外观和光照操纵光…

594: Maximum Tape Utilization Ratio

解法&#xff1a; 对于该题有以下错误&#xff08;敬希评论区指正 1.dp定义在全局会wa struct node {int count; // 当前容量下能够存储的程序数量int sum; // 当前容量下所占用的磁带长度vector<int> path; // 当前容量下选择的程序的路径&#xff08;存放的程序…

流量主微信小程序工具类去水印

工具类微信小程序流量主带后台管理&#xff0c;可开通广告&#xff0c;带自有后台管理&#xff0c;不借助第三方接口 介绍 支持抖音&#xff0c;小红书&#xff0c;哔哩哔哩视频水印去除&#xff0c;功能实现不借助第三方平台。可实现微信小程序流量主广告变现功能&#xff0c…

04软件测试需求分析案例-用户登录

通读文档&#xff0c;提取信息&#xff0c;提出问题&#xff0c;整理为需求。 从需求规格说明、设计说明、配置说明等文档获取原始需求&#xff0c;通读原始需求&#xff0c;分析有哪些功能&#xff0c;每种功能要完成什么业务&#xff0c;业务该如何实现&#xff0c;业务逻辑…

DX12 快速教程(2) —— 渲染天蓝色窗口

快速导航 新建项目 "002-DrawSkyblueWindow"DirectX 12 入门1. COM 技术&#xff1a;DirectX 的中流砥柱什么是 COM 技术COM 智能指针 2.创建 D3D12 调试层设备&#xff1a;CreateDebugDevice什么是调试层如何创建并使用调试层 3.创建 D3D12 设备&#xff1a;CreateD…

《计算机组成及汇编语言原理》阅读笔记:p116-p120

《计算机组成及汇编语言原理》学习第 7 天&#xff0c;p116-p120 总结&#xff0c;总计 5 页。 一、技术总结 1.CPU优化 (1)increase overall performance number 例如&#xff1a;16位电脑提升到32位电脑。 (2)multiprocessing One way to make computers more useful i…

【蓝桥杯每日一题】12.18

&#x1f3dd;️专栏&#xff1a; 【蓝桥杯备篇】 &#x1f305;主页&#xff1a; f狐o狸x 从今天开始&#xff0c;笨狐狸&#xff0c;啊呸&#xff0c;本狐狸要开始漫长的蓝桥杯备战啦&#xff0c;将会长期更新每日一题这个专栏&#xff0c;直到蓝桥杯结束&#xff0c;各位一起…