将 OneLake 数据索引到 Elasticsearch - 第二部分

作者:来自 Elastic Gustavo Llermaly 及 Jeffrey Rengifo

本文分为两部分,第二部分介绍如何使用自定义连接器将 OneLake 数据索引并搜索到 Elastic 中。

在本文中,我们将利用第 1 部分中学到的知识来创建 OneLake 自定义 Elasticsearch 连接器。

我们已经上传了一些 OneLake 文档并将其索引到 Elasticsearch 中以供搜索。但是,这仅适用于一次性上传。如果我们想要同步数据,那么我们需要开发一个更复杂的系统。

幸运的是,Elastic 有一个连接器框架可用于开发满足我们需求的自定义连接器:

我们现在将根据本文制作一个 OneLake 连接器:如何为 Elasticsearch 创建自定义连接器。

步骤

  • 连接器引导
  • 实现 BaseDataSource 类
  • 身份验证
  • 运行连接器
  • 配置计划

连接器引导

背景信息:Elastic 连接器分为两种类型:

  1. Elastic 托管连接器:完全由 Elastic Cloud 托管和运行。
  2. 自托管连接器:由用户自行托管,必须部署在你的基础设施中。

自定义连接器属于 “连接器客户端” 类别,因此我们需要下载并部署连接器框架。

首先,克隆连接器的代码库:

git clone https://github.com/elastic/connectors

现在在 requirements/framework.txt 文件末尾添加你将使用的依赖项。在本例中:

azure-identity==1.19.0
azure-storage-file-datalake==12.17.0

这样,存储库就完成了,我们可以开始编码了。

实现 BaseDataSource 类

你可以在此存储库中找到完整的工作代码。

我们将介绍 onelake.py 文件中的核心部分。

在导入和类声明之后,我们必须定义将捕获配置参数的 __init__ 方法。

"""OneLake connector to retrieve data from datalakes"""

from functools import partial

from azure.identity import ClientSecretCredential
from azure.storage.filedatalake import DataLakeServiceClient

from connectors.source import BaseDataSource

ACCOUNT_NAME = "onelake"


class OneLakeDataSource(BaseDataSource):
    """OneLake"""

    name = "OneLake"
    service_type = "onelake"
    incremental_sync_enabled = True

    # Here we can enter the data that we'll later need to connect our connector to OneLake.
    def __init__(self, configuration):
        """Set up the connection to the azure base client

        Args:
            configuration (DataSourceConfiguration): Object of DataSourceConfiguration class.
        """
        super().__init__(configuration=configuration)
        self.tenant_id = self.configuration["tenant_id"]
        self.client_id = self.configuration["client_id"]
        self.client_secret = self.configuration["client_secret"]
        self.workspace_name = self.configuration["workspace_name"]
        self.data_path = self.configuration["data_path"]

然后,你可以配置 UI 将显示的表单,使用返回配置字典的 get_default_configuration 方法填充这些参数。

    # Method to generate the Enterprise Search UI fields for the variables we need to connect to OneLake.
    @classmethod
    def get_default_configuration(cls):
        """Get the default configuration for OneLake

        Returns:
            dictionary: Default configuration
        """
        return {
            "tenant_id": {
                "label": "OneLake tenant id",
                "order": 1,
                "type": "str",
            },
            "client_id": {
                "label": "OneLake client id",
                "order": 2,
                "type": "str",
            },
            "client_secret": {
                "label": "OneLake client secret",
                "order": 3,
                "type": "str",
                "sensitive": True, # To hide sensitive data like passwords or secrets
            },
            "workspace_name": {
                "label": "OneLake workspace name",
                "order": 4,
                "type": "str",
            },
            "data_path": {
                "label": "OneLake data path",
                "tooltip": "Path in format <DataLake>.Lakehouse/files/<Folder path>",
                "order": 5,
                "type": "str",
            },
            "account_name": {
                "tooltip": "In the most cases is 'onelake'",
                "default_value": ACCOUNT_NAME,
                "label": "Account name",
                "order": 6,
                "type": "str",
            },
        }

然后我们配置下载方法,并从 OneLake 文档中提取内容。

