python -【es】基本使用

一. 前言

在Python中使用Elasticsearch(ES)通常涉及安装Elasticsearch的Python客户端库,然后通过该库与Elasticsearch集群进行交互。

二. 基本使用

1. 安装Elasticsearch Python客户端库

首先,你需要安装elasticsearch库。你可以使用pip来安装它:

pip install elasticsearch

2. 连接到Elasticsearch集群

在Python中,你可以通过创建一个Elasticsearch对象来连接到Elasticsearch集群。

from elasticsearch import Elasticsearch

# 创建Elasticsearch客户端实例
es = Elasticsearch(['http://localhost:9200'])

# 检查连接是否成功
if es.ping():
    print("Successfully connected to Elasticsearch!")
else:
    print("Could not connect to Elasticsearch")

3. 执行索引操作

创建索引
在Elasticsearch中,索引类似于关系型数据库中的表。可以使用客户端实例的indices.create()方法来创建一个新的索引。

# 创建索引的请求体(这里是一个简单的例子,实际使用中可能更复杂)
index_body = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            "field1": {"type": "text"},
            "field2": {"type": "integer"}
        }
    }
}

# 创建索引
es.indices.create(index='my_index', body=index_body)

添加文档
可以使用index()方法来向索引中添加文档。

# 添加文档的请求体
doc_body = {
    "field1": "value1",
    "field2": 123
}

# 添加文档(指定索引名和文档ID)
es.index(index='my_index', id=1, body=doc_body)

4. 执行搜索操作

使用search()方法来执行搜索查询。

# 查询DSL
query_body = {
    "query": {
        "match": {
            "field1": "value1"
        }
    }
}

# 执行搜索
response = es.search(index='my_index', body=query_body)

# 处理响应
for hit in response['hits']['hits']:
    print(hit['_source'])

三. 整合封装成一个类来使用

import json
import uuid
from datetime import datetime, timedelta

from elasticsearch import Elasticsearch

from base.common.time_format import get_isoformat_time
from configure.configure import config
from configure.server_config import logger
import time
import traceback

es_conf = config['elasticsearch']


