Python-ElasticSearch客户端的封装(聚合查询、统计查询、全量数据)

目录

  • ES Python客户端介绍
  • 封装代码
  • 测试代码
  • 参考


ES Python客户端介绍

官方提供了两个客户端elasticsearch、elasticsearch-dsl

pip install elasticsearch
pip install elasticsearch-dsl

第二个是对第一个的封装,类似ORM操作数据库,可以.filter、.groupby,个人感觉很鸡肋,star数也不多。平时使用的时候一般会在kibana上测试,然后直接把query拷贝过来获取更多数据,所以这里做下第一个的封装。

封装代码

  1. 封装后依然暴露了es,方便有特殊情况下使用
  2. index一般很少改动,就直接放到对象中了,可以使用set_index修改
  3. 常用的应该是get_doc和get_doc_scroll来获取少量和全量数据

代码测试时使用的是7.17.12版本,大于此版本可能由于官方改动出异常

pip install elasticsearch==7.17.12

es.py

import random
import string
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from typing import List,Dict


class ESClient:

    def __init__(self, host="127.0.0.1",index="", http_auth = None):
        self.index = index
        if http_auth is None:
            self.es = Elasticsearch(hosts=host)
        else:
            self.es = Elasticsearch(hosts=host, http_auth=http_auth)
        print("success to connect " + host)

    def close(self):
        self.es.close()

    # 设置索引
    def set_index(self,index:str):
        self.index = index

    # 创建索引
    def create_index(self, index_name: str, mappings=None):
        res = self.es.indices.create(index=index_name, mappings=mappings)
        return res

    # 删除索引
    def delete_index(self, index_name: str):
        res = self.es.indices.delete(index=index_name)
        return res

    # 获取索引
    def get_index(self, index_name: str):
        res = self.es.indices.get(index=index_name)
        return res

    # 创建文档(单个)
    def create_doc(self,body, _id=''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))):
        res = self.es.create(index=self.index, body=body, id=_id)
        return res

    # 创建文档(批量)
    def create_doc_bulk(self, docs: List[Dict]):
        actions = []
        for doc in docs:
            action = {
                "_index": self.index,
                "_op_type": "create",
                "_id": ''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))
            }
            for k,v in doc.items():
                action[k] = v
            actions.append(action)
        res = bulk(client=self.es, actions=actions)
        return res

    # 删除文档
    def delete_doc(self, doc_id):
        res = self.es.delete(index=self.index, id=doc_id)
        return res

    # 更新文档
    def update_doc(self, doc_id, doc:Dict):
        body = {
            "doc" : doc
        }
        res = self.es.update(index=self.index, id=doc_id, body=body)
        return res

    # 分页获取超过100000的文档
    def get_doc_scroll(self,query:Dict):
        res = self.es.search(index=self.index,size=10000,body=query,search_type="query_then_fetch",scroll="5m")
        data_list = []
        hits = res.get("hits")
        scroll_id = res.get('_scroll_id')
        total_value = 0
        # total 可能为Dict或int
        if isinstance(hits.get('total'),Dict):
            total_value= hits.get('total').get('value')
        else:
            total_value = hits.get('total')

        if total_value>0:
            for data in hits.get('hits'):
                data_list.append(data.get('_source'))
        return scroll_id,data_list

    # 通过scroll_id分页获取后序文档
    def get_doc_by_scroll_id(self,scroll_id):
        page = self.es.scroll(scroll_id=scroll_id,scroll="5m")
        data_list = []
        scroll_id = page.get('_scroll_id')
        for data in page.get('hits').get('hits'):
            data_list.append(data)
        return scroll_id,data_list

    # 清空scroll_id,防止服务端不够用
    def clear_scroll(self,scroll_id):
        self.es.clear_scroll(scroll_id)

    # 获取索引的hits内容(一般用于获取文档id、总数)
    def get_doc_all(self):
        res = self.es.search(index=self.index)
        return res['hits']

    # 获取一个文档
    def get_doc_by_id(self, id_):
        res = self.es.get(index=self.index, id=id_)
        return res["_source"]

    # 获取所有文档的_source内容(小于100000)
    def get_doc(self,query:Dict,size:int=100000):
        query['size'] = size
        res = self.es.search(index=self.index,body=query)
        data_list = []
        hits = res.get("hits")
        total_value = 0
        # total 可能为Dict或int
        if isinstance(hits.get('total'), Dict):
            total_value = hits.get('total').get('value')
        else:
            total_value = hits.get('total')

        if total_value > 0:
            for data in hits.get('hits'):
                data_list.append(data.get('_source'))
        return data_list

    # 聚合查询(分组条件名为group_by,返回buckets)
    def get_doc_agg(self, query):
        res = self.es.search(index=self.index, body=query)
        return res['aggregations']['group_by'].get('buckets')

    # 统计查询(统计条件为stats_by,返回最值、平均值等)
    def get_doc_stats(self,query):
        res = self.es.search(index=self.index,body=query)
        return res['aggregations']["stats_by"]

