集成ES分组查询统计求平均值

前言

       之前其实写过ES查询数据,进行分组聚合统计:
复杂聚合分组统计实现


一、目标场景

  1. 机房机柜的物联网设备上传环境数据,会存储到ES
  2. 存到ES的温湿度数据需要查询,进行分组后,再聚合统计求平均值

二、使用步骤

1.引入库

       我这里因为ES服务已经升级到8.0.0了,然后ES数据查询分组,我这里需要对时间进行格式化,再聚合avg,所以客户端相关版本用的7.17.4

<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>elasticsearch-rest-client</artifactId>
	<version>7.17.4</version>
	<exclusions>
		<exclusion>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
		</exclusion>
	</exclusions>
</dependency>
<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>elasticsearch-rest-high-level-client</artifactId>
	<version>7.17.4</version>
	<exclusions>
		<exclusion>
			<groupId>org.elasticsearch</groupId>
			<artifactId>elasticsearch</artifactId>
		</exclusion>
	</exclusions>
</dependency>

<dependency>
	<groupId>org.elasticsearch</groupId>
	<artifactId>elasticsearch</artifactId>
	<version>7.17.4</version>
</dependency>

2.配置类

       目前我们就是单服务的,这个配置类够用了。其实我配置类就是要把RestHighLevelClient注入,并交给spring管理。

/**
 * ES配置类
 * @author zwmac
 */
@Configuration
@Data
public class ElasticSearchConfig {

    @Value("${es.host}")
    private String host;
    @Value("${es.port}")
    private int port;
    @Value("${es.username}")
    private String loginName;
    @Value("${es.password}")
    private String password;
    private RestHighLevelClient client;

    @Bean
    public RestHighLevelClient client() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(loginName, password));
        HttpHost[] httpHostArray = new HttpHost[1];
        httpHostArray[0] = new HttpHost(host, port);
        RestClientBuilder restClientBuilder = RestClient.builder(httpHostArray)
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    httpClientBuilder.disableAuthCaching();
                    return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                });
        restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                .setConnectTimeout(60000)
                .setSocketTimeout(150000));
        client = new RestHighLevelClient(
                restClientBuilder
        );
        return client;
    }
}

3.使用


    @Resource
    private RestHighLevelClient restHighLevelClient;

