使用 Confluent Cloud 的 Elasticsearch Connector 部署 Elastic Agent

作者:来自 Elastic Nima Rezainia

Confluent Cloud 用户现在可以使用更新后的 Elasticsearch Sink Connector 与 Elastic Agent 和 Elastic Integrations 来实现完全托管且高度可扩展的数据提取架构。

Elastic 和 Confluent 是关键的技术合作伙伴,我们很高兴宣布对该合作伙伴关系进行新的投资。Confluent 的 Kafka 是许多企业摄取架构的关键组件,它确保客户能够保证将关键的可观察性和安全性数据传输到他们的 Elasticsearch 集群中。我们一直致力于改进我们产品的结合方式。借助 Elastic Agent 的新 Kafka 输出和 Confluent 新改进的 Elasticsearch Sink Connectors,无缝地从边缘收集数据、通过 Kafka 将其传输到 Elasticsearch 集群从未如此简单。

在这篇博客中,我们研究了一种将 Elastic Agent 与 Confluent Cloud 的 Kafka 产品集成的简单方法,以减轻摄取业务关键数据的运营负担。

Elastic Agent 和 Confluent Cloud 的优势

Elastic Agent 和 Confluent Cloud 的更新版 Elasticsearch Sink connector 结合使用时,可为各种规模的组织提供无数优势。这种组合解决方案可以灵活地以高效且有弹性的方式处理任何类型的数据采集工作负载。

完全托管

Elastic Cloud Serverless 和 Confluent Cloud 结合使用时,可为用户提供完全托管的服务。这使得部署和采集几乎无限量的数据变得毫不费力,而无需担心节点、集群或扩展。

完全支持 Elastic 集成

300 多个 Elastic 集成中的任何一个都完全支持通过 Kafka 发送数据。在这篇博文中,我们概述了如何在两个平台之间建立连接。这可确保你能从我们在内置警报、SLO、AI 助手等方面的投资中受益。

解耦架构

Kafka 充当数据源(如 Elastic Agent 和 Logstash)和 Elasticsearch 之间的弹性缓冲区,将数据生产者与消费者解耦。这可以显著降低总体拥有成本,因为你可以根据典型的数据摄取量(而不是最大摄取量)调整 Elasticsearch 集群的大小。它还可以确保系统在数据量激增时具有弹性。

完全控制你的数据

借助我们新的 “Output per Integration - 每个集成输出” 功能,客户现在可以使用同一代理将不同的数据发送到不同的目的地。客户可以轻松地将安全日志直接发送到 Confluent Cloud/Kafka,这可以提供交付保证,同时将不太重要的应用程序日志和系统指标直接发送到 Elasticsearch。

部署参考架构

在以下部分中,我们将引导你了解使用 Confluent Cloud 的 Elasticsearch Sink Connector 将 Confluent Kafka 与 Elastic Agent 和 Elasticsearch 集成的方法之一。与任何流式传输和数据收集技术一样,根据特定用例,可以通过多种方式配置管道。这篇博文将重点介绍一个简单的架构,可以将其用作更复杂部署的起点。

该架构的一些亮点包括:

  • Elastic Agents 上的动态 Kafka 主题选择
  • Elasticsearch Sink Connectors,用于从 Confluent Kafka 到 Elasticsearch 的完全托管传输
  • 利用 Elastic 的 300 多个集成处理数据


先决条件

在开始之前,请确保你在 Confluent Cloud 中部署了 Kafka 集群,在 Elastic Cloud 中部署了 Elasticsearch 集群或项目,并安装并注册了 Elastic Agent。

为 Elastic Agent 配置 Confluent Cloud Kafka 集群

导航到 Confluent Cloud 中的 Kafka 集群,然后选择 “Cluster Settings”。找到并记下 Bootstrap Server 地址,稍后在 Fleet 中创建 Kafka 输出时我们将需要此值。

