ElasticSearch 批量插入漏数据

项目场景:

项目中需要把Mysql数据同步到ElasticSearch中


问题描述

数据传输过程中数据不时出现丢失的情况,偶尔会丢失一部分数据,本地测试也无法复现,后台程序也没有报错,一到正式环境就有问题,很崩溃

这里是批量操作的代码

private void bulk(List<IndexRequest> indexRequests) throws Exception {
        try {
            // 在这里可以对你获取到的批量结果数据进行需要的业务处理
            BulkProcessor bulkProcessor = BulkProcessor.builder(
                            (req, bulkListener) -> restHighLevelClient.bulkAsync(req, RequestOptions.DEFAULT, bulkListener),
                            new BulkProcessor.Listener() {
                                private int totalCount = 0;

                                @Override
                                public void beforeBulk(long executionId, BulkRequest request) {
                                }

                                @Override
                                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                                    // 统计条数并输出信息
                                    int count = response.getItems().length;
                                    totalCount += count;
                                    log.info("批量操作 [{}] 成功执行了{}条请求,共处理了{}条数据", executionId, count, totalCount);
                                }

                                @Override
                                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                                    log.error("数据处理失败,执行id为{},错误信息为:{}", executionId, failure);
                                }
                            }
                    )
                    .setConcurrentRequests(esproperties.getThreadSize())/*并发请求的数量。默认为1。*/
                    .setFlushInterval(TimeValue.timeValueSeconds(30)) // 固定30s必须刷新一次
                    .setBulkSize(new ByteSizeValue(10L, ByteSizeUnit.MB)) // 5MB batch size
                    .setBulkActions(esproperties.getBulkActions()) // 每次执行最多处理5000个请求
                    .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                    .build();
            indexRequests.forEach(bulkProcessor::add);
            bulkProcessor.close();
        } catch (Exception e) {
            e.printStackTrace();
            throw new Exception(e);
        }
    }

原因分析:

当时想到的问题是这里是不是数据格式有问题,因为采用的是异步,就是错误了也不会影响到其它数据的插入

接着就定位到了这段代码,想想是不是哪里没有处理错误的数据信息,所以没有打印出来,果然发现了BulkResponse 这个类,是可以处理每个错误信息的,接着就优化了代码如下

其实只需要修改afterBulk 方法,遍历出现的异常就能够打印出导入不进去的错误信息

 @Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
 // 统计条数并输出信息
// int count = response.getItems().length;
// totalCount += count;
//  log.info("批量操作 [{}] 成功执行了{}条请求,共处理了{}条数据", executionId, count, totalCount);
 if (response.hasFailures()){
     for (BulkItemResponse itemResponse : response) {
         if (itemResponse.isFailed()) {
             log.info("数据写入失败:错误信息为:{}",itemResponse.getFailureMessage());
         }
     }
// log.info("数据写入失败:{}",response.buildFailureMessage());
 }
}

解决方案:

接着修改代码后把新的包放上去,执行,终于找到了错误信息

下面是错误信息的截图
在这里插入图片描述

报错 Limit of total fields 1000 这里就能看出来,是字段数量大于1000了,因为我的是宽表,而之前创建的索引字段数量都是小于1000的,新的索引结构数量大于1000,找到问题就好办了

在kibana执行下面脚本修改字段限制,根据实际情况来,没有kibana就写出curl 请求
在这里插入图片描述

PUT 你的索引名/_settings
{
  "index": {
    "mapping.total_fields.limit": 2000
  }
}

总结

  1. 没有测试好宽表字段比较多的情况
  2. 写代码的时候以为很简单不会出现问题,所以日志也比较随便。
  3. 日常开发要打印好日志,它能够在出现错误的情况下,很快的帮我们定位出问题所在。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/114260.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Collction的List方法,list特有方法,遍历方式,迭代器选择

