Elasticsearch:让你的 Elasticsearch 索引与 Python 和 Google Cloud Platform 功能保持同步

作者:来自 Elastic Garson

Elasticsearch 内的索引 (index) 是你可以将数据存储在文档中的位置。 在使用索引时,如果你使用的是动态数据集,数据可能会很快变旧。 为了避免此问题,你可以创建一个 Python 脚本来更新索引,并使用 Google Cloud Platform (GCP) 的 Cloud Functions 和 Cloud Scheduler 进行部署,以便自动保持索引最新。

为了使索引保持最新,你可以首先设置一个 Jupyter Notebook 在本地进行测试,并创建一个脚本框架,该脚本框架将在出现新信息时更新你的索引。 你可以调整脚本以使其更具可重用性并将其作为云函数(Cloud function)运行。 使用 Cloud Scheduler,你可以将 Cloud Function 中的代码设置为使用 cron 类型格式按计划运行。

先决条件

  • 本示例使用 Elasticsearch 版本 8.12; 如果你是新手,请查看我们的 Elasticsearch 快速入门。
  • 如果你的计算机上尚未安装 Python,请下载最新版本。 此示例使用 Python 3.12.1。
  • NASA API 的 API 密钥。
  • 你将使用 Requests 包连接到 NASA API,使用 Pandas 操作数据,使用 Elasticsearch Python 客户端将数据加载到索引中并使其保持最新,并使用 Jupyter Notebooks 在测试时以交互方式处理数据。 你可以运行以下行来安装这些必需的软件包:
pip3 install requests pandas elasticsearch notebook

加载和更新你的数据集

在 GCP 内运行更新脚本之前,你需要上传数据并测试用于保持脚本更新的流程。 你将首先从 API 连接到数据,将其保存为 Pandas DataFrame,连接到 Elasticsearch,将 DataFrame 上传到索引中,检查索引上次更新的时间,并在有新数据可用时更新索引。 你可以在此搜索实验室笔记本中找到本节的完整代码。

加载你的数据

让我们开始使用 Jupyter Notebook 进行本地测试,以交互方式处理你的数据。 为此,你可以在终端中运行以下命令。

jupyter notebook

在右上角,你可以选择 “New” 来创建新的 Jupyter Notebook。

首先,你需要导入将要使用的包。 你将导入之前安装的所有软件包,以及用于处理 API 密钥等机密的 getpass 和用于处理日期对象的 datetime。

import requests
from getpass import getpass
import pandas as pd
from datetime import datetime, timedelta
from elasticsearch import Elasticsearch, helpers

你将使用的数据集是近地天体 Web 服务 (NeoWs),这是一种提供近地小行星信息的 RESTful Web 服务。 该数据集可让你根据小行星最接近地球的日期搜索小行星、查找特定小行星并浏览整个数据集。

通过以下函数,你可以连接到 NASA 的 NeoWs API,获取过去一周的数据,并将响应转换为 JSON 对象。

def connect_to_nasa():
    url = "https://api.nasa.gov/neo/rest/v1/feed"
    nasa_api_key = getpass("NASA API Key: ")
    today = datetime.now()
    params = {
        "api_key": nasa_api_key,
        "start_date": today - timedelta(days=7),
        "end_date": datetime.now(),
    }
    return requests.get(url, params).json()

现在,你可以将 API 调用的结果保存到名为 “response” 的变量中。

要将 JSON 对象转换为 pandas DataFrame,你必须将嵌套对象规范化为一个 DataFrame,并删除包含嵌套 JSON 的列。

def create_df(response):
    all_objects = []
    for date, objects in response["near_earth_objects"].items():
        for obj in objects:
            obj["close_approach_date"] = date
            all_objects.append(obj)
    df = pd.json_normalize(all_objects)
    return df.drop("close_approach_data", axis=1)

要调用此函数并查看数据集的前五行,你可以运行以下命令:

df = create_df(response)
df.head()

连接到 Elasticsearch

你可以通过提供 Elastic Cloud ID 和 API 密钥进行身份验证,从 Python 客户端访问 Elasticsearch。

def connect_to_elastic():
    elastic_cloud_id = getpass("Elastic Cloud ID: ")
    elastic_api_key = getpass("Elastic API Key: ")
    return Elasticsearch(cloud_id=elastic_cloud_id, api_key=elastic_api_key)

现在,你可以将连接函数的结果保存到名为 es 的变量中。

es = connect_to_elastic()

Elasticsearch 中的索引是数据的主要容器。 你可以将索引命名为 asteroid_data_set。