class ElasticSearchService(Elasticsearch):
    es_service = None

    mappings = {
        "properties": {
            # "id": {"type": "keyword"},
            "content": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "time": {"type": "date", "format": "yyyy-MM-dd'T'HH:mm:ss"},
            "qst_id": {"type": "keyword"},
            "reply_type": {"type": "integer"}
        }
    }

    def __init__(self, index_name, addrs, *args, **kwargs):
        self.max_result_window = es_conf['max_result_window']
        self.username = es_conf['username']
        self.password = es_conf['password']
        self.index_name = index_name
        self.addrs = addrs
        super().__init__(self.addrs, basic_auth=(self.username, self.password), request_timeout=3600)
        
        # 1.查询index是否存在
        if not self.indices.exists(index=self.index_name):
            self.create_index(self.index_name)
        if not self.ping():
            logger.error(f"ElasticSearchHandler Connection failed")
        logger.info(
            f"Connect to ElasticService successfully!!! addrs:{addrs}, index_name:{self.index_name}")

    def create_index(self, index_name):
        # 创建索引
        if not self.indices.exists(index=index_name):
            response = self.indices.create(index=index_name, body={})
            logger.info(f"Index [{index_name}] created successfully!")
            # 检查索引是否创建成功
            if not response.get('acknowledged'):
                logger.info(f"Failed to create index '{index_name}'. Response: {response}")
                return False

            self.create_mapping_session_history()
            self.create_index_setting()

            if response.get('shards_acknowledged'):
                logger.info(f"Index '{index_name}' All shards are acknowledged.")
            else:
                logger.info(f"Index '{index_name}' Not all shards are acknowledged.")

    def create_mapping_session_history(self):
        mapping = ElasticSearchService.mappings
        # 将mapping添加到索引
        response = self.indices.put_mapping(index=self.index_name, body=mapping)

        # 检查索引是否创建成功
        if response.get('acknowledged'):
            logger.info(f"Index '{self.index_name}' created successfully with mapping.")
        else:
            logger.info(f"Failed to create index '{self.index_name}'. Response: {response}")

    def create_index_setting(self):
        setting = {"number_of_replicas": "0"}
        # 将setting添加到索引
        response = self.indices.put_settings(index=self.index_name, body=setting)

        # 检查索引是否创建成功
        if response.get('acknowledged'):
            logger.info(f"Index '{self.index_name}' created successfully with setting.")
        else:
            logger.info(f"Failed to create index setting'{self.index_name}'. Response: {response}")

    def delete_index(self, index_name):
        res = self.indices.delete(index=index_name)
        logger.info(f"Index [{index_name}] deleted successfully!, res: {res}")
        return res

    def insert_doc(self, hist_hash_id: str, doc_body: dict):
        """
        新增数据
        :param hist_hash_id:
        :param doc:
        :return:
        """
        try:
            self.index(index=self.index_name, id=hist_hash_id, body=doc_body)
            # 刷新索引以确保文档立即可见
            res = self.indices.refresh(index=self.index_name)
            logger.info(f"Document hist_hash_id:[{hist_hash_id}] indexed successfully!")
            return res
        except Exception as e:
            logger.error(f"Failed to index document hist_hash_id:[{hist_hash_id}]: {e}")

    def bulk_insert_docs(self, session_histories: list):
        """
        批量新增数据
        :param chitchat_list:
        :return:
        """
        try:
            # 准备批量数据
            bulk_actions = []
            failed_list = []
            batch_count = 1000
            for i in range(0, len(session_histories), batch_count):
                for item in session_histories[i:i + batch_count]:
                    doc = {
                        # "id": item.get('id', 0),
                        "you_feild": item.get('you_feild', ''),
                        ...
                        }
                    action = {
                        "index": {  # Use "index" as the action
                            "_index": self.index_name,
                            # 如果需要指定文档ID,可以取消下面的注释
                            "_id": item.get('hist_hash_id', '')
                        }
                    }
                    # 将 action 和 doc 作为元组的两个元素添加到 bulk_actions 列表中
                    bulk_actions.append(action)
                    bulk_actions.append(doc)
                    print(f"insert data -> {item}")
                response = self.bulk(index=self.index_name, body=bulk_actions)

                # 检查响应中的成功和失败项
                for item in response['items']:
                    if item['index']['status'] != 201:
                        failed_list.append(item)
                logger.info(f"Elasticsearch 批量新增完成,failed_list:{failed_list}")
                # 刷新索引以确保文档立即可见
                self.indices.refresh(index=self.index_name)
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch bulk insert doc error:{e}")

    def delete_doc_by_id(self, doc_ids):
        """ 删除文档 """
        try:
            failed_list = []
            for doc_id in doc_ids:
                response = self.delete(index=self.index_name, id=doc_id)
                # 检查响应状态
                if response.meta.status != 200:
                    failed_list.append(doc_id)
                    logger.info(f"Document with ID {doc_id} in index {self.index_name} was deleted failed!")
                logger.info(f"Document with ID {doc_id} in index {self.index_name} was deleted successfully.")
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch delete_doc error:{e}")

    def delete_docs_by_query_body(self, query_body):
        # 使用_delete_by_query API 删除所有文档
        # 注意:这里我们使用了一个匹配所有文档的查询:{"query": {"match_all": {}}}
        try:
            response = self.delete_by_query(index=self.index_name, body=query_body)
            logger.info("Deleted documents:", response['_deleted'])  # 这将显示被删除的文档数量
        except Exception as e:
            # 捕获并处理异常
            logger.error(f"Deleted docs error: {e}")

    def update_doc(self, datas: list):
        """ 更新文档 """
        try:
            failed_list = []
            for data in datas:
                # 更新文档(实际上是重新索引)
                response = self.index(index=self.index_name, id=data['doc_id'], body=data['doc'])
                logger.info("Update Response:", response)
                if response.meta.status != 200:
                    failed_list.append(data)
            # 刷新索引以立即应用更改(可选)
            self.indices.refresh(index=self.index_name)
            logger.info(f"Elasticsearch update_doc finished! failed_list -> {failed_list}")
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch update_doc error: {e}")

    def get_doc(self, doc_id):
        """ 获取文档数据 """
        try:
            doc = self.get(index=self.index_name, id=doc_id)['_source']
            return doc
        except Exception as e:
            logger.error(f"Error retrieving document {doc_id}: {e}")
            return None

    def search_index(self, query_body):
        """
        检索文档
        query_body:查询体(Query DSL)
        """
        try:
            logger.info(f'elasticsearch search index_name={self.index_name},query_body={query_body}')
            response = self.search(index=self.index_name, body=query_body)
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            if response.get('_shards', {}).get('successful') == response.get('_shards', {}).get('total'):
                logger.info(
                    f"Search index successful! total count={response['hits']['total']['value']}, match count={len(response['hits']['hits'])}")
                # logger.info(f"search response -> {response}")
                if response['hits']['total']['value'] > 0:
                    return response
                return None
            return None
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService search_index error:{e}")

    def search_index_by_scroll_api(self, query_body):
        """
        分页查询
        query_body:查询体(Query DSL)
        """
        try:
            logger.info(f'elasticsearch search index_name={self.index_name},query_body={query_body}')
            response = self.search(index=self.index_name, body=query_body, scroll='1m')
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            if response.get('_shards', {}).get('successful') == response.get('_shards', {}).get('total'):
                logger.info(
                    f"Search index successful! total count={response['hits']['total']['value']}, match count={len(response['hits']['hits'])}")
                # logger.info(f"search response -> {response}")
                if response['hits']['total']['value'] > 0:
                    return response
                return None
            return None
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService search_index error:{e}")

    def search_by_sql(self, sql_body):
        try:
            logger.info(f'elasticsearch search index_name={self.index_name},sql_body={sql_body}')
            response = self.sql.query(body=sql_body)
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            if response.meta.status == 200:
                columns = response.get('columns')
                rows = response.get('rows')
                # 提取列名
                column_names = [col['name'] for col in columns]
                # 组织成字典格式
                result_dicts = []
                for row in rows:
                    result_dict = {column_names[i]: row[i] for i in range(len(column_names))}
                    result_dicts.append(result_dict)
                logger.info(f"Search index successful! match count={len(result_dicts)}")
                return result_dicts
            return []
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService search_by_sql error:{e}")
            return []