/**
     * 查询温湿度24小时平均值
     * @param deviceCode 设备编码
     * @param startTime 开始时间
     * @param endTime 结束时间
     * @param humName 湿度字段名
     * @param tempName 温度字段名
     * @return 温湿度24小时平均值
     */
    private TreeMap<String, Map<String, Double>> queryTempHumDayAvg(String deviceCode, Date startTime, Date endTime, String humName, String tempName) {
        TreeMap<String, Map<String, Double>> treeMap = new TreeMap<>();
        //ES查询
        String index = EsCalendar.getDeviceFlowIndex(startTime, endTime);
        SearchRequest searchRequest = new SearchRequest(index);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        //忽略不可用索引,允许索引不不存在,通配符表达式将扩展为打开的索引
        searchRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, false));

        String timeFmt = "yyyy-MM-dd";

        // 组装ES请求数据
        String startTimeStr = DateUtil.format(startTime, DatePattern.NORM_DATETIME_PATTERN);
        String endTimeStr = DateUtil.format(endTime, DatePattern.NORM_DATETIME_PATTERN);
        QueryBuilder rangeQuery = QueryBuilders.rangeQuery("createTime").lte(endTimeStr).gte(startTimeStr);

        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        // 必须为deviceCode
        boolQueryBuilder.must(QueryBuilders.termQuery("deviceCode", deviceCode));
        rangeQuery = QueryBuilders.boolQuery().must(rangeQuery).must(boolQueryBuilder);
        QueryBuilder boolQuery = QueryBuilders.boolQuery().must(rangeQuery);

        searchSourceBuilder.query(boolQuery).size(0);


        //平均值 温度
        //String tempName = "temp_avg";
        String tempAvgName = tempName + "_avg";
        String tempFactorName = "data." + tempName;
        AvgAggregationBuilder tempAvgAggregationBuilder = AggregationBuilders.avg(tempAvgName).field(tempFactorName);

        //平均值 湿度
        //String humName = "hygrometer_avg";
        String humAvgName = humName + "_avg";
        String humFactorName = "data." + humName;
        AvgAggregationBuilder humAvgAggregationBuilder = AggregationBuilders.avg(humAvgName).field(humFactorName);


        String createTimeGroup = "createTimeGroup";
        DateHistogramAggregationBuilder aggregation = AggregationBuilders.dateHistogram(createTimeGroup)
                .field("createTime").fixedInterval(DateHistogramInterval.DAY)
                .format(timeFmt)
                //过滤掉count为0的数据
                .minDocCount(1).subAggregation(tempAvgAggregationBuilder).subAggregation(humAvgAggregationBuilder);

        //分组条件
        searchSourceBuilder.aggregation(aggregation);
        searchRequest.source(searchSourceBuilder);


        // 按照因子列表查询
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = null;
        Map<String, Map<String, Double>> mp = new HashMap<>();
        try {
            log.info("方法getCabinetTempHum24HourAvg查询ES请求数据:" + searchRequest);
            searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            log.info("方法getCabinetTempHum24HourAvg查询ES响应数据:" + searchResponse.toString());
            Aggregations aggregations = searchResponse.getAggregations();
            if (aggregations != null) {
                //组织出参数
                aggregations.forEach(agg -> {
                    ParsedDateHistogram parsedDateHistogram = (ParsedDateHistogram) agg;
                    List buckets = parsedDateHistogram.getBuckets();
                    if (CollectionUtil.isNotEmpty(buckets)) {
                        buckets.forEach(bucket -> {
                            ParsedDateHistogram.ParsedBucket timeGroupTerm = (ParsedDateHistogram.ParsedBucket) bucket;
                            String timeStr = timeGroupTerm.getKeyAsString();

                            Aggregations subAggregations = timeGroupTerm.getAggregations();
                            if (subAggregations != null) {
                                Map<String, Double> tempHumMap = new HashMap<>();
                                Map<String, Aggregation> subAggMap = subAggregations.asMap();
                                if (subAggMap != null) {
                                    Aggregation tempAgg = subAggMap.get(tempAvgName);
                                    if (tempAgg != null) {
                                        ParsedAvg tempAggPdh = (ParsedAvg) tempAgg;
                                        tempHumMap.put(tempName, tempAggPdh.getValue());
                                    }
                                    Aggregation humAgg = subAggMap.get(humAvgName);
                                    if (humAgg != null) {
                                        ParsedAvg humAggPdh = (ParsedAvg) humAgg;
                                        tempHumMap.put(humName, humAggPdh.getValue());
                                    }

                                }
                                mp.put(timeStr, tempHumMap);
                            }

                        });
                    }

                });

            }
            //数据补全
            List<DateTime> dateTimeList = DateUtil.rangeToList(startTime, DateUtil.offsetHour(endTime, -1), DateField.HOUR_OF_DAY);
            if (CollectionUtil.isNotEmpty(dateTimeList)) {
                String finTempName = "temp_avg";
                String finHumName = "hum_avg";
                dateTimeList.forEach(dateTime -> {
                    String timeStr = DateUtil.format(dateTime, timeFmt);
                    Map<String, Double> finTempHumMap = new HashMap<>();
                    Map<String, Double> tempHumMap = mp.get(timeStr);
                    if (tempHumMap == null) {
                        finTempHumMap.put(finTempName, 0.0);
                        finTempHumMap.put(finHumName, 0.0);
                    } else {
                        Double tempAvg = tempHumMap.get(tempName);
                        Double humAvg = tempHumMap.get(humName);
                        finTempHumMap.put(finTempName, tempAvg);
                        finTempHumMap.put(finHumName, humAvg);
                    }
                    treeMap.put(timeStr, finTempHumMap);
                });
            }

        } catch (Exception e) {
            log.error("方法countByEs查询ES异常", e);
        }

        return treeMap;
    }

关键点注意:

  1. QueryBuilders.rangeQuery传入的时间精度,需要yyyy-MM-dd HH:mm:ss,否则会报错在这里插入图片描述

  2. 这里对时间格式化分组,使用的是DateHistogramAggregationBuilder
    这个在EsApi7+就废弃了calendarInterval,替换新的fixedInterval

  3. 分组再聚合,注意嵌套关系,各位自己理解下subAggregation

  4. 最后数据查询出来后,迭代解析,注意理解ParsedDateHistogram取值、parsedDateHistogram.getBuckets()、迭代解析