[to] list特有方法 //插入指定元素//list.add(1,"ddd");//System.out.println(list);//[aaa, ddd, bbb, ccc]//这个表示在一索引的位置插入ddd//他会把原来一索引位置的元素往后移动一位在添加//删除指定元素//String remove list.remove(1);//System.out.println(…

云原生安全日志审计

记得添加&#xff0c;把配置文件挂载进去 - mountPath: /etc/kubernetes/auditname: audit-policyreadOnly: true.....- hostPath:path: /etc/kubernetes/audit/type: DirectoryOrCreatename: audit-policy/etc/kubernetes/manifests/kube-apiserver.yaml 具体配置文件如下 a…

查询平均提速 700%,奇安信基于 Apache Doris 升级日志安全分析系统

本文导读&#xff1a; 数智时代的到来使网络安全成为了不可忽视的重要领域。奇安信作为一家领先的网络安全解决方案领军者&#xff0c;致力于为企业提供先进全面的网络安全保护&#xff0c;其日志分析系统在网络安全中发挥着关键作用&#xff0c;通过对运行日志数据的深入分析…

Git复制代码

目录 一、常用下载代码 1.登录Git克隆SSH​编辑 2.新建文件然后右键点击Git Bash Here 3.git clone Paste 二. 本地下载 1.从本地进入页面 2.生成代码——>导入——>生成代码后下载 3.解压道相应位置 一、常用下载代码 1.登录Git克隆SSH 2.新建文件然后右键点击…

【JavaEE】cookie和session

cookie和session cookie什么是 cookieServlet 中使用 cookie相应的API Servlet 中使用 session 相应的 API代码示例: 实现用户登陆Cookie 和 Session 的区别总结 cookie 什么是 cookie cookie的数据从哪里来? 服务器返回给浏览器的 cookie的数据长什么样? cookie 中是键值对…

一文看懂MySQL 5.7和MySQL 8到底有哪些差异?

目录 ​编辑 引言 1、数据字典和系统表的变化 2、JSON支持的改进 3、新的数据类型 4、安全性增强 5、性能改进 6、InnoDB存储引擎的改进 结论 引言 MySQL作为最常用的开源关系型数据库管理系统之一&#xff0c;一直在不断发展和改进。随着时间的推移&#xff0c;MySQ…

HTTP/HTTPS、SSL/TLS、WS/WSS 都是什么?

有同学问我&#xff0c;HTTP/HTTPS、SSL/TLS、WS/WSS 这都是些什么&#xff1f;那我们就先从概念说起&#xff1a; HTTP 是超文本传输协议&#xff0c;信息是通过明文传输。HTTPS 是在 HTTP 的基础上信息通过加密后再传输。SSL 是实现 HTTPS 信息传输加密的算法。TLS 是 SSL 的…

电子敲木鱼小程序源码系统 功德自动+1 带完整的搭建教程

大家好啊&#xff0c;今天来给大家推荐一款电子敲木鱼小程序源码系统 。这款小程序在当下可是超级火爆的。这是一个简单的小程序&#xff0c;有且只有一个敲木鱼的功能。可以自动家功德&#xff0c;帮助用户减轻压力。 系统特色功能一览&#xff1a; 1.动作模拟&#xff1a;通过…

UI自动化概念+Web自动化测试框架

1.UI自动化测试概念:我们先明确什么是UI UI&#xff0c;即(User Interface简称UI用户界面)是系统和用户之间进行交互和信息交换的媒介 UI自动化测试: Web自动化测试和移动自动化测试都属于UI自动化测试&#xff0c;UI自动化测试就是借助自动化工具对程序UI层进行自动化的测试 …

N-132基于springboot,vue人事管理系统

开发工具&#xff1a;IDEA 服务器&#xff1a;Tomcat9.0&#xff0c; jdk1.8 项目构建&#xff1a;maven 数据库&#xff1a;mysql5.7 系统分前后台&#xff0c;项目采用前后端分离 前端技术&#xff1a;vueelementUI 服务端技术&#xff1a;springbootmybatis-plus 本项…