def get_elastic_instance(index_name, addrs):
    _es_service = None
    _wait_times = 0
    _try_count = 5
    _interval_seconds = 10
    for i in range(_try_count):  # 初始化后,尝试启动5次,第次间隔10秒
        try:
            _es_service = ElasticSearchService(index_name, addrs)
            if _es_service:
                logger.info(f"ElasticService initial successfully!")
                print(f"ElasticService initial successfully!")
                return _es_service
            logger.warning(f"Connect to elasticServer failed, try reconnect to elasticServer [{i}]!")
        except Exception as e:
            traceback.print_exc()
            logger.warning(
                f"初始化ElasticService失败,结果:{_es_service}, 异常原因:{str(e)}, 应用将在{_interval_seconds}秒后重新尝试.")
            time.sleep(_interval_seconds)


es_service = None
port = es_conf['port']
host = es_conf['host']
addrs = [f"http://{host}:{port}", ]

if config['elasticsearch']['enabled']:
    index_name = config['elasticsearch']['session_history_index']
    es_service = get_elastic_instance(index_name, addrs)
else:
    logger.info(f"[elasticsearch] 未启用! enabled -> {config['elasticsearch']['enabled']}")

if __name__ == '__main__':
    index_name = config['elasticsearch']['session_history_index']
    es_service = get_elastic_instance(index_name, addrs)
    # 添加文档
    docs = [{
        # "id": i + 1,
        "you_feild": "",
        ...
    } for i in range(5)]
    # 插入数据
    # es_service.insert_doc('2', doc)
    print(es_service.bulk_insert_docs(docs))

    # 删除index
    # print(es_service.delete_index(index_name))

    # 获取文档
    # print(es_service.get_doc('c2b27b31-80f8-4cf6-b3f2-36683b60d7da'))
    # logger.info(es_service.get_doc('2'))

    # 删除文档
    # logger.info(es_service.delete_doc_by_id(['f554d0e5-e4cc-4556-952b-b12cdc640fe56']))
    # query_body = {"query": {"match_all": {}}}
    # logger.info(es_service.delete_docs_by_query_body(query_body))

    # 更新数据
    # datas = [{'doc_id': 'c2b27b31-80f8-4cf6-b3f2-36683b60d7da', 'doc': {'qst_content': 'qqq'}}]
    # print(es_service.update_doc(datas))

    # 查询数据
    keyword = "缴清"
    query_body = {
        "query": {
            "multi_match": {
                "query": keyword,
                "fields": ["reply_content", "qst_content", "standard_qst"]
            }
        },
        "from": 0,
        "size": 10,
        "sort": [{
            "chat_qst_time": "desc"
        }]
    }
    # print(es_service.search_index(query_body))