总结

  • gs一直用老版本的ES6,这次终于被逼的更新了吧,真好。(之前一直建议、希望,都。。。。)
  • 本来很想引入EasyEs用用,但是总有同事不认可,算了
  • 之前也建议给ES装上sql-package插件,让DBeaver可以连接,试过一阵子,新版本又没装,算了
  • 其他就没啥好说的了,唯一就是restHighLevelClient现在在7+也被标记为过时了,下次有机会,这个再改改。
  • 希望能帮到大家,uping!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/491068.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Linux系统】进程概念创建进程进程标示符

什么是进程&#xff1f; 操作系统中&#xff0c; 进程可以同时存在非常多的。根据我们之前谈的操作系统具有“管理”的特性&#xff0c; 那么就有&#xff0c;既然要管理&#xff0c;就要 --- 先描述&#xff0c;在组织&#xff01;&#xff01;&#xff01; 由冯诺依曼体系结…

AIGC,ChatGPT,Prompt 万能提示词

AIGC ChatGPT 职场案例 AI 绘画 与 短视频制作 PowerBI 商业智能 68集 Mysql 8.0 54集 Oracle 21C 142集 Office 2021实战应用 Python 数据分析实战&#xff0c; ETL Informatica 数据仓库案例实战 51集 Excel 2021实操 100集&#xff0c; Excel 2021函数大全 80集 Excel 2021…

进入消息传递的魔法之门:ActiveMQ原理与使用详解

嗨&#xff0c;亲爱的童鞋们&#xff01;欢迎来到这个充满魔法的世界&#xff0c;今天我们将一同揭开消息中间件ActiveMQ的神秘面纱。如果你是一个对编程稍有兴趣&#xff0c;但又对消息中间件一知半解的小白&#xff0c;不要害怕&#xff0c;我将用最简单、最友好的语言为你呈…

Linux——命名管道

Linux——命名管道 命名管道命名管道和匿名管道的区别 创建命名管道利用命名管道实现简单通信 我们之前学习了匿名管道&#xff0c;这种管道有一个缺点就是只有两个有血缘关系的进程才能够使用匿名管道&#xff0c;这个非常不方便。所以我们又在匿名管道的基础之上引入了命名管…

Flask python :logging日志功能使用

logging日志的使用 一、了解flask日志1.1、Loggers记录器1.2、Handlers 处理器1.3、Formatters 格式化器 二、使用日志2.1、官网上的一个简单的示例2.2、基本配置2.3、具体使用示例2.4、运行 三、写在最后 一、了解flask日志 日志是一种非常重要的工具&#xff0c;可以帮助开发…

系列学习前端之第 7 章:一文掌握 AJAX

1、AJAX 简介 AJAX 全称为 Asynchronous JavaScript And XML&#xff08;中文名&#xff1a;阿贾克斯&#xff09;&#xff0c;就是异步的 JS 和 XML。AJAX 不是新的编程语言&#xff0c;而是一种将现有的标准组合在一起使用的新方式。AJAX 可以在浏览器中向服务器发送异步请求…

flutter 弹窗之系列一

自定义不受Navigator影响的弹窗 class MyHomePage extends StatefulWidget {const MyHomePage({super.key, required this.title});final String title;overrideState<MyHomePage> createState() > _MyHomePageState(); }class _MyHomePageState extends State<MyH…

rabbitmq-c 程序实现客户端服务端

安装mq https://blog.csdn.net/zl_momomo/article/details/82986368 需要安裝rabbitmq-server 开启rabbitmq服务 systemctl start rabbitmq-server systemctl enable rabbitmq-server. 客户端 amqp_sendstring.c include <stdint.h> #include <stdio.h> #incl…

访问二维数组本质