index_name = "asteroid_data_set"
es.indices.create(index=index_name)

你返回的结果将如下所示:

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'asteroids_data'})

现在,你可以创建一个辅助函数,该函数允许你将 DataFrame 转换为正确的格式以上传到索引中。

def doc_generator(df, index_name):
    for index, document in df.iterrows():
        yield {
            "_index": index_name,
            "_id": f"{document['id']}",
            "_source": document.to_dict(),
        }

接下来,你可以将 DataFrame 的内容批量上传到 Elastic,调用您刚刚创建的辅助函数。

helpers.bulk(es, doc_generator(df, index_name))

你应该得到类似于以下内容的结果,它告诉你已上传的行数:

(146, [])

你最后一次更新数据是什么时候?

将数据上传到 Elasticsearch 后,你可以检查上次更新索引的时间并设置日期格式,以便它可以与 NASA API 配合使用。

def updated_last(es, index_name):
    query = {
        "size": 0,
        "aggs": {"last_date": {"max": {"field": "close_approach_date"}}},
    }
    response = es.search(index=index_name, body=query)
    last_updated_date_string = response["aggregations"]["last_date"]["value_as_string"]
    datetime_obj = datetime.strptime(last_updated_date_string, "%Y-%m-%dT%H:%M:%S.%fZ")
    return datetime_obj.strftime("%Y-%m-%d")

你可以将索引上次更新的日期保存到变量中并打印出该日期。

last_update_date = updated_last(es, index_name)
print(last_update_date)

更新你的数据

现在,你可以创建一个函数来检查自上次更新索引和当前日期以来是否有任何新数据。 如果对象有效并且数据不为空,它将更新索引并让你知道是否没有新数据要更新,或者 DataFrame 是否返回 None 类型,表明可能存在问题。

def update_new_data(df, es, last_update_date, index_name):
    if isinstance(last_update_date, str):
        last_update_date = datetime.strptime(last_update_date, "%Y-%m-%d")

    last_update_date = pd.Timestamp(last_update_date).normalize()

    if not df.empty and "close_approach_date" in df.columns:
        df["close_approach_date"] = pd.to_datetime(df["close_approach_date"])

    today = pd.Timestamp(datetime.now().date()).normalize()

    if df is not None and not df.empty:
        update_range = df.loc[
            (df["close_approach_date"] > last_update_date)
            & (df["close_approach_date"] < today)
        ]
        if not update_range.empty:
            helpers.bulk(es, doc_generator(update_range, index_name))
        else:
            print("No new data to update.")
    else:
        print("The DataFrame is None.")

如果 DataFrame 是有效对象,它将调用你编写的函数并更新索引(如果适用)。 它还会打印出索引上次更新的日期,以帮助你在需要时进行调试。 如果没有,它会告诉你可能有问题。

try:
    if df is None:
        raise ValueError("DataFrame is None. There may be a problem.")
    update_new_data(df, es, last_update_date, index_name)
    print(updated_last(es, index_name))
except Exception as e:
    print(f"An error occurred: {e}")

保持索引最新

现在你已经创建了用于本地测试的框架,你可以设置一个环境,你可以每天运行脚本来检查是否有任何新数据可用并相应地更新索引。

创建云函数

你现在已准备好部署云功能。 为此,你需要选择环境作为第二代函数,命名你的函数,然后选择云区域。 你还可以将其绑定到 Cloud Pub/Sub 触发器,并选择创建新主题(如果你尚未创建)。 你可以在 GitHub 上查看本节的完整代码。

创建 Pub/Sub 主题

创建新主题时,你可以命名主题 ID 并选择使用 Google 管理的加密密钥进行加密。

设置 Cloud Function 的环境变量

在 “Runtime environment variables” 下方,你可以添加 NASA_API_KEY、ELASTIC_CLOUD_ID 和 ELASTIC_API_KEY 的环境变量。 你需要将它们保存为原始值,并且不带单引号。 因此,如果你之前在终端中输入了 “xxxlsdgzxxxxx” 值,你会希望它是 xxxlsdgzxxxxx。

调整你的代码并将其添加到你的云函数

输入环境变量后,你可以按 “下一步(next)” 按钮,这将带你进入代码编辑器。 你需要选择 Python 3.12.1 的运行时或匹配你正在使用的 Python 版本。 之后,将入口点更新为 update_index。 入口点的作用与 Python 中的 main 函数类似。

你将希望使用 os 来执行更自动化的过程,而不是使用 getpass 来获得环境变量(账号信息)。 示例如下所示:

elastic_cloud_id = os.getenv("ELASTIC_CLOUD_ID")
elastic_api_key = os.getenv("ELASTIC_API_KEY")

你需要调整脚本的顺序,以使函数首先连接到 Elasticsearch。 之后,你将想知道索引上次更新的时间,连接到你正在使用的 NASA API,将其保存到 DataFrame,并加载可能可用的任何新数据。

你可能会注意到底部有一个名为 update_index 的新函数,它将你的代码联系在一起。 在此函数中,你定义索引的名称,连接到 Elasticsearch,计算出索引的最后更新日期,连接到 NASA API,将结果保存到数据框中,并在需要时更新索引。 要指示入口点函数是云事件,你可以使用装饰器 @functions_framework.cloud_event 来表示它。

@functions_framework.cloud_event
def update_index(cloud_event):
    index_name = "asteroid_data_set"
    es = connect_to_elastic()
    last_update_date = updated_last(es, index_name)
    print(last_update_date)
    response = connect_to_nasa(last_update_date)
    df = create_df(response)
    if df is not None:
      update_new_data(df, es, last_update_date, index_name)
      print(updated_last(es, index_name)) 
    else:
      print("No new data was retrieved.")

这是完整更新的代码示例:

import functions_framework
import requests
import os
import pandas as pd
from datetime import datetime
from elasticsearch import Elasticsearch, helpers


def connect_to_elastic():
    elastic_cloud_id = os.getenv("ELASTIC_CLOUD_ID")
    elastic_api_key = os.getenv("ELASTIC_API_KEY")
    return Elasticsearch(cloud_id=elastic_cloud_id, api_key=elastic_api_key)


def connect_to_nasa(last_update_date):
    url = "https://api.nasa.gov/neo/rest/v1/feed"
    nasa_api_key = os.getenv("NASA_API_KEY")
    params = {
        "api_key": nasa_api_key,
        "start_date": last_update_date,
        "end_date": datetime.now(),
    }
    return requests.get(url, params).json()


def create_df(response):
    all_objects = []
    for date, objects in response["near_earth_objects"].items():
        for obj in objects:
            obj["close_approach_date"] = date
            all_objects.append(obj)
    df = pd.json_normalize(all_objects)
    return df.drop("close_approach_data", axis=1)


def doc_generator(df, index_name):
    for index, document in df.iterrows():
        yield {
            "_index": index_name,
            "_id": f"{document['close_approach_date']}",
            "_source": document.to_dict(),
        }


def updated_last(es, index_name):
    query = {
        "size": 0,
        "aggs": {"last_date": {"max": {"field": "close_approach_date"}}},
    }
    response = es.search(index=index_name, body=query)
    last_updated_date_string = response["aggregations"]["last_date"]["value_as_string"]
    datetime_obj = datetime.strptime(last_updated_date_string, "%Y-%m-%dT%H:%M:%S.%fZ")
    return datetime_obj.strftime("%Y-%m-%d")


def update_new_data(df, es, last_update_date, index_name):
    if isinstance(last_update_date, str):
        last_update_date = datetime.strptime(last_update_date, "%Y-%m-%d")

    last_update_date = pd.Timestamp(last_update_date).normalize()

    if not df.empty and "close_approach_date" in df.columns:
        df["close_approach_date"] = pd.to_datetime(df["close_approach_date"])

    today = pd.Timestamp(datetime.now().date()).normalize()

    if df is not None and not df.empty:
        update_range = df.loc[
            (df["close_approach_date"] > last_update_date)
            & (df["close_approach_date"] < today)
        ]
        print(update_range)
        if not update_range.empty:
            helpers.bulk(es, doc_generator(update_range, index_name))
        else:
            print("No new data to update.")
    else:
        print("The DataFrame is empty or None.")


# Triggered from a message on a Cloud Pub/Sub topic.
@functions_framework.cloud_event
def hello_pubsub(cloud_event):
    index_name = "asteroid_data_set"
    es = connect_to_elastic()
    last_update_date = updated_last(es, index_name)
    print(last_update_date)
    response = connect_to_nasa(last_update_date)
    df = create_df(response)
    try:
        if df is None:
            raise ValueError("DataFrame is None. There may be a problem.")
        update_new_data(df, es, last_update_date, index_name)
        print(updated_last(es, index_name))
    except Exception as e:
        print(f"An error occurred: {e}")

添加 requirements.txt 文件

你还需要定义一个requirements.txt 文件,其中包含运行代码所需的所有指定包。