测试代码

import unittest
from es import ESClient

cli = ESClient(host="http://10.28.144.3:9200",http_auth=["elastic","changeme"])
def test_create_index():
    res = cli.create_index(index_name="test")
    print(res)

def test_delete_index():
    res = cli.delete_index(index_name="test")
    print(res)

def test_get_index():
    res = cli.get_index(index_name="test")
    print(res)

def test_set_index():
    cli.set_index(index="test")

def test_create_doc():
    body = {
        "name": "lady_killer9",
        "age": 19
    }
    res = cli.create_doc(body=body)
    print(res)

def test_create_doc_bulk():
    from copy import deepcopy
    body = {
        "name": "lady_killer9"
    }
    users = []
    for i in range(100001):
        tmp = deepcopy(body)
        tmp["age"] = i
        users.append(tmp)
    res = cli.create_doc_bulk(docs=users)
    print(res)


def test_get_doc_all():
    res = cli.get_doc_all()
    print(res)


def test_get_doc_by_id():
    res = cli.get_doc_by_id("jHALXDQaENQZPM4C9EUt")
    print(res)

def test_get_doc():
    query = {
        "query": {
            "match_all": {

            }
        }
    }
    res = cli.get_doc(query=query,size=20)
    print(res)

def test_update_doc():
    body={
        "name": "lady_killer_after_update"
    }
    res = cli.update_doc(doc_id="jHALXDQaENQZPM4C9EUt",doc=body)
    print(res)


def test_delete_doc():
    res = cli.delete_doc(doc_id="jHALXDQaENQZPM4C9EUt")
    print(res)

def test_get_doc_agg():
    query = {
            "aggs": {
                "group_by": {
                    "terms": {
                        "field": "age"
                    }
                }
            }
    }
    res = cli.get_doc_agg(query=query)
    print(res)

def test_get_doc_stats():
    query = {
            "aggs": {
                "stats_by": {
                    "stats": {
                        "field": "age"
                    }
                }
            }
    }
    res = cli.get_doc_stats(query=query)
    print(res)

def test_get_doc_scroll():
    query = {
        "query": {
            "match_all": {}
        }
    }
    scroll_id,data_list = cli.get_doc_scroll(query=query)
    res = []
    while data_list:
        res.extend(data_list)
        scroll_id,data_list = cli.get_doc_by_scroll_id(scroll_id=scroll_id)
    print(len(res))


if __name__ == '__main__':
    # test_delete_index()
    test_create_index()
    test_get_index()
    # test_set_index()
    # test_create_doc()
    # test_create_doc_bulk()
    # test_get_doc_all()
    # test_update_doc()
    # test_get_doc_by_id()
    # test_get_doc()
    # test_delete_doc()
    # test_get_doc_agg()
    # test_get_doc_stats()
    # test_get_doc_scroll()
    cli.close()

测试截图
在这里插入图片描述
更多python相关内容:【python总结】python学习框架梳理