Content-Type 值有哪些?

1、application/x-www-form-urlencoded 最常见 POST 提交数据的方式。 浏览器的原生 form 表单&#xff0c;如果不设置 enctype 属性&#xff0c;那么最终就会以 application/x-www-form-urlencoded 方式提交数据。 <form action"http://www.haha/ads/sds?name小草莓…

公司内网知识问答库系统源码 完全开源可二次开发 带完整搭建教程

随着公司规模的扩大和业务复杂性的增加&#xff0c;员工需要更快更有效地获取和共享知识。一个内部的知识问答库系统可以帮助公司提高员工的工作效率和知识管理水平。 有效的内部沟通是公司成功的关键因素之一。通过创建一个内部的知识问答平台&#xff0c;可以鼓励员工之间的…

【进程控制⑦】:制作简易shell理解shell运行原理

【进程控制⑦】&#xff1a;制作简易shell&&理解shell运行原理 一.交互问题&#xff0c;获取命令行二.字串分割问题&#xff0c;解析命令行三.指令的判断四.普通命令的执行五.shell原理本质 一.交互问题&#xff0c;获取命令行 shell刚启动时就会出现一行命令行&#x…

01_stable_diffusion_introduction_CN

stable_diffusion 配置 !pip install -Uq diffusers ftfy accelerate# Installing transformers from source for now since we need the latest version for Depth2Img: !pip install -Uq githttps://github.com/huggingface/transformers import torch import requests fro…

Logstash学习

1、什么是logstash logstash是一个数据抽取工具&#xff0c;将数据从一个地方转移到另一个地方。如hadoop生态圈的sqoop等。下载地址: https://www.elastic.co/cn/downloads/logstash logstash之所以功能强大和流行&#xff0c;还与其丰富的过滤器插件是分不开的&#xff0c;过…

搭建个人hMailServer邮件服务实现远程发送邮件

文章目录 前言1. 安装hMailServer2. 设置hMailServer3. 客户端安装添加账号4. 测试发送邮件5. 安装cpolar6. 创建公网地址7. 测试远程发送邮件8. 固定连接公网地址9. 测试固定远程地址发送邮件 前言 hMailServer 是一个邮件服务器,通过它我们可以搭建自己的邮件服务,通过cpola…

python连接clickhouse (CK)

Author: tkhywang 2810248865qq.com Date: 2023-11-01 11:28:58 LastEditors: tkhywang 2810248865qq.com LastEditTime: 2023-11-01 11:36:25 FilePath: \PythonProject02\Python读取clickhouse2 数据库数据.py Description: 这是默认设置,请设置customMade, 打开koroFileHead…

tcp/ip该来的还是得来

1. TCP/IP、Http、Socket的区别 \qquad 区别是&#xff1a;TCP/IP即传输控制/网络协议&#xff0c;也叫作网络通讯协议&#xff0c;它是在网络的使用中的最基本的通信协议。Http是一个简单的请求-响应协议&#xff0c;它通常运行在TCP之上。Socket是对网络中不同主机上的应用进…

react-hook-form。 useFieldArray Controller 必填,报错自动获取较多疑问记录

背景 动态多个数据Controller包裹时候&#xff0c;原生html标签input可以add时候自动获取焦点&#xff0c;聚焦到最近不符合要求的元素上面 matiral的TextField同样可以可是x-date-pickers/DatePicker不可以❌ 是什么原因呢&#xff0c;内部提供foucs&#xff1f;&#xff1f;属…

这才是当今生成式人工智能的根本性问题!

原创 | 文 BFT机器人 01 引言 近年来&#xff0c;生成式人工智能产品层出不穷&#xff0c;ChatGPT火爆出圈后&#xff0c;百度、谷歌等科技大佬争相研究生成式人工智能产品&#xff0c;将该技术的普及程度提升到了一个新的水平。然而&#xff0c;生成式人工智能的运营需要高昂…