导航到左侧导航菜单中的主题并创建两个主题:

  • 名为 logs 的主题
  • 名为 metrics 的主题

接下来,导航到左侧导航菜单中的 API Keys:

  1. 单击 + 添加 API Key
  2. 选择 Service Account  API key type
  3. 为此 API Key 提供一个有意义的名称
  4. 授予 metrics 和 logs 主题的密钥写入权限
  5. 创建密钥

请记录提供的 Key 和 Secret,我们稍后在 Fleet 中配置 Kafka 输出时会用到它们。

配置 Elasticsearch 和 Elastic Agent

在本节中,我们将配置 Elastic Agent 以将数据发送到 Confluent Cloud 的 Kafka 集群,并将配置 Elasticsearch 以便它可以从 Confluent Cloud Elasticsearch Sink Connector 接收数据。

配置 Elastic Agent 以将数据发送到 Confluent Cloud

Elastic Fleet 简化了向 Kafka 和 Confluent Cloud 发送数据的过程。使用 Elastic Agent,可以轻松将 Kafka “输出” 附加到来自代理的所有数据,也可以将其仅应用于来自特定数据源的数据。

在左侧导航中找到 Fleet,单击 “Settings” 选项卡。在 “Settings” 选项卡上,找到 “Output” 部分,然后单击 “dd Output”。

执行以下步骤来配置新的 Kafka output:

  • 为 output 提供 Name
  • 将 Type 设置为 Kafka
  • 使用我们之前记下的 Bootstrap Server 地址填充主机字段。
  • 在身份验证下,使用 API 密钥填充用户名,使用我们之前记下的密码填充密码

  • 在 Topics 下,选择 Dynamic Topic,并将 Topic from field 设置为 data_stream.type

  • 单击 “Save and apply settings”

接下来,我们将导航到 Fleet 中的 “Agent Policies” 选项卡,然后单击以编辑我们要将 Kafka 输出附加到的代理策略。打开代理策略后,单击 “Settings” 选项卡,然后将 Output for integrations 和 Output for agent monitoring 更改为我们刚刚创建的 Kafka 输出。

为每个 Elastic 集成选择一个输出:要设置用于特定数据源的 Kafka 输出,请参阅集成级输出文档。

关于主题选择的说明:data_stream.type 字段是一个保留字段,如果我们发送的数据是日志,Elastic Agent 会自动将其设置为 logs,如果我们发送的数据是指标,则设置为 metrics。使用 data_stream.type 启用 Dynamic Topic 选择将导致 Elastic Agent 自动将指标路由到 metrics 主题,并将日志路由到 logs 主题。有关主题选择的信息,请参阅 Kafka 输出的主题设置文档。

在 Elasticsearch 中配置发布端点

接下来,我们将为 Confluent Cloud Sink Connector 设置两个发布端点(数据流),以便在将文档发布到 Elasticsearch 时使用:

  • 我们将创建一个数据流 logs-kafka.reroute-default 用于处理 logs
  • 我们将创建一个数据流 metrics-kafka.reroute-default 用于处理 metrics

如果我们让这些数据流中的数据保持原样,数据虽然可用,但我们会发现数据未经解析且缺乏重要的丰富内容。因此,我们还将创建两个索引模板和两个摄取管道,以确保数据由我们的 Elastic Integrations 处理。

创建 Elasticsearch 索引模板和摄取管道

以下步骤使用 Kibana 中的 Dev Tools,但所有这些步骤都可以通过 REST API 或使用 Stack Management 中的相关用户界面完成。

首先,我们将创建用于处理 logs 的索引模板和摄取管道:

PUT _index_template/logs-kafka.reroute
{
  "template": {
    "settings": {
      "index.default_pipeline": "logs-kafka.reroute"
    }
  },
  "index_patterns": [
    "logs-kafka.reroute-default"
  ],
  "data_stream": {}
}
PUT _ingest/pipeline/logs-kafka.reroute
{
  "processors": [
    {
      "reroute": {
        "dataset": [
          "{
  
  {data_stream.dataset}}"
        ],
        "namespace": [
          "{
  
  {data_stream.namespace}}"
        ]
      }
    }
  ]
}