async def download_file(self, file_client):
        """Download file from OneLake

        Args:
            file_client (obj): File client

        Returns:
            generator: File stream
        """

        try:
            download = file_client.download_file()
            stream = download.chunks()

            for chunk in stream:
                yield chunk
        except Exception as e:
            self._logger.error(f"Error while downloading file: {e}")
            raise

    async def get_content(self, file_name, doit=None, timestamp=None):
        """Obtains the file content for the specified file in `file_name`.

        Args:
            file_name (obj): The file name to process to obtain the content.
            timestamp (timestamp, optional): Timestamp of blob last modified. Defaults to None.
            doit (boolean, optional): Boolean value for whether to get content or not. Defaults to None.

        Returns:
            str: Content of the file or None if not applicable.
        """

        if not doit:
            return

        file_client = await self._get_file_client(file_name)
        file_properties = file_client.get_file_properties()
        file_extension = self.get_file_extension(file_name)

        doc = {
            "_id": f"{file_client.file_system_name}_{file_properties.name}",  # workspacename_data_path
            "name": file_properties.name.split("/")[-1],
            "_timestamp": file_properties.last_modified,
            "created_at": file_properties.creation_time,
        }

        can_be_downloaded = self.can_file_be_downloaded(
            file_extension=file_extension,
            filename=file_properties.name,
            file_size=file_properties.size,
        )

        if not can_be_downloaded:
            return doc

        extracted_doc = await self.download_and_extract_file(
            doc=doc,
            source_filename=file_properties.name.split("/")[-1],
            file_extension=file_extension,
            download_func=partial(self.download_file, file_client),
        )

        return extracted_doc if extracted_doc is not None else doc

为了让我们的连接器对框架可见,我们需要在 connectors/config.py 文件中声明它。为此,我们将以下代码添加到源中:

 "sources": {
   ...
    "onelake": "connectors.sources.onelake:OneLakeDataSource",
    ...
 }

身份验证

在测试连接器之前,我们需要获取 client_id, tenant_id 和 client_secret,我们将使用它们从连接器访问工作区。

我们将使用 service principals 作为身份验证方法。

Azure service principal 是为与应用程序、托管服务和自动化工具一起使用以访问 Azure 资源而创建的身份。

步骤如下:

  1. 创建应用程序并收集 client_id、tenant_id 和 client_secret
  2. 在工作区中启用 service principal
  3. 将 service principal 添加到工作区

你可以逐步遵循本教程。

准备好了吗?现在是测试连接器的时候了!

运行连接器

连接器准备好后,我们现在可以连接到我们的 Elasticsearch 实例。

转到: Search > Content > Connectors > New connector 并选择 Customized Connector

选择要创建的名称,然后选择 “Create and attach an index” 以创建与连接器同名的新索引。

你现在可以使用 Docker 运行它或从源代码运行它。在此示例中,我们将使用 “Run from source”。

单击 “Generate Configuration”,然后将框中的内容粘贴到项目根目录中的 config.yml 文件中。在字段 service_type 上,你必须匹配 Connectors/config.py 中的连接器名称。在本例中,将 changeme 替换为 onelake。

现在,你可以使用以下命令运行连接器:

make install
make run

如果连接器正确初始化,你应该在控制台中看到如下消息:

注意:如果出现兼容性错误,请检查你的连接器/版本文件并与你的 Elasticsearch 集群版本进行比较:与 Elasticsearch 的版本兼容性。我们建议保持连接器版本和 Elasticsearch 版本同步。在本文中,我们使用 Elasticsearch 和连接器版本 8.15。

如果一切顺利,我们的本地连接器将与我们的 Elasticsearch 集群通信,我们将能够使用我们的 OneLake 凭据对其进行配置:

我们现在将索引来自 OneLake 的文档。为此,请单击 Sync > Full Content,运行完整内容同步:

同步完成后,你应该在控制台中看到以下内容:

在企业搜索 UI 中,你可以单击 “Documents” 来查看已索引的文档:

配置计划

你可以根据需要使用 UI 安排定期内容同步,以使索引保持更新并与 OneLake 同步。

要配置计划同步,请转到 “Search > Content > Connectors,然后选择你的连接器。然后单击 “scheduling”:

或者,你可以使用允许 CRON 表达式的更新连接器调度 API。

结论

在第二部分中,我们通过使用 Elastic 连接器框架并开发我们自己的 OneLake 连接器来轻松与我们的 Elastic Cloud 实例通信,将我们的配置更进一步。

想要获得 Elastic 认证?了解下一次 Elasticsearch 工程师培训何时开始!

Elasticsearch 包含新功能,可帮助你为你的用例构建最佳搜索解决方案。深入了解我们的示例笔记本以了解更多信息,开始免费云试用,或立即在你的本地机器上试用 Elastic。

原文:Indexing OneLake data into Elasticsearch - Part II - Elasticsearch Labs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/959444.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Formality:时序变换(三)(相位反转)

相关阅读 Formalityhttps://blog.csdn.net/weixin_45791458/category_12841971.html?spm1001.2014.3001.5482 一、引言 时序变换在Design Compiler的首次综合和增量综合中都可能发生&#xff0c;它们包括&#xff1a;时钟门控(Clock Gating)、寄存器合并(Register Merging)、…

php代码审计2 piwigo CMS in_array()函数漏洞

php代码审计2 piwigo CMS in_array()函数漏洞 一、目的 本次学习目的是了解in_array()函数和对项目piwigo中关于in_array()函数存在漏洞的一个审计并利用漏洞获得管理员帐号。 二、in_array函数学习 in_array() 函数搜索数组中是否存在指定的值。 in_array($search,$array…

房租管理系统的智能化应用助推租赁行业高效运营与决策优化

内容概要 在现代租赁行业中&#xff0c;房租管理系统的智能化应用正在逐步成为一个不可或缺的工具。通过整合最新技术&#xff0c;这些系统为租赁管理的各个方面提供了极大的便利和效率提升。从房源管理到合同签署再到财务监控&#xff0c;智能化功能能够帮助运营者在繁琐的事…

Hive关于数据库的语法,warehouse,metastore

关于数据库的语法 在default数据库下,查看其他数据库的表 in 打开控制台 字体大小的设置 Hive默认的库: default, 1/4说明一共有4个库,现在只展示了1个,单击>>所有架构 数据库的删除 方法一: 语法 删除有表的数据库,加cascade 方法二 当前连接的数据库 切换当前数据库…

【React】PureComponent 和 Component 的区别

前言 在 React 中&#xff0c;PureComponent 和 Component 都是用于创建组件的基类&#xff0c;但它们有一个主要的区别&#xff1a;PureComponent 会给类组件默认加一个shouldComponentUpdate周期函数。在此周期函数中&#xff0c;它对props 和 state (新老的属性/状态)会做一…

AI赋能医疗:智慧医疗系统源码与互联网医院APP的核心技术剖析

本篇文章&#xff0c;笔者将深入剖析智慧医疗系统的源码架构以及互联网医院APP背后的核心技术&#xff0c;探讨其在医疗行业中的应用价值。 一、智慧医疗系统的核心架构 智慧医疗系统是一个高度集成的信息化平台&#xff0c;主要涵盖数据采集、智能分析、决策支持、远程医疗等…

HTML-新浪新闻-实现标题-样式1

用css进行样式控制 css引入方式&#xff1a; --行内样式&#xff1a;写在标签的style属性中&#xff08;不推荐&#xff09; --内嵌样式&#xff1a;写在style标签中&#xff08;可以写在页面任何位置&#xff0c;但通常约定写在head标签中&#xff09; --外联样式&#xf…

【学习笔记】深度学习网络-深度前馈网络(MLP)

作者选择了由 Ian Goodfellow、Yoshua Bengio 和 Aaron Courville 三位大佬撰写的《Deep Learning》(人工智能领域的经典教程&#xff0c;深度学习领域研究生必读教材),开始深度学习领域学习&#xff0c;深入全面的理解深度学习的理论知识。 在之前的文章中介绍了深度学习中用…

如何在IDEA社区版Service面板中管理springboot项目

1、开启service仪表盘 2、在service仪表盘中&#xff0c;添加启动类配置项&#xff0c;专业版是SpringBoot 、社区版是application。 3、控制台彩色日志输出 右键启动类配置项&#xff0c;添加虚拟机参数 -Dspring.output.ansi.enabledALWAYS

如何在data.table中处理缺失值

&#x1f4ca;&#x1f4bb;【R语言进阶】轻松搞定缺失值&#xff0c;让数据清洗更高效&#xff01; &#x1f44b; 大家好呀&#xff01;今天我要和大家分享一个超实用的R语言技巧——如何在data.table中处理缺失值&#xff0c;并且提供了一个自定义函数calculate_missing_va…

.NET9增强OpenAPI规范,不再内置swagger

ASP.NETCore in .NET 9.0 OpenAPI官方文档ASP.NET Core API 应用中的 OpenAPI 支持概述 | Microsoft Learnhttps://learn.microsoft.com/zh-cn/aspnet/core/fundamentals/openapi/overview?viewaspnetcore-9.0https://learn.microsoft.com/zh-cn/aspnet/core/fundamentals/ope…

【redis初阶】redis客户端

目录 一、基本介绍 二、认识RESP&#xff08;redis自定的应用层协议名称&#xff09; 三、访问github的技巧 四、安装redisplusplus 4.1 安装 hiredis** 4.2 下载 redis-plus-plus 源码 4.3 编译/安装 redis-plus-plus 五、编写运行helloworld 六、redis命令演示 6.1 通用命令的…

蓝桥杯3518 三国游戏 | 排序

题目传送门 这题的思路很巧妙&#xff0c;需要算出每个事件给三国带来的净贡献&#xff08;即本国士兵量减其他两国士兵量&#xff09;并对其排序&#xff0c;根据贪心的原理累加贡献量直到累加结果不大于0。最后对三国的胜利的最大事件数排序取最值即可。 n int(input()) a …

基于vue框架的的信用社业务管理系统设计与实现4gnx5(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。

系统程序文件列表 项目功能&#xff1a;用户,用户销户,用户存款,用户取款,用户转账,理财类型,投资理财,理财订单,金属类别,贵金属,金属订单,产品分类,保险产品,保险订单 开题报告内容 基于Vue框架的信用社业务管理系统设计与实现开题报告 一、研究背景与意义 随着金融科技的…

自然语言处理——从原理、经典模型到应用

1. 概述 自然语言处理&#xff08;Natural Language Processing&#xff0c;NLP&#xff09;是一门借助计算机技术研究人类语言的科学&#xff0c;是人工智能领域的一个分支&#xff0c;旨在让计算机理解、生成和处理人类语言。其核心任务是将非结构化的自然语言转换为机器可以…

微信小程序1.1 微信小程序介绍

1.1 微信小程序介绍 内容提要 1.1 什么是微信小程序 1.2 微信小程序的功能 1.3 微信小程序使用场景 1.4 微信小程序能取代App吗 1.5 微信小程序的发展历程 1.6微信小程序带来的机会

【已解决】OSS配置问题

OSS SDK快速入门_对象存储(OSS)-阿里云帮助中心 阿里官方的SDK使用方法还得配置环境变量access Key、access Secret &#xff0c;我没有配置&#xff0c;仅把access Key和access Secret写到了yml文件读取&#xff0c;结果上传图片时还是出现下面的问题。 [ ERROR ] [ com.s…

SVN客户端使用手册

目录 一、简介 二、SVN的安装与卸载 1. 安装&#xff08;公司内部一般会提供安装包和汉化包&#xff0c;直接到公司内部网盘下载即可&#xff0c;如果找不到可以看下面的教程&#xff09; 2. 查看SVN版本 ​编辑 3. SVN卸载 三、SVN的基本操作 1. 检出 2. 清除认证数据 3. 提交…

Oracle迁移DM数据库

Oracle迁移DM数据库 1 数据准备 2 DTS工具操作步骤 2.1 创建工程 打开DTS迁移工具&#xff0c;点击新建工程&#xff0c;填写好工程信息&#xff0c;如图&#xff1a; 2.2 新建迁移任务 右击迁移>选择新建迁移>填写迁移名称>勾选启用&#xff0c;然后确认下一步…

正则表达式以及Qt中的使用

目录 一、正则表达式 1、基本匹配&#xff1a; 2、元字符&#xff1a; 2.1 .运算符&#xff1a; 2.2 字符集&#xff1a; 2.3 重复次数&#xff1a; 2.4 量词{} 2.5 特征标群() 2.6 或运算符 2.7 \反斜线转码特殊字符 2.8 锚点 3、简写字符 4、零宽度断言 4.1 正…