functions-framework==3.*
requests==2.31.0
elasticsearch==8.12.0
pandas==2.1.4

调度你的云函数

在 Cloud Scheduler 中,你可以将函数设置为使用 unix cron 格式定期运行。 我将代码设置为每天早上 8 点在我的时区运行。

你还需要配置执行以连接到你之前创建的 Pub/Sub 主题。 我目前将消息正文设置为 “hello”。

现在你已经设置了 Pub/Sub 主题和 Cloud Function 并将该 Cloud Function 设置为按计划运行,只要出现新数据,你的索引就会自动更新。

结论

使用 Python、Google Cloud Platform Functions 和 Google Cloud Scheduler,你应该能够确保定期更新索引。 你可以在此处找到完整的代码以及用于本地测试的搜索实验室笔记本。 我们还与 Google Cloud 一起举办了一场点播网络研讨会,如果你想构建搜索应用程序,这可能是一个不错的下一步。 如果你基于此博客构建了任何内容,或者如果你对我们的讨论论坛和社区 Slack 频道有疑问,请告诉我们。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/479143.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

VMWare虚拟机使用openmediavault搭建NAS服务器完整步聚

下载: gopenmediavault - The open network attached storage solution 下载好openmediavault的ISO镜像后,打开虚拟机并安装 系统类型选择Debian 启动虚拟机并安装openmediavault 选择中文 地区选中国 键盘配置选汉语 开始安装 配置网络信息 配置root密码 确认密码 系统安装中…

LeetCode # 199. 二叉树的右视图

199. 二叉树的右视图 题目 给定一个二叉树的 根节点 root&#xff0c;想象自己站在它的右侧&#xff0c;按照从顶部到底部的顺序&#xff0c;返回从右侧所能看到的节点值。 示例 1: 输入: [1,2,3,null,5,null,4] 输出: [1,3,4] 示例 2: 输入: [1,null,3] 输出: [1,3] 示例 3…

最新域名出售交易平台源码修复版

最新域名出售交易平台源码修复版 测试正常分享&#xff0c;保障你使用愉快&#xff0c;源码测试修复不易&#xff0c;拿走留痕是美德 无需到处找教程或修复&#xff0c;教程一次性到位&#xff0c;无需藏着掖着 最新修复版域名出售交易平台源码&#xff0c;搭建即可正常使用…

【群晖】Docker Compose部署 Emby Server

【群晖】Docker Compose部署 Emby Server 本来群晖上面的 Emby 是用套件安装的&#xff0c;但是不巧的是前两天脑袋一抽装了两个插件&#xff0c;导致 Emby Server被当肉鸡了&#xff0c;还找不到脚本代码在哪儿&#xff0c;一天时间上传了3T的流量。无奈之下&#xff0c;只能尝…

头条网盘拉新项目该怎么选择授权

作为十二月份首发上线的项目——头条网盘拉新。一经上线就受到很多想要做这行业人的关注&#xff0c;光是佣金已经是业内比较高的了&#xff01;每拉新一位新用户就可以获取到价格为9元一单的佣金&#xff0c;拉失活用户也可以获取价格为4元的佣金&#xff0c;推广方式和其他网…

真机笔记(2)项目分析

目录 1. 项目&#xff1a; 2. 网络工程师工作流程 3. 实验 设备命名 登录密码 使用SSH协议 1. 项目&#xff1a; 竞标方&#xff1a;集成商、厂商、代理商、服务商、监理检测公司 在一个网络项目中&#xff0c;不同的角色承担着不同的职责和任务。以下是集成商、厂商、代…

JavaScript-Web学习笔记01

一、Web APIs 1、Web API Web API 是浏览器提供的一套操作浏览器功能和页面元素的API&#xff08;BOM 和 DOM&#xff09;。 2、总结 API 是为我们提供的一个接口&#xff0c;帮助我们实现某种功能Web API 主要是针对浏览器提供的接口&#xff0c;主要针对浏览器做交互效果。W…

腾讯云COS - 前端上传文件到 COS 跨域问题

问题描述 原因分析 因为我本地的地址是&#xff1a;http://localhost:9528 而发送请求时的地址是&#xff1a;http://132-1307119153.cos.ap-beijing.myqcloud.com/tu.jpg 域名不同&#xff0c;自然而然就出现了跨域的问题&#xff01; 解决方案 先点击对象存储 - 安全设置…

吃瓜Grok大模型