本人b站账号:一路狂飚的蜗牛

有问题请下方评论,转载请注明出处,并附有原文链接,谢谢!如有侵权,请及时联系。如果您感觉有所收获,自愿打赏,可选择支付宝18833895206(小于),您的支持是我不断更新的动力。

参考

github-elasticsearch
github-elasticsearch-dsl

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/52482.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

com.android.ide.common.signing.KeytoolException:

签名没问题但是提示Execution failed for task :app:packageDebug. > A failure occurred while executing com.android.build.gradle.tasks.PackageAndroidArtifact$IncrementalSplitterRunnable > com.android.ide.common.signing.KeytoolException: Failed to read ke…

github gitlab 多用户多平台切换

一、背景 我需要用账号1 来登录并管理github 账号 我需要用账号2 来登录并管理gitlab 账号 二、设置账号 邮箱 设置账号1用户名与邮箱 git config --global user.name "miaojiang" git config --global user.email "187133163.com" 三、生成本地密钥…

pycharm配置arcpy环境

目录 1、安装ArcGIS软件2、安装PyCharm3、创建PyCharm项目4、验证ArcPy环境 在GIS开发中, ArcPy是不可或缺的重要组件,而PyCharm作为一款功能强大的Python IDE,为我们提供了更便捷、高效的开发环境。在本文中,我们将详细介绍如何…

sublime配置less的一些坑(1)

仅在sublime的Install Package安装保存less报错 在sublime的Install Package安装less 打开sublime软件,按住CtrlShiftP组合键,弹出的界面中选择Install Package 选中后enter或者回车。等会弹出一个弹窗,大致意思是说你已经成功安装了package control。如果你在此之前已经安装了…

【力扣每日一题】2023.7.31 重排链表

目录 题目: 示例: 分析: 代码: 题目: 示例: 分析: 给我们一个链表,让我们按照题目要求原地修改重排链表。 那么具体怎么个重排法呢,题目给出了一串式子,其实就是把链表分为前后两段,然后在前半段的节…

企业工程管理系统源码之提高工程项目管理软件的效率

高效的工程项目管理软件不仅能够提高效率还应可以帮你节省成本提升利润 在工程行业中,管理不畅以及不良的项目执行,往往会导致项目延期、成本上升、回款拖后,最终导致项目整体盈利下降。企企管理云业财一体化的项目管理系统,确保…

UML/SysML建模工具更新(2023.7)(1-5)有国产工具

DDD领域驱动设计批评文集 欢迎加入“软件方法建模师”群 《软件方法》各章合集 最近一段时间更新的工具有: 工具最新版本:Visual Paradigm 17.1 更新时间:2023年7月11日 工具简介 很用心的建模工具。支持编写用例规约。支持文本分析和C…

小目标检测(1)——大恒(DaHeng)相机操作与控制编程

文章目录 引言正文相关开发库的介绍编程准备配置引用头文件GalaxyIncludes.h配置lib文件 具体编程过程初始化和反初始化枚举设备开关设备 属性控制属性控制器种类 图像采集控制和图像处理采单帧回调采集图像处理流对象属性控制 获取设备事件获取掉线事件通知 样例程序分析补充&…

重生之我要学C++第五天

这篇文章主要内容是构造函数的初始化列表以及运算符重载在顺序表中的简单应用,运算符重载实现自定义类型的流插入流提取。希望对大家有所帮助,点赞收藏评论,支持一下吧! 目录 构造函数进阶理解 1.内置类型成员在参数列表中的定义 …

2023.07.29 驱动开发DAY6

通过epoll实现一个并发服务器 服务器 #include <stdio.h> #include <string.h> #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/epoll.h…

脑电信号处理与特征提取——6.运用机器学习技术和脑电进行大脑解码(涂毅恒)

目录 六、运用机器学习技术和脑电进行大脑解码 6.1 前言 6.2 基于脑电数据的机器学习基础分析 6.3 基于脑电数据的机器学习进阶分析 6.4 代码解读 六、运用机器学习技术和脑电进行大脑解码 6.1 前言 6.2 基于脑电数据的机器学习基础分析 6.3 基于脑电数据的机器学习进阶分…

内网隧道代理技术(十五)之 Earthworm的使用(二级代理)

Earthworm的使用(二级代理) 本文紧接着上一篇文章继续讲解Earthworm工具的使用 (二级代理)正向连接 二级正向代理发生在如下的情况: 1、Web服务器在公网,黑客可以直接访问 2、B机器在内网,黑客不能直接访问 3、Web服务器可以访问内网机器B 4、内网机器B可以访问公司…

flask创建数据库连接池

flask创建数据库连接池 在Python中&#xff0c;您可以使用 Flask-SQLAlchemy 这个扩展来创建一个数据库连接池。Flask-SQLAlchemy 是一个用于 Flask 框架的 SQLAlchemy 操作封装&#xff0c;实现了 ORM(Object Relational Mapper)。ORM 主要用于将类与数据库中的表建立映射关系…

Spring Boot实践三 --数据库

一&#xff0c;使用JdbcTemplate访问MySQL数据库 1&#xff0c;确认本地已正确安装mysql 按【winr】快捷键打开运行&#xff1b;输入services.msc&#xff0c;点击【确定】&#xff1b;在打开的服务列表中查找mysql服务&#xff0c;如果没有mysql服务&#xff0c;说明本机没有…

【数据结构】实验十:哈夫曼编码

实验十 哈夫曼编码 一、实验目的与要求 1&#xff09;掌握树、森林与二叉树的转换&#xff1b; 2&#xff09;掌握哈夫曼树和哈夫曼编码算法的实现&#xff1b; 二、 实验内容 1. 请编程实现如图所示的树转化为二叉树。 2. 编程实现一个哈夫曼编码系统&#xff0c;系统功能…

腾讯云标准型S6/SA3/SR1/S5/SA2服务器CPU处理器大全

腾讯云服务器CVM标准型CPU处理器大全&#xff0c;包括标准型S6、SA3、SR1、S5、S5se、SA2、S4、SN3ne、S3、SA1、S2ne实例CPU处理器型号大全&#xff0c;标准型S6云服务器CPU采用Intel Ice Lake(2.7GHz/3.3GHz)&#xff0c;标准型S5采用Intel Xeon Cascade Lake 8255C/Intel Xe…

Flink集群运行模式--Standalone运行模式

Flink集群运行模式--Standalone运行模式 一、实验目的二、实验内容三、实验原理四、实验环境五、实验步骤5.1 部署模式5.1.1 会话模式&#xff08;Session Mode&#xff09;5.1.2 单作业模式&#xff08;Per-Job Mode&#xff09;5.1.3 应用模式&#xff08;Application Mode&a…

SpringBoot 集成 EasyExcel 3.x 优雅实现 Excel 导入导出

介绍 EasyExcel 是一个基于 Java 的、快速、简洁、解决大文件内存溢出的 Excel 处理工具。它能让你在不用考虑性能、内存的等因素的情况下&#xff0c;快速完成 Excel 的读、写等功能。 EasyExcel文档地址&#xff1a; https://easyexcel.opensource.alibaba.com/ 快速开始 …

list与erase()

运行代码&#xff1a; //list与erase() #include"std_lib_facilities.h" //声明Item类 struct Item {string name;int iid;double value;Item():name(" "),iid(0),value(0.0){}Item(string ss,int ii,double vv):name(ss),iid(ii),value(vv){}friend istr…

opencv顺时针,逆时针旋转视频并保存视频

原视频 代码 import cv2# 打开视频文件 video cv2.VideoCapture(inference/video/lianzhang.mp4)# 获取原视频的宽度和高度 width int(video.get(cv2.CAP_PROP_FRAME_WIDTH)) height int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))# 创建视频编写器并设置输出视频参数 fourcc …