接下来,我们将创建用于处理 metrics 的索引模板和提取管道:

PUT _index_template/metrics-kafka.reroute
{
  "template": {
    "settings": {
      "index.default_pipeline": "metrics-kafka.reroute"
    }
  },
  "index_patterns": [
    "metrics-kafka.reroute-default"
  ],
  "data_stream": {}
}
PUT _ingest/pipeline/metrics-kafka.reroute
{
  "processors": [
    {
      "reroute": {
        "dataset": [
          "{
  
  {data_stream.dataset}}"
        ],
        "namespace": [
          "{
  
  {data_stream.namespace}}"
        ]
      }
    }
  ]
}

关于 rerouting 的说明:下面是一个实际示例,其中与 Linux 网络指标相关的文档将首先进入 metrics-kafka.reroute-default,然后此 Ingest Pipeline 将检查该文档并发现 data_stream.dataset 设置为 system.network,data_stream.namespace 设置为 default。它将使用这些值将文档从 metrics-kafka.reroute-default 重新路由到 metrics-system.network-default,然后由系统集成进行处理。

配置 Confluent Cloud Elasticsearch Sink 连接器

现在是时候配置 Confluent Cloud Elasticsearch Sink 连接器了。我们将执行以下步骤两次并创建两个单独的连接器,一个用于 logs 的连接器,一个用于 metrics 的连接器。如果所需的设置不同,我们将突出显示正确的值。

导航到 Confluent Cloud 中的 Kafka 集群,然后从左侧导航菜单中选择 Connectors。在 Connectors 页面上,从可用的连接器目录中选择 Elasticsearch Service Sink。

Confluent Cloud 为用户提供了配置连接器的简化工作流程。这里我们将逐步介绍该过程的每个步骤:

步骤 1:主题选择

首先,我们将根据要部署的连接器选择连接器将从中消费数据的主题:

  • 部署用于 logs 的 Elasticsearch Sink 连接器时,请选择 logs 主题。
  • 部署用于 metrics 的 Elasticsearch Sink 连接器时,请选择 metrics 主题。

步骤 2:Kafka 凭据

选择 KAFKA_API_KEY 作为集群身份验证模式。提供前面提到的 API Key 和 Secret

我们收集所需的 Confluent Cloud Cluster 信息。

步骤 3:身份验证

提供我们的 Elasticsearch 集群的 Elasticsearch Endpoint 地址作为连接 URI。连接用户和连接密码是 Elasticsearch 中帐户的身份验证信息,Elasticsearch Sink Connector 将使用这些信息将数据写入 Elasticsearch。

步骤 4:配置

在此步骤中,我们将 Input Kafka record value format 设置为 JSON。接下来,展开 Advanced Configuration。

  1. 我们将 Data Stream Dataset 设置为 kafka.reroute
  2. 我们将根据要部署的连接器设置 Data Stream Type:
    • 在为 logs 部署 Elasticsearch Sink 连接器时,我们将 Data Stream Type 设置为 logs
    • 在为 metrics 部署 Elasticsearch Sink 连接器时,我们将Data Stream Type 设置为 metrics
  3. 其他设置的正确值将取决于具体环境。

步骤 5:调整大小

在此步骤中,请注意 Confluent Cloud 为我们的部署提供了建议的最少任务数。遵循此处的建议是大多数部署的良好起点。

步骤 6:检查和启动

查看 Connector configuration 和 Connector pricing 部分,如果一切正常,则是时候单击 continue 并启动连接器了!连接器可能会报告为配置,但很快就会开始使用来自 Kafka 主题的数据并将其写入 Elasticsearch 集群。

你现在可以导航到 Kibana 中的发现并找到流入 Elasticsearch 的日志!还请查看 Confluent Cloud 为你的新 Elasticsearch Sink Connector 部署提供的实时指标。