段子区 今年当地时间2月29日晚&#xff0c;马斯克闹出来一件大事——正式起诉OpenAI和Sam Altman&#xff0c;并要求OpenAI 恢复开源GPT-4等模型。国际流量大师我只付服马斯克和川宝!&#xff01; 当大家觉得这扯皮的故事就此结束后&#xff0c;马斯克“不负众望”的整了一个大…

算法-图的强连通分量,图的最小生成树

1.图的强连通分量 (1). 定义 图的强连通分量是图论中的一个重要概念&#xff0c;主要在有向图中进行讨论。具体来说&#xff0c;如果在一个有向图G中&#xff0c;任意两个顶点vi和vj&#xff08;其中vi大于vj&#xff09;之间都存在一条从vi到vj的有向路径&#xff0c;同时也存…

解锁AI之门:协助探索Amazon Bedrock服务

AI愈加强大的功能和广泛的应用场景&#xff0c;正逐渐改变着我们的工作和生活方式。 Amazon Bedrock在AI的时代潮流中&#xff0c;也以其强大而灵活的功能特性&#xff0c;正在成为越来越多企业和个人的智能助手。 亚马逊云科技通过VERYCLOUD睿鸿股份的服务能力&#xff0c;使…

PyTorch深度学习:如何提升遥感影像的地物分类精度?

我国高分辨率对地观测系统重大专项已全面启动&#xff0c;高空间、高光谱、高时间分辨率和宽地面覆盖于一体的全球天空地一体化立体对地观测网逐步形成&#xff0c;将成为保障国家安全的基础性和战略性资源。未来10年全球每天获取的观测数据将超过10PB&#xff0c;遥感大数据时…

F. Microcycle(dfs 搜寻路径 + 并查集)

解析&#xff1a; 本题的意思是&#xff0c;求一个环的最小的那条边。 并且输出其这个环的点。 我们可以利用并查集&#xff0c;进行确定其是否有环路。在将所用的边从大到小排序。 利用 vector容器&#xff0c;pop_back() 和 push的特性。 起点为 u终点为 v寻找路径。 代…

P2789 直线交点数题解

题目 假设平面上有N条直线&#xff0c;且无三线共点&#xff0c;那么这些直线一共能有多少不同的交点数&#xff1f; 输入输出格式 输入格式 一行&#xff0c;一个整数N&#xff0c;代表有N条直线。 输出格式 一行&#xff0c;一个整数&#xff0c;表示方案总数。 输入输…

金融知识分享系列之:出场信号RSI指标

金融知识分享系列之&#xff1a;出场信号RSI指标 一、出场信号RSI指标二、RSI指标原理三、 指标用法四、RSI指标总结 一、出场信号RSI指标 名称&#xff1a;相对强弱指标参数&#xff1a;(默认14)组成&#xff1a;RSI线以及30轴、50轴、70轴构成 0-30是极弱&#xff1a;0-30的…

天天爱掼蛋规则

一、牌型 1、单张&#xff1a;任意一张单牌&#xff1b; 2、对子&#xff1a;任意两张点数相同的牌&#xff0c;如33、44&#xff1b; 3、三同张&#xff1a;三张牌点相同的牌型&#xff0c;如555&#xff1b; 4、三同连张&#xff08;也叫钢板&#xff09;&#xff1a;两组…

蓝桥杯单片机快速开发笔记——特训2 按键的长按与短按

一、题目要求 在CT107D单片机综合训练平台上&#xff0c;通过I/O模式编写代码&#xff0c;实现以下功能&#xff1a; 系统上电后&#xff0c;关闭蜂鸣器、继电器和全部指示灯&#xff0c;数码管显示初始值为28&#xff0c;仅显示数码管最右边两位。利用定时器0实现10ms间隔定…

分享基于PDF.js的pdf阅读器代码

一、前言 有时候开发PC端web页面的时候会遇到PDF预览、下载等功能&#xff0c;为了兼容浏览器&#xff0c;需要找一款前端插件进行开发。比较好的PDF插件&#xff0c;就是mozilla的pdf.js&#xff08;注意是mozilla&#xff0c;如果你百度遇到需要收费的&#xff0c;那应该是下…

使用clion开发tftlcd屏,移植驱动时遇到的问题记录

问题现象 屏幕只有一半屏在刷新 问题出现的情况(在CLION开发时遇到过) 总结

构造函数和析构函数两兄弟的作用是什么

[TOP] &#xff08;1&#xff09;构造函数 1.1 概念 对于以下Date类&#xff1a; class Date { public:void Init(int year, int month, int day){_year year;_month month;_day day;}void Print(){cout << _year << "-" << _month << …