四. 总结

以上是使用Python与Elasticsearch进行交互的基本步骤。可以根据实际需求扩展这些操作,例如处理更复杂的查询、使用聚合、批量操作等。Elasticsearch的Python客户端库提供了丰富的API,可以满足大多数与Elasticsearch交互的需求。
希望对你有所帮助!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/946040.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Edge安装问题,安装后出现:Could not find Edge installation

解决:需要再安装(MicrosoftEdgeWebView2RuntimeInstallerX64)。 网址:https://developer.microsoft.com/zh-cn/microsoft-edge/webview2/?formMA13LH#download 如果已经安装了edge,那就再下载中间这个独立程序安装就…

【JAVA高级篇教学】第六篇:Springboot实现WebSocket

在 Spring Boot 中对接 WebSocket 是一个常见的场景,通常用于实现实时通信。以下是一个完整的 WebSocket 集成步骤,包括服务端和客户端的实现。本期做个简单的测试用例。 目录 一、WebSocket 简介 1. 什么是 WebSocket? 2. WebSocket 的特…

Painter-Mortadela靶场

信息收集 枚举端口 nmap 192.168.109.132 -sS -sV -min-rate 5000 -Pn -p- -p- :扫描所有端口。 (65535)-sS:执行TCP SYN 扫描以快速扫描哪些端口打开。-sC:使用基本识别脚本执行扫描-sV:执行服务扫描–min-rate 5000&#xff1…

攻防世界pwn刷题

