Elasticsearch:使用 BigQuery 提取数据

作者:来自 Elastic Jeffrey Rengifo

了解如何使用 Python 在 Elasticsearch 中索引和搜索 Google BigQuery 数据。

BigQuery 是 Google 的一个平台,允许你将来自不同来源和服务的数据集中到一个存储库中。它还支持数据分析,并可使用生成式 AI (GenAI) 和机器学习 (ML) 工具。以下是将数据引入 BigQuery 的方式:

将所有这些来源的数据索引到 Elasticsearch,可帮助你集中数据源,从而获得更好的可观测性体验。

在本文中,你将学习如何使用 Python 将 BigQuery 数据索引到 Elasticsearch,使你能够统一来自不同系统的数据,以便进行搜索和分析。

你可以在此 Google Colab 笔记本中使用本文提供的示例。

步骤

  1. 准备 BigQuery
  2. 配置 BigQuery Python 客户端
  3. 将数据索引到 Elasticsearch
  4. 搜索数据

准备 BigQuery

要使用 BigQuery,你需要访问 Google Cloud 控制台并创建一个项目。完成后,你将被重定向到以下界面:

BigQuery 允许你从 Google Drive 和 Google Cloud Storage 传输数据,并支持上传本地文件。要将数据上传到 BigQuery,你首先需要创建一个数据集。创建一个并命名为 "server-logs",这样我们就可以上传一些文件。

在本文中,我们将上传一个包含不同类型文章的本地数据集。请查看 BigQuery 的官方文档,了解如何上传本地文件。

数据集

我们将上传到 BigQuery 的文件包含服务器日志数据,其中包含 HTTP 响应及其描述,文件格式为 ndjson。该 ndjson 文件包含以下字段:ip_address_timestamphttp_methodendpointstatus_coderesponse_timestatus_code_description

BigQuery 将从该文件中提取数据,然后我们将使用 Python 进行整理,并将其索引到 Elasticsearch。

创建一个名为 logs.ndjson 的文件,并填充以下内容:

{"ip_address": "192.168.1.3", "_timestamp": "2024-12-03T12:00:03Z", "http_method": "GET", "endpoint": "/about", "status_code": "404", "response_time": 89, "status_code_description": "The requested contact page does not exist or was removed."}
{"ip_address": "192.168.1.3", "_timestamp": "2024-12-03T12:00:07Z", "http_method": "GET", "endpoint": "/contact", "status_code": "404", "response_time": 76, "status_code_description": "The requested contact page does not exist or was removed."}
{"ip_address": "192.168.1.1", "_timestamp": "2024-12-03T12:00:01Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 123, "status_code_description": "OK"}
{"ip_address": "192.168.1.1", "_timestamp": "2024-12-03T12:00:04Z", "http_method": "GET", "endpoint": "/products", "status_code": "200", "response_time": 156, "status_code_description": "OK"}
{"ip_address": "192.168.1.2", "_timestamp": "2024-12-03T12:00:05Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 101, "status_code_description": "OK"}
{"ip_address": "192.168.1.2", "_timestamp": "2024-12-03T12:00:08Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 98, "status_code_description": "OK"}
{"ip_address": "192.168.1.6", "_timestamp": "2024-12-03T12:00:10Z", "http_method": "GET", "endpoint": "/home", "status_code": "200", "response_time": 105, "status_code_description": "OK"}
{"ip_address": "192.168.1.2", "_timestamp": "2024-12-03T12:00:02Z", "http_method": "POST", "endpoint": "/login", "status_code": "500", "response_time": 340, "status_code_description": "Internal error while processing the payment gateway."}
{"ip_address": "192.168.1.5", "_timestamp": "2024-12-03T12:00:09Z", "http_method": "POST", "endpoint": "/payment", "status_code": "500", "response_time": 512, "status_code_description": "Internal error while processing the payment gateway."}
{"ip_address": "192.168.1.4", "_timestamp": "2024-12-03T12:00:06Z", "http_method": "POST", "endpoint": "/checkout", "status_code": "503", "response_time": 450, "status_code_description": "Service temporarily unavailable during the checkout process."}

我们将此文件上传到刚刚创建的数据集(显示为 "server_logs"),并使用 "logs" 作为表名(显示为 "table id")。

完成后,你的文件应如下所示:

配置 BigQuery Python 客户端

下面,我们将学习如何使用 BigQuery Python 客户端和 Google Colab 来构建一个应用。

1. 安装依赖

首先,我们需要安装以下依赖项:

!pip install google-cloud-bigquery elasticsearch==8.16

google-cloud-bigquery 依赖项提供了访问 BigQuery 数据所需的工具,elasticsearch 允许连接到 Elastic 并索引数据,而 getpass 可用于输入敏感变量而不在代码中暴露它们。

让我们导入所有必要的依赖项:

from elasticsearch import Elasticsearch, exceptions
from google.cloud import bigquery
from google.colab import auth
from getpass import getpass
from datetime import datetime
import json

我们还需要声明其他变量,并初始化 Elasticsearch 的 Python 客户端:

ELASTICSEARCH_ENDPOINT = getpass("Elasticsearch endpoint: ")
ELASTIC_API_KEY = getpass("Elastic Api Key: ")
# Google Cloud project name and BigQuery dataset name
PROJECT_ID = "elasticsearch-bigquery"
# dataset_id in format <your-project-name>.<your-dataset-name>
DATASET_ID = f'{PROJECT_ID}.server-logs'
# Elasticsearch client
es_client = Elasticsearch(
    ELASTICSEARCH_ENDPOINT,
    api_key=ELASTIC_API_KEY,
)

2. 身份验证

为了获取使用 BigQuery 所需的凭证,我们将使用认证。运行下面的命令行,并选择你用于创建 Google Cloud 项目的相同账户:

auth.authenticate_user()

现在,让我们查看 BigQuery 中的数据:

client = bigquery.Client(project=PROJECT_ID)
# Getting tables from dataset
tables = client.list_tables(DATASET_ID)
data = {}
for table in tables:
    # Table id must be in format <dataset_name>.<table_name>
    table_id = f"{DATASET_ID}.{table.table_id}"
    print(f"Processing table: {table.table_id}")
    # Query to retrieve BigQuery tables data
    query = f"""
        SELECT *
        FROM `{table_id}`
    """
    query_job = client.query(query)
    results = query_job.result()
    print(f"Results for table: {table.table_id}:")
    data[table.table_id] = []
    for row in results:
        # Saving data with key=table_id
        data[table.table_id].append(dict(row))
        print(row)

# variable with data
logs_data = data['logs']

这应该是你看到的结果:

Processing table: logs
Results for table: logs:
Row(('The requested contact page does not exist or was removed.', 404, 'GET', '/about', datetime.datetime(2024, 12, 3, 12, 0, 3, tzinfo=datetime.timezone.utc), 89, '192.168.1.3'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('The requested contact page does not exist or was removed.', 404, 'GET', '/contact', datetime.datetime(2024, 12, 3, 12, 0, 7, tzinfo=datetime.timezone.utc), 76, '192.168.1.3'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 1, tzinfo=datetime.timezone.utc), 123, '192.168.1.1'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/products', datetime.datetime(2024, 12, 3, 12, 0, 4, tzinfo=datetime.timezone.utc), 156, '192.168.1.1'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 5, tzinfo=datetime.timezone.utc), 101, '192.168.1.2'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 8, tzinfo=datetime.timezone.utc), 98, '192.168.1.2'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('OK', 200, 'GET', '/home', datetime.datetime(2024, 12, 3, 12, 0, 10, tzinfo=datetime.timezone.utc), 105, '192.168.1.6'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('Internal error while processing the payment gateway.', 500, 'POST', '/login', datetime.datetime(2024, 12, 3, 12, 0, 2, tzinfo=datetime.timezone.utc), 340, '192.168.1.2'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('Internal error while processing the payment gateway.', 500, 'POST', '/payment', datetime.datetime(2024, 12, 3, 12, 0, 9, tzinfo=datetime.timezone.utc), 512, '192.168.1.5'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})
Row(('Service temporarily unavailable during the checkout process.', 503, 'POST', '/checkout', datetime.datetime(2024, 12, 3, 12, 0, 6, tzinfo=datetime.timezone.utc), 450, '192.168.1.4'), {'status_code_description': 0, 'status_code': 1, 'http_method': 2, 'endpoint': 3, '_timestamp': 4, 'response_time': 5, 'ip_address': 6})

通过这段简单的代码,我们已经从 BigQuery 提取了数据,并将其存储在 logs_data 变量中,现在可以将其与 Elasticsearch 一起使用。

将数据索引到 Elasticsearch

我们将首先在 Kibana Devtools 控制台中定义数据结构:

es_client.indices.create(
        index="bigquery-logs",
        body={
            "mappings": {
                "properties": {
                    "status_code_description": {"type": "match_only_text"},
                    "status_code": {"type": "keyword"},
                    "@timestamp": {"type": "date"},
                    "ip_address": {"type": "ip"},
                    "http_method": {"type": "keyword"},
                    "endpoint": {"type": "keyword"},
                    "response_time": {"type": "integer"},
                }
            }
        }
    )

match_only_text 字段是 text 字段类型的一种变体,通过不存储用于计算分数的元数据来节省磁盘空间。我们使用它,因为日志通常是时间为中心的,即日期比文本字段中的匹配质量更为重要。使用 textfield 的查询与使用 match_only_text 字段的查询是兼容的。

我们将使用 Elasticsearch _bulk API 来索引这些文件:

 bulk_data = []

for log_entry in logs_data:
    # Convert timestamp to ISO 8601 string
    timestamp_iso8601 = log_entry["_timestamp"].isoformat()

    # Prepare action metadata
    action_metadata = {
        "index": {
            "_index": "bigquery-logs",
            "_id": f"{log_entry['ip_address']}-{timestamp_iso8601}"
        }
    }
    # Prepare document
    document = {
        "ip_address": log_entry["ip_address"],
        "status_code": log_entry["status_code"],
        "@timestamp": timestamp_iso8601,
        "http_method": log_entry["http_method"],
        "endpoint": log_entry["endpoint"],
        "response_time": log_entry["response_time"],
        "status_code_description": log_entry["status_code_description"]
    }
    # Append to bulk data
    bulk_data.append(action_metadata)
    bulk_data.append(document)

print(bulk_data)
# Indexing data
response = es_client.bulk(body=bulk_data)

搜索数据

现在,我们可以使用来自 bigquery-logs 索引的数据运行查询。

在这个例子中,我们将使用来自服务器的错误描述(status_code_description 字段)进行搜索。此外,我们将按日期对结果进行排序,并获取错误的 IP 地址:

es_client.search(
    index="bigquery-logs",
    body={
        "query": {"match": {"status_code_description": "error"}},
        "sort": [{"@timestamp": {"order": "desc"}}],
        "aggs": {"by_ip": {"terms": {"field": "ip_address", "size": 10}}},
    },
)

这是结果:

{
  ...
  "hits": {
    ...
    "hits": [
      {
        "_index": "bigquery-logs",
        "_id": "192.168.1.5-2024-12-03T12:00:09+00:00",
        "_score": null,
        "_source": {
          "ip_address": "192.168.1.5",
          "status_code": 500,
          "@timestamp": "2024-12-03T12:00:09+00:00",
          "http_method": "POST",
          "endpoint": "/payment",
          "response_time": 512,
          "status_code_description": "Internal error while processing the payment gateway."
        },
        "sort": [
          1733227209000
        ]
      },
      {
        "_index": "bigquery-logs",
        "_id": "192.168.1.2-2024-12-03T12:00:02+00:00",
        "_score": null,
        "_source": {
          "ip_address": "192.168.1.2",
          "status_code": 500,
          "@timestamp": "2024-12-03T12:00:02+00:00",
          "http_method": "POST",
          "endpoint": "/login",
          "response_time": 340,
          "status_code_description": "Internal error while processing the payment gateway."
        },
        "sort": [
          1733227202000
        ]
      }
    ]
  },
  "aggregations": {
    "by_ip": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "192.168.1.2",
          "doc_count": 1
        },
        {
          "key": "192.168.1.5",
          "doc_count": 1
        }
      ]
    }
  }
}

结论

像 BigQuery 这样的工具有助于集中信息,对于数据管理非常有用。除了搜索,将 BigQuery 与 Elasticsearch 一起使用,可以利用机器学习和数据分析的强大功能,更简单、更快速地检测或分析问题。

想要获得 Elastic 认证吗?了解下一期 Elasticsearch 工程师培训的时间!

Elasticsearch 提供了许多新功能,帮助你为自己的使用案例构建最佳的搜索解决方案。深入了解我们的示例笔记本,了解更多内容,开始免费的云试用,或者现在就尝试在本地机器上使用 Elastic。

原文:Ingesting data with BigQuery - Elasticsearch Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/984148.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何在el-input搜索框组件的最后面,添加图标按钮?

1、问题描述 2、解决步骤 在el-input组件标签内&#xff0c;添加一个element-plus的自定义插槽&#xff0c; 在插槽里放一个图标按钮即可。 3、效果展示 结语 以上就是在搜索框组件的末尾添加搜索按钮的过程。 喜欢本篇文章的话&#xff0c;请关注本博主~~

Magento2根据图片文件包导入产品图片

图片包给的图片文件是子产品的图片&#xff0c;如下图&#xff1a;A104255是主产品的sku <?php/*** 根据图片包导入产品图片&#xff0c;包含子产品和主产品* 子产品是作为主图&#xff0c;主产品是作为附加图片*/use Magento\Framework\App\Bootstrap;include(../app/boot…

INT_MAX 与 0x3f3f3f3f 的区别

【INT_MAX 与 0x3f3f3f3f 的区别】 在算法设计中&#xff0c;INT_MAX 与 0x3f3f3f3f 都常被用作无穷大值的设定&#xff0c;但二者区别显著&#xff0c;适用场景也有所不同。 备注&#xff1a;此图由百度 AI 创作生成 &#xff08;1&#xff09;INT_MAX 是 C/C 中的标准常量&a…

升级旧版本Vmware到Vmware Workstation Pro 17

背景 一些新版本Linux内核版本较高&#xff0c;例如&#xff1a;openEuler24.03 LTS需要的内核版本为6.6&#xff0c;而Vmware Workstation Pro 16最高只支持Linux5.x内核&#xff0c;对Linux6.x不支持&#xff0c;因此&#xff0c;需要将旧版本的Vmware升级到Vmware Workstat…

【第23节】C++设计模式(行为模式)-Interpreter(解释器)模式

一、问题背景 在一些应用中&#xff0c;系统会提供内建&#xff08;Build-In&#xff09;的脚本或宏语言&#xff0c;允许用户定义他们能够在系统中执行的操作。Interpreter 模式的目的就是为用户提供一种定义语言的语法表示&#xff0c;并通过解释器来解释语言中的句子。这种模…

哈弗赛恩公式计算长度JavaScript实现

哈弗赛恩公式&#xff08;Haversine formula&#xff09;是一种用于计算球面上两点间最短距离的数学方法&#xff0c;尤其适用于地球表面。本文将详细介绍哈弗赛恩公式的原理、应用以及如何使用JavaScript实现它。 一、哈弗赛恩公式原理 在球面几何中&#xff0c;哈弗赛恩公式…

Day05 实例:正向反向连接内外网环境防火墙出入站

一、正反向连接 0、先将防火墙关闭 Linux&#xff1a; sudo systemctl stop firewalld Windows&#xff1a;netsh advfirewall set allprofiles state off 1、正向连接 1.1 Linux连接Windows 00x1 开启两台服务器 并且给Windows拖入nc.exe 00x2 Windows绑定自己5566端…

【大模型知识点】位置编码——绝对位置编码,相对位置编码,旋转位置编码RoPE

由于Transformer 中的自注意模块具有置换不变性&#xff08;不关心输入序列的顺序&#xff09;&#xff0c;因此需要使用位置编码来注入位置信息以建模序列&#xff0c;使模型能够区分不同位置的 token&#xff0c;并捕捉序列的顺序关系。 在介绍一些位置编码方法前&#xff0…

Linux 配置静态 IP

一、简介 在 Linux CentOS 系统中默认动态分配 IP 地址&#xff0c;每次启动虚拟机服务都是不一样的 IP&#xff0c;因此要配置静态 IP 地址避免每次都发生变化&#xff0c;下面将介绍配置静态 IP 的详细步骤。 首先先理解一下动态 IP 和静态 IP 的概念&#xff1a; 动态 IP…

DeepSeek 3FS:端到端无缓存的存储新范式

在 2025 年 2 月 28 日&#xff0c;DeepSeek 正式开源了其高性能分布式文件系统 3FS【1】&#xff0c;作为其开源周的压轴项目&#xff0c;3FS 一经发布便引发了技术圈的热烈讨论。它不仅继承了分布式存储的经典设计&#xff0c;还通过极简却高效的架构&#xff0c;展现了存储技…

微服务与消息队列RabbitMQ

简介 同步模式 异步模式 内容 解决方案RabbitMQ 同步调用的优缺点 同步调用的优势是什么&#xff1f; 时效性强&#xff0c;等待到结果后才返回。 同步调用的问题是什么&#xff1f; 拓展性差性能下降级联失败问题

vue+element-plus简洁完美实现古诗文网

目录 一、项目介绍 二、项目截图 1.项目结构图 2.首页&#xff08;推荐&#xff09; 3.诗文 4.名句 5.作者 6.古籍 7.我的 三、源码实现 1.路由配置 2.顶部 四、总结 一、项目介绍 项目在线预览&#xff1a;点击访问 本项目为vue项目&#xff0c;参考古诗文网官方…

Vue项目通过内嵌iframe访问另一个vue页面,获取token适配后端鉴权(以内嵌若依项目举例)

1. 改造子Vue项目进行适配(ruoyi举例) (1) 在路由文件添加需要被外链的vue页面配置 // 若依项目的话是 router/index.js文件 {path: /contrast,component: () > import(/views/contrast/index),hidden: true },(2) 开放白名单 // 若依项目的话是 permission.js 文件 cons…

RuleOS:区块链开发的“破局者”,开启Web3新纪元

RuleOS&#xff1a;区块链开发的“破冰船”&#xff0c;驶向Web3的星辰大海 在区块链技术的浩瀚宇宙中&#xff0c;一群勇敢的探索者正驾驶着一艘名为RuleOS的“破冰船”&#xff0c;冲破传统开发的冰层&#xff0c;驶向Web3的星辰大海。这艘船&#xff0c;正以一种前所未有的姿…

指针的工作原理,函数的传值和传址

在C之中&#xff0c;指针是一个变量用于存储另外一个变量的内存地址。指针可以指向各种数据类型例如基础数据类型和自定义数据类型等。 在计算机之中的内存被划分为一个一个的存储单元&#xff0c;每个存储单元拥有唯一的内存地址&#xff0c;指针就是指向这些内存单元的地址。…

RISC-V汇编学习(三)—— RV指令集

有了前两节对于RISC-V汇编、寄存器、汇编语法等的认识&#xff0c;本节开始介绍RISC-V指令集和伪指令。 前面说了RISC-V的模块化特点&#xff0c;是以RV32I为作为ISA的核心模块&#xff0c;其他都是要基于此为基础&#xff0c;可以这样认为&#xff1a;RISC-V ISA 基本整数指…

基于控制障碍函数(Control Barrier Function)的二次规划(QP)控制

文章目录 研究背景主要贡献障碍函数&#xff08;Barrier Function&#xff09;&#xff08;一&#xff09;倒数障碍函数&#xff08;Reciprocal Barrier Function, RBF&#xff09;&#xff08;二&#xff09;归零障碍函数&#xff08;Zeroing Barrier Function, ZBF&#xff0…

软件工程:软件需求之需求分析方法

目录 前言 需求分析方法 工具和方法 具体分析方法 对运行环境的影响 ​编辑 前言 本文重点介绍开展软件需求分析的方法。 需求分析方法 工具和方法 软件需求可以维护在ALM系统中&#xff0c;譬如&#xff1a;doors&#xff0c;codeBeamer等&#xff0c;JIRA适合互联网行…

蓝桥杯—走迷宫(BFS算法)

题目描述 给定一个NM 的网格迷宫 G。G 的每个格子要么是道路&#xff0c;要么是障碍物&#xff08;道路用 11表示&#xff0c;障碍物用 0 表示&#xff09;。 已知迷宫的入口位置为 (x1​,y1​)&#xff0c;出口位置为 (x2​,y2​)。问从入口走到出口&#xff0c;最少要走多少…

Ubuntu无风扇工控机:解决精密仪器散热难题的利器

在现代工业自动化领域&#xff0c;精密仪器的控制对设备的稳定性、可靠性和静音性提出了极高的要求。而无风扇Ubuntu工控机&#xff0c;凭借其独特的无风扇设计和强大的计算能力&#xff0c;正逐渐成为精密仪器控制场景中的“无声守护者”。本文将深入探讨无风扇工控机在精密仪…