先从一维数组讲起 int main() {int arr[5] { 1,2,3,4,5 };for (int i 0; i < 5; i) {printf("%d",arr[i]); //对数组进行访问}return 0; } 其实 arr [ i ] * (arr i) 这两个是完全相等的&#xff0c;在c语言指针&#xff08;1&#xff09;8.数组名与 …

STM32F103 CubeMX 使用USB生成键盘设备

STM32F103 CubeMX 使用USB生成键盘设备 基础信息HID8个数组各自的功能 生成代码代码编写添加申明信息main 函数编写HID 修改1. 修改报文描述符2 修改 "usbd_hid.h" 中的申明文件 基础信息 软件版本&#xff1a; stm32cubmx&#xff1a;6.2 keil 5 硬件&#xff1a;…

Redis中的事件(三)

时间事件 事件的调度与执行 因为服务器中同时存在文件事件和时间事件两种事件类型&#xff0c;所以服务器必须对这两种事件进行调度&#xff0c;决定何时应该处理文件事件&#xff0c;何时有应该处理时间事件&#xff0c;以及花多少事件来处理它们等等。事件的调度和执行由ae…

uniApp中使用小程序XR-Frame创建3D场景(2)加载模型

上篇文章讲述了如何将XR-Frame作为子组件集成到uniApp中使用&#xff0c;只完成了简单的环境搭建&#xff0c;这篇文章讲解如何加载3D模型。 1 加入模型加载标签 在XR-Frame框架中&#xff0c;加载资源都是在wxml文件的标签中实现的。下面是wxml中完整的代码 index.wxml &l…

(二)Eureka服务搭建,服务注册,服务发现

1.Eureka注册中心 假如我们的服务提供者user-service部署了多个实例&#xff0c;如图&#xff1a; 存在几个问题&#xff1a; order-service在发起远程调用的时候&#xff0c;该如何得知user-service实例的ip地址和端口&#xff1f;有多个user-service实例地址&#xff0c;…

手机和键盘的数字键盘排序为什么是不同的?

不知道你有没有注意有一个问题。我们的手机输入法中的数字键盘&#xff0c;电脑上通用的数字键盘&#xff0c;计算器上的数字键盘等排序是不同的&#xff0c;从观察者角度看&#xff0c;0-9的数字排列有从上到下的排列&#xff0c;还有从下到上的排列。为什么会出现不同的排列方…

HWOD:句子逆序

一、题目 描述 将一个英文语句以单词为单位逆序排放。例如I am a boy逆序排放后为boy a am I。所有单词之间用一个空格隔开。语句中除了英文字母外&#xff0c;不再包含其他字符。 数据范围 输入的字符串长度满足 1<n<1000 输入 输入一个英文语句&#xff0c;每个…

【电力监控保护】AM5SE-IS防孤岛保护装置/35kV、10kV、380V分布式光伏并网供电/什么是孤岛效应/孤岛效应的危害

什么是孤岛效应&#xff01;&#xff01;&#xff01; 安科瑞薛瑶瑶18701709087 在电力系统中&#xff0c;孤岛效应指的是当电网突然断电时&#xff0c;并网光伏发电系统仍然保持对电网中部分线路的供电状态。这种情况下&#xff0c;这些线路与其他电网断开&#xff0c;形成了…

HarmonyOS页面布局方式

Column&Row组件的使用 1 概述 一个丰富的页面需要很多组件组成&#xff0c;那么&#xff0c;我们如何才能让这些组件有条不紊地在页面上布局呢&#xff1f;这就需要借助容器组件来实现。 容器组件是一种比较特殊的组件&#xff0c;它可以包含其他的组件&#xff0c;而且…

亚马逊美国站CPC认证婴儿门栏和围栏安全标准cpsc办理

美国ASTM F1004-19认证属于婴幼儿门栏和围栏安全标准 ASTM F1004-19婴幼儿门栏和围栏安全标准 2020年7月6日&#xff0c;美国消费品安全委员会发布了最终法规16 CFR 1239&#xff0c;为婴幼儿门栏和围栏建立了安全标准。该法规合并及修订了最新版本的ASTM F1004-19《婴幼儿扩展…

【分布式】——降级熔断限流

降级&熔断&限流 ⭐⭐⭐⭐⭐⭐ Github主页&#x1f449;https://github.com/A-BigTree 笔记仓库&#x1f449;https://github.com/A-BigTree/tree-learning-notes 个人主页&#x1f449;https://www.abigtree.top ⭐⭐⭐⭐⭐⭐ 如果可以&#xff0c;麻烦各位看官顺手点…

API网关-Apisix路由配置教程(数据编辑器方式)

文章目录 前言一、端口修改1. apisix 端口修改2. dashboard 端口修改3. 登录密码修改 二、常用插件介绍1. 常用转换插件1.1 proxy-rewrite插件1.1.1 属性字段1.1.2 配置示例 2. 常用认证插件2.1 key-auth插件2.1.1 消费者端字段2.1.2 路由端字段2.1.3 配置示例 2.2 basic-auth插…