如果你只部署了第一个 logs 接收器连接器,你现在可以重复上述步骤来部署第二个 metrics 接收器连接器。

享受完全托管的数据采集架构

如果你遵循上述步骤,那么恭喜你。你已成功:

  1. 配置 Elastic Agent 以将日志和指标发送到 Kafka 中的专用主题
  2. 在 Elasticsearch 中创建发布端点(数据流),专用于处理来自 Elasticsearch Sink Connector 的数据
  3. 配置托管 Elasticsearch Sink Connector 以使用来自多个主题的数据并将该数据发布到 Elasticsearch

接下来,你应该启用其他集成,部署更多 Elastic Agent,在 Kibana 中探索你的数据,并享受 Elastic Serverless 和 Confluent Cloud 完全托管的数据采集架构带来的好处!

原文:Deploying Elastic Agent with Confluent Cloud's Elasticsearch Connector — Elastic Observability Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/961487.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构】初识链表

顺序表的优缺点 缺点: 中间/头部的插入删除,时间复杂度效率较低,为O(N) 空间不够的时候需要扩容。 如果是异地扩容,增容需要申请新空间,拷贝数据,释放旧空间,会有不小的消耗。 扩容可能会存在…

deepseek R1的确不错,特别是深度思考模式

deepseek R1的确不错,特别是深度思考模式,每次都能自我反省改进。比如我让 它写文案: 【赛博朋克版程序员新春密码——2025我们来破局】 亲爱的代码骑士们: 当CtrlS的肌肉记忆遇上抢票插件,当Spring Boot的…

Python爬虫之——Cookie存储器

目录 专栏导读1、背景介绍2、库的安装3、核心代码4、完整代码总结 专栏导读 🌸 欢迎来到Python办公自动化专栏—Python处理办公问题,解放您的双手 🏳️‍🌈 博客主页:请点击——> 一晌小贪欢的博客主页求关注 &…

VMware 中Ubuntu无网络连接/无网络标识解决方法【已解决】

参考文档 Ubuntu无网络连接/无网络标识解决方法_ubuntu没网-CSDN博客 再我们正常使用VMware时,就以Ubuntu举例可能有时候出现无网络连接,甚至出现无网络标识的情况,那么废话不多说直接上教程 环境:无网络 解决方案&#…

CSS(快速入门)

欢迎大家来到我的博客~欢迎大家对我的博客提出指导,有错误的地方会改进的哦~点击这里了解更多内容 目录 一、什么是CSS?二、基本语法规范三、CSS选择器3.1 标签选择器3.2 id选择器3.3 class选择器3.4 通配符选择器3.5 复合选择器 四、常用CSS样式4.1 color4.2 font…

网络安全攻防实战:从基础防护到高级对抗

📝个人主页🌹:一ge科研小菜鸡-CSDN博客 🌹🌹期待您的关注 🌹🌹 引言 在信息化时代,网络安全已经成为企业、政府和个人必须重视的问题。从数据泄露到勒索软件攻击,每一次…

PETSc源码分析: Optimization Solvers

本文结合PETSc源代码,分析PETSc中的优化求解器。 注1:限于研究水平,分析难免不当,欢迎批评指正。 注2:文章内容会不定期更新。 参考文献 Balay S. PETSc/TAO Users Manual, Revision 3.22. Argonne National Labora…

单细胞-第四节 多样本数据分析,下游画图

文件在单细胞\5_GC_py\1_single_cell\2_plots.Rmd 1.细胞数量条形图 rm(list ls()) library(Seurat) load("seu.obj.Rdata")dat as.data.frame(table(Idents(seu.obj))) dat$label paste(dat$Var1,dat$Freq,sep ":") head(dat) library(ggplot2) lib…

基于Arcsoft的人脸识别

目录 一、前言 二、使用方法 三、获取SDK 四、人脸检测/人脸识别 五、代码实现 一、前言 face++,百度ai,虹软,face_recognition,其中除了face_recognition是python免费的一个库安装好响应的库直接运行就好,另外三个需要填入相关申请的信息id和key。 分别对应着相应的人…

计算机网络之链路层

本文章目录结构出自于《王道计算机考研 计算机网络_哔哩哔哩_bilibili》 02 数据链路层 在网上看到其他人做了详细的笔记,就不再多余写了,直接参考着学习吧。 1 详解数据链路层-数据链路层的功能【王道计算机网络笔记】_wx63088f6683f8f的技术博客_51C…

Python数据分析-数据加载与存储(六)

title: ‘Python数据分析:数据加载与存储(六)’ tags: python数据分析 categories:python数据分析 keywords:python数据分析 cover: …/img/404_icecream_whale.png description: 本文讲述了使用pandas加载和存储数据的操作,和一些文件的格式…

计算机网络 IP 网络层 2 (重置版)

IP的简介: IP 地址是互联网协议地址(Internet Protocol Address)的简称,是分配给连接到互联网的设备的唯一标识符,用于在网络中定位和通信。 IP编制的历史阶段: 1,分类的IP地址: …

大数据相关职位介绍之二(数据治理,数据库管理员, 数据资产管理师,数据质量专员)

大数据相关职位介绍之二(数据治理,数据库管理员, 数据资产管理师,数据质量专员) 文章目录 大数据相关职位介绍之二(数据治理,数据库管理员, 数据资产管理师,数据质量专员…

编译dpdk19.08.2中example时一系列报错解决

dpdk19.08编译过程全解 dpdk 介绍问题描述编译过程执行Step 1报错一解决方式 报错二解决方式 继续执行Step 248的时候报错 49没有修改成功输入60退出 使用过程执行make报错一解决方式 继续make报错二解决方式 继续make执行生成文件helloworld报错三解决方式 执行make 完成参考链…

最优化问题 - 内点法

以下是一种循序推理的方式,来帮助你从基础概念出发,理解 内点法(Interior-Point Method, IPM) 是什么、为什么要用它,以及它是如何工作的。 1. 问题起点:带不等式约束的优化 假设你有一个带不等式约束的优…

Linux下Ubuntun系统报错find_package(BLAS REQUIRED)找不到

Linux下Ubuntun系统报错find_package(BLAS REQUIRED)找不到 这次在windows的WSL2中遇到了一个非常奇怪的错误,就是 CMake Error at /usr/share/cmake-3.22/Modules/FindPackageHandleStandardArgs.cmake:230 (message):Could NOT find BLAS (missing: BLAS_LIBRAR…

Ubuntu Server 安装 XFCE4桌面

Ubuntu Server没有桌面环境,一些软件有桌面环境使用起来才更加方便,所以我尝试安装桌面环境。常用的桌面环境有:GNOME、KDE Plasma、XFCE4等。这里我选择安装XFCE4桌面环境,主要因为它是一个极轻量级的桌面环境,适合内…

芯片AI深度实战:实战篇之vim chat

利用vim-ollama这个vim插件,可以在vim内和本地大模型聊天。 系列文章: 芯片AI深度实战:基础篇之Ollama-CSDN博客 芯片AI深度实战:基础篇之langchain-CSDN博客 芯片AI深度实战:实战篇之vim chat-CSDN博客 芯片AI深度…

线程概念、操作

一、背景知识 1、地址空间进一步理解 在父子进程对同一变量进行修改时发生写时拷贝,这时候拷贝的基本单位是4KB,会将该变量所在的页框全拷贝一份,这是因为修改该变量很有可能会修改其周围的变量(局部性原理)&#xf…

设置jmeter外观颜色

设置jmeter外观颜色 方法: 步骤一、点击顶部选项 ->外观,这里提供了不同的主题,可选自己喜欢的风格。 步骤二、选择后,弹框提示点击Yes。