get_shell 这题直接给shell了 exp from pwn import* p remote(61.147.171.105,59682) p.sendline(cat flag) p.interactive() cyberpeace{8cd678c722f48327a69b2661ae8956c8} hello_pwn checksec一下 ok,64位的 {alarm(0x3Cu);setbuf(stdout, 0LL);puts("…

1、pycharm、python下载与安装

1、去官网下载pycharm 官网:https://www.jetbrains.com/pycharm/download/?sectionwindows 2、在等待期间,去下载python 进入官网地址:https://www.python.org/downloads/windows/ 3、安装pycharm 桌面会出现快捷方式 4、安装python…

epoll的ET和LT模式

LevelTriggered:简称LT,当FD有数据可读时,会重复通知多次,直至数据处理完成。是epoll的默认模式EdgeTriggered:简称ET,当FD有数据可读时,只通知一次,不管数据是否处理完成 Level是指…

CSS利用浮动实现文字环绕右下角,展开/收起效果

期望实现 文字最多展示 N 行,超出部分截断,并在右下角显示 “…” “更多”; 点击更多,文字展开全部内容,右下角显示“收起”。效果如下: 思路 尽量使用CSS控制样式,减少JS代码复杂度。 利…

单元测试入门和mockup

Java 新手入门:Java单元测试利器,Mock详解_java mock-CSDN博客 这个是典型的before when assert三段式,学一下单测思路 这个没有动态代理,所以是直接class(对比下面) Jmockit使用笔记_增加代码覆盖率_覆盖try catch_使用new Mock…

开发小工具:ping地址

开发小工具:ping地址 import socketdef tcp_port_scan(ip,port):#创建套接字socksocket.socket(socket.AF_INET,socket.SOCK_STREAM)#设置超时sock.settimeout(0.2)try:#发请求result sock.connect_ex((ip,port))if result 0:print(f{ip}--{port}接口连接成功)res…

双汇火腿肠,请勿随意喂猫

在许多家庭中,猫咪作为可爱的宠物成员,备受宠爱。当我们享受着双汇火腿肠的便捷与美味时,或许会有人想到与猫咪分享,但这种看似温馨的举动实则隐藏着诸多问题,双汇火腿肠并不适合喂猫。 从营养成分来看,双…

Unity Excel转Json编辑器工具

功能说明:根据 .xlsx 文件生成对应的 JSON 文件,并自动创建脚本 注意事项 Excel 读取依赖 本功能依赖 EPPlus 库,只能读取 .xlsx 文件。请确保将该脚本放置在 Assets 目录下的 Editor 文件夹中。同时,在 Editor 下再创建一个 Exc…

深信服云桌面系统的终端安全准入设置

深信服的云桌面系统在默认状态下没有终端的安全准入设置,这也意味着同样的虚拟机,使用云桌面终端或者桌面套件都可以登录,但这也给系统带来了一些安全隐患,所以,一般情况下需要设置终端的安全准入策略,防止…

基于SpringBoot的实验室信息管理系统【源码+文档+部署讲解】

系统介绍 视频演示 基于SpringBootVue实现的实验室信息管理系统采用前后端分离的架构方式,系统分为管理员、老师、用户三种角色,实现了用户管理、设备管理、实验室查询、公告、课程、实验室耗材管理、我的等功能 技术选型 开发工具:idea2…

Windows 10 自带功能实现大屏、小屏无线扩展

一、添加可选功能 在作为无线投屏对象的「第二屏」设备上,打开 Windows 10 设置并定位至「应用 > 应用和功能」界面,然后点击右侧界面中的「可选功能」选项。 点击可选功能界面顶部的「添加功能」按钮,搜索「无线显示器」模块并选择添加。…

大电流和大电压采样电路

大电压采样电路: 需要串联多个电阻进行分压,从而一级一级降低电压,防止电阻损坏或者短路直接打穿MCU。 为什么需要加电压跟随器:进行阻抗的隔离,防止MCU的IO阻抗对分压产生影响: 大电流检测电路&#xff…

torch.nn.functional的用法

文章目录 介绍激活函数示例 损失函数示例 卷积操作示例 池化示例 归一化操作示例 Dropout示例 torch.nn.functional 与 torch.nn 的区别 介绍 torch.nn.functional 是 PyTorch 中的一个模块,提供了许多函数式的神经网络操作,包括激活函数、损失函数、卷…

生物信息学软件开发综述学习

目录 ①编程语言和开源工具和库 ②轻量级 R 包开发 ③大规模组学软件开发 ④示例 1.轻量级 R 包开发示例及数据 2.大规模组学软件开发 文献:Bioinformatics software development: Principles and future directions ①编程语言和开源工具和库 在生物信息学…

【复刻】数字化转型是否赋能企业新质生产力发展?(2015-2023年)

参照赵国庆(2024)的做法,对来自产业经济评论《企业数字化转型是否赋能企业新质生产力发展——基于中国上市企业的微观证据》一文中的基准回归部分进行复刻基于2015-2023年中国A股上市公司数据,实证分析企业数字化转型对新质生产力…

在线免费批量生成 Word 文档工具

为了方便的批量生成 Word 文档,写了个在线 Word 文档批量生成工具,可以根据 Excel 数据和 Word 模板批量生成大量个性化的 Word 文档。适用于需要批量生成格式统一但内容不同的文档场景。比如: 批量生成证书、奖状批量生成合同、协议批量生成…

3D数学基础2

矩阵的行列式 在任意方阵中都存在至少一个标量,称作该方阵的行列式。在线性代数中,行列式有很多有用的性质 线性运算法则 方阵 M M M的行列式记作 ∣ M ∣ |M| ∣M∣或“det M”。非方阵矩阵的行列式是未定义的。 注意,在书写行列式时&…