xxljob分片广播+多线程实现高效定时同步elasticsearch索引库

需求:为了利用elasticsearch实现高效搜索,需要将mysql中的数据查出来,再定时同步到es里,同时在同步过程中通过分片广播+多线程提高同步数据的效率。

1. 添加映射

  • 使用kibana添加映射
PUT  /app_info_article
{
    "mappings":{
        "properties":{
            "id":{
                "type":"long"
            },
            "publishTime":{
                "type":"date"
            },
            "layout":{
                "type":"integer"
            },
            "images":{
                "type":"keyword",
                "index": false
            },
            "staticUrl":{
                "type":"keyword",
                "index": false
            },
            "authorId": {
                "type": "long"
            },
            "authorName": {
                "type": "keyword"
            },
            "title":{
                "type":"text",
                "analyzer":"ik_max_word",
                "copy_to": "all"
            },
            "content":{
                "type":"text",
                "analyzer":"ik_max_word",
                "copy_to": "all"
            },
            "all":{
              "type": "text",
              "analyzer": "ik_max_word"
            }
        }
    }
}
  • 使用http请求
    PUT请求添加映射:http://192.168.200.130:9200/app_info_article
    GET请求查询映射:http://192.168.200.130:9200/app_info_article
    DELETE请求,删除索引及映射:http://192.168.200.130:9200/app_info_article
    GET请求,查询所有文档:http://192.168.200.130:9200/app_info_article/_search
    在这里插入图片描述

2. springboot测试

引入依赖

<!--elasticsearch-->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.12.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.12.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.12.1</version>
			<exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.dataformat</groupId>
                    <artifactId>jackson-dataformat-smile</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.dataformat</groupId>
                    <artifactId>jackson-dataformat-yaml</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

elasticsearch配置

#自定义elasticsearch连接配置
elasticsearch:
  host: 192.168.200.131
  port: 9200

elasticsearch配置类

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {
    private String host;
    private int port;

    @Bean
    public RestHighLevelClient client(){
        return new RestHighLevelClient(RestClient.builder(
                new HttpHost(
                        host,
                        port,
                        "http"
                )
        ));
    }
}

实体类

@Data
public class SearchArticleVo {

    // 文章id
    private Long id;
    // 文章标题
    private String title;
    // 文章发布时间
    private Date publishTime;
    // 文章布局
    private Integer layout;
    // 封面
    private String images;
    // 作者id
    private Long authorId;
    // 作者名词
    private String authorName;
    //静态url
    private String staticUrl;
    //文章内容
    private String content;

}

测试。测试成功后别忘了删除app_info_article的所有文档

@SpringBootTest
@RunWith(SpringRunner.class)
public class ApArticleTest {

    @Autowired
    private ApArticleMapper apArticleMapper;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * 注意:数据量的导入,如果数据量过大,需要分页导入
     *  1)查询数据库数据
     *  2)将数据写入到ES中即可
     *     创建BulkRequest
     *            ================================
     *            ||A:创建XxxRequest
     *            ||B:向XxxRequest封装DSL语句数据
     *            ||                             X C:使用RestHighLevelClient执行远程请求
     *            ================================
     *            将XxxRequest添加到BulkRequest
     *       使用RestHighLevelClient将BulkRequest添加到索引库
     * @throws Exception
     */
    @Test
    public void init() throws Exception {
        //1)查询数据库数据
        List<SearchArticleVo> searchArticleVos = apArticleMapper.loadArticleList();

        //2)创建BulkRequest - 刷新策略
        BulkRequest bulkRequest = new BulkRequest()
                //刷新策略-立即刷新
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        for (SearchArticleVo searchArticleVo : searchArticleVos) {
            //A:创建XxxRequest
            IndexRequest indexRequest = new IndexRequest("app_info_article")
                    //B:向XxxRequest封装DSL语句数据
                    .id(searchArticleVo.getId().toString())
                    .source(JSON.toJSONString(searchArticleVo), XContentType.JSON);

            //3)将XxxRequest添加到BulkRequest
            bulkRequest.add(indexRequest);
        }

        //4)使用RestHighLevelClient将BulkRequest添加到索引库
        restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    }

}

3. 核心代码

3.1 xxljob配置

xxl:
  job:
    accessToken: default_token
    admin:
      addresses: http://127.0.0.1:8080/xxl-job-admin
    executor:
      address: ''
      appname: hmtt
      ip: ''
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30
      port: 9998
@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }
}

3.2 elasticsearch配置

和前面一样

3.3 任务编写

@Component
public class SyncIndexTask {

    @Autowired
    private IArticleClient articleClient;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    //线程池
    public static ExecutorService pool = Executors.newFixedThreadPool(10);

    /***
     * 同步索引任务
     *  1)当数量大于100条的时候,才做分片导入,否则只让第1个导入即可
     *      A:查询所有数据量 ->searchTotal total>100 [判断当前分片不是第1个分片]
     *      第N个分片执行数据处理范围-要计算   确定当前分片处理的数据范围  limit #{index},#{size}
     *                                                                [index-范围]
     *
     *      B:执行分页查询-需要根据index判断是否超过界限,如果没有超过界限,则并开启多线程,分页查询,将当前分页数据批量导入到ES
     *
     *      C:在xxl-job中配置作业-策略:分片策略
     *
     */
    @XxlJob("syncIndex")
    public void syncIndex()  {
        //1、获取任务传入的参数   {"minSize":100,"size":10}
        String jobParam = XxlJobHelper.getJobParam();
        Map<String,Integer> jobData = JSON.parseObject(jobParam,Map.class);
        int minSize = jobData.get("minSize"); //分片处理的最小总数据条数
        int size =  jobData.get("size"); //分页查询的每页条数   小分页

        //2、查询需要处理的总数据量  total=IArticleClient.searchTotal()
        Long total = articleClient.searchTotal();

        //3、判断当前分片是否属于第1片,不属于,则需要判断总数量是否大于指定的数据量[minSize],大于,则执行任务处理,小于或等于,则直接结束任务
        int cn = XxlJobHelper.getShardIndex(); //当前节点的下标
        if(total<=minSize && cn!=0){
            //结束
            return;
        }
        //4、执行任务   [index-范围]   大的分片分页处理
        //4.1:节点个数
        int n = XxlJobHelper.getShardTotal();
        //4.2:当前节点处理的数据量
        int count = (int) (total % n==0? total/n :  (total/n)+1);
        //4.3:确定当前节点处理的数据范围
        //从下标为index的数据开始处理  limit #{index},#{count}
        int indexStart = cn*count;
        int indexEnd = cn*count+count-1; //最大的范围的最后一个数据的下标
        //5.小的分页查询和批量处理
        int index =indexStart; //第1页的index

        System.out.println("分片个数是【"+n+"】,当前分片下标【"+cn+"】,处理的数据下标范围【"+indexStart+"-"+indexEnd+"】");
        do {
            //=============================================小分页================================
            //5.1:分页查询
            //5.2:将数据导入ES
            push(index,size,indexEnd);

            //5.3:是否要查询下一页 index+size
            index = index+size;
        }while (index<=indexEnd);
    }


    /**
     * 数据批量导入
     * @param index
     * @param size
     * @param indexEnd
     * @throws IOException
     */
    public void push(int index,int size,int indexEnd)  {

        pool.execute(()->{
            System.out.println("当前线程处理的分页数据是【index="+index+",size="+(index+size>indexEnd? indexEnd-index+1 : size)+"】");
            //1)查询数据库数据
            List<SearchArticleVo> searchArticleVos = articleClient.searchPage(index, index+size>indexEnd? indexEnd-index+1 : size);  //size可能越界
            //2)创建BulkRequest - 刷新策略
            BulkRequest bulkRequest = new BulkRequest()
                    //刷新策略-立即刷新
                    .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            for (SearchArticleVo searchArticleVo : searchArticleVos) {
                //A:创建XxxRequest
                IndexRequest indexRequest = new IndexRequest("app_info_article")
                        //B:向XxxRequest封装DSL语句数据
                        .id(searchArticleVo.getId().toString())
                        .source(com.alibaba.fastjson.JSON.toJSONString(searchArticleVo), XContentType.JSON);

                //3)将XxxRequest添加到BulkRequest
                bulkRequest.add(indexRequest);
            }

            //4)使用RestHighLevelClient将BulkRequest添加到索引库
            if(searchArticleVos!=null && searchArticleVos.size()>0){
                try {
                    restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

3.4 xxl-admin新增任务

  • 新增执行器 这里的appName要和XxlJobConfig 中的appname保持一致。也可以不用新增执行器,执行器就相当于一个分组的作用。
    在这里插入图片描述
  • 新增任务
    在这里插入图片描述

4. 模拟集群,运行项目

  • 运行3个SearchApplication项目,设置xxl.job.executor.port分别为9998,9997,9996
    在这里插入图片描述
  • 执行一次任务
    在这里插入图片描述
  • 查看结果,3个application都成功了
    在这里插入图片描述
  • kibana查看是否有数据,发现有18条,mysql的数据全部被导入了

在这里插入图片描述

5. 总结

  • xxljob分片广播:假如一共有1000条数据,有3个节点上运行着SearchApplication服务。那么每个节点需要同步的数据总条数为334,334,332条。
    在这里插入图片描述

  • 分页查询:节点0的任务总条数为334条,那么需要做分页(假设分页size为20)查询的次数为17次,每查1次后,将查到的数据通过restHighLevelClient发送到es中。

do {
    //5.1:分页查询
    //5.2:将数据导入ES
    push(index,size,indexEnd);  //分页查询+导入es的操作

    //5.3:是否要查询下一页 index+size
    index = index+size;
}while (index<=indexEnd);
  • 多线程:上述代码,push方法包括了分页查询+导入es的操作,是低效的。并且push方法不结束的话,下一页的操作不会开始。这里可以用多线程,每次push的时候都开启一个新线程,这样每一页的操作都是独立的,可以同时查询,同时导入到es,不会互相影响。这里使用了线程池。
public static ExecutorService pool = Executors.newFixedThreadPool(10);
......
public void push(int index,int size,int indexEnd)  {
		pool.execute(()->{
            //分页查询+导入到es
        });
     }
  • elasticsearch的相关api:参考我另一篇博客 项目中使用Elasticsearch的API相关介绍
//2)创建BulkRequest - 刷新策略
 BulkRequest bulkRequest = new BulkRequest()
         //刷新策略-立即刷新
         .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
 for (SearchArticleVo searchArticleVo : searchArticleVos) {
     //A:创建XxxRequest
     IndexRequest indexRequest = new IndexRequest("app_info_article")
             //B:向XxxRequest封装DSL语句数据
             .id(searchArticleVo.getId().toString())
             .source(com.alibaba.fastjson.JSON.toJSONString(searchArticleVo), XContentType.JSON);

     //3)将XxxRequest添加到BulkRequest
     bulkRequest.add(indexRequest);
 }

 //4)使用RestHighLevelClient将BulkRequest添加到索引库
 if(searchArticleVos!=null && searchArticleVos.size()>0){
     try {
         restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
     } catch (IOException e) {
         e.printStackTrace();
     }
 }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/628966.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

AutoNeRF:Training Implicit Scene Representations with Autonomous Agents

论文概述 《AutoNeRF》是由Pierre Marza等人撰写的一篇研究论文&#xff0c;旨在通过自主智能体收集数据来训练隐式场景表示&#xff08;如神经辐射场&#xff0c;NeRF&#xff09;。传统的NeRF训练通常需要人为的数据收集&#xff0c;而AutoNeRF则提出了一种使用自主智能体高效…

知识图谱必须要图数据库嘛?

在 ZH上又看到一个问题&#xff0c;觉得挺有意思&#xff0c;小聊一二。 “知识图谱必须要图数据库吗&#xff1f;” ——使用非关系型数据库&#xff0c;关系型数据库&#xff0c;在计算图的一些特征上&#xff0c;通过优化算法是否能达到使用图数据库接近的计算速度呢&#x…

【数据结构】栈的实现(链式栈)

文章目录 栈的实现&#xff08;链式栈&#xff09;栈的定义初始化栈进栈判断是否为空栈出栈销毁栈获取栈顶元素获取栈的长度栈的打印 完整代码&#xff08;包括测试代码&#xff09;Stack.hStack.ctest.c 栈的实现&#xff08;链式栈&#xff09; 首先新建一个工程&#xff1a…

SHAP - 解释机器学习

文章目录 一、关于 SHAP二、安装三、树集成示例&#xff08;XGBoost/LightGBM/CatBoost/scikit-learn/pyspark 模型&#xff09;四、自然语言示例&#xff08;transformers&#xff09;五、使用 DeepExplainer 的深度学习示例&#xff08;TensorFlow/Keras 模型&#xff09;六、…

如何利用R包进行主成分分析和可视化

一. 使用R包“FactoMineR”进行主成分分析&#xff08;PCA&#xff09; 基本步骤如下&#xff1a; 安装和加载包&#xff1a;如果尚未安装&#xff0c;首先安装“FactoMineR”包&#xff0c;然后加载它&#xff1a; install.packages("FactoMineR")library(FactoM…

2024年5月面试知识点梳理

2024年5月面试知识点梳理 资料来源Java基础泛型基本概念常见问题 字符串注解异常反射SPI机制Java集合CollectionMap 并发基础基础理论线程Java 中的锁乐观锁与悲观锁自旋锁与非自旋锁公平锁与非公平锁 并发关键字 - synchronized并发集合Lock核心类并发集合核心类原子类核心类线…

ARM机密计算组件

安全之安全(security)博客目录导读 目录 ​一、硬件架构 1、RME 二、软件和固件架构 1、RMM 2、其他固件标准&#xff08;例如PSCI&#xff09; 三、开源实现 1、TF-A 2、Veraison 3、工具链 四、动态TrustZone技术 Arm机密计算架构(Arm CCA)引入了一系列硬件和软件…

vue网页端控制台展示独有标记

效果展示 实现步骤 1. 新建js文件 定义一个类 用于提供控制台打印日志显示样式的方法 src\libs\util.log.js class Logger {// 定义静态方法static typeColor(type "default") {let color "";switch (type) {case "default":color "#3…

AIGC行业现在适合进入吗

一、引言 随着人工智能技术的飞速发展&#xff0c;AIGC&#xff08;人工智能生成内容&#xff09;行业正逐渐成为科技领域的新热点。AIGC通过利用人工智能技术&#xff0c;自动生成文本、图像、音频、视频等多种形式的内容&#xff0c;极大地提高了内容生产的效率和质量。然而…

案例实践 | 基于长安链的华电集团碳资产精细化管理体系

案例名称-碳资产精细化管理体系 ■ 建设单位 北京华电电子商务科技有限公司 ■ 用户群体 华电集团内各级公司及相关产业链单位 ■ 应用成效 解决数据精准可信问题与隐私保护问题&#xff0c;提升碳资产管理效能&#xff0c;入选国资委的国有重点企业管理标杆项目 案例…

高通QCS6490开发(六):连接使用摄像头

本文将会介绍如何在FV01开发板上连接摄像头和显示预览。 所用硬件有&#xff1a; 1. FV01开发板 2.Raspberry 摄像头 操作步骤如下&#xff1a; 通过FPC线和杜邦线将FV01板和摄像头连接起来&#xff0c;接线如下&#xff1a; 1、Camera设备连接&#xff0c;通过22pin转15pi…

echarts树图 改文本显示的地方的样式

树图改文本显示的时候的样式 虽然有点越改越丑 其中有一些失败的尝试 forammter 无法识别html元素 所以对于tooptips有用的html元素定义获取返回在这里写的话是不生效的 rich配置项里面的backgroundColor官方说支持 html元素和canvas元素 已经图片url 没有详细试验 官网地址 h…

实验室无法培养的菌,原来可以这么研究!

厌氧氨氧化&#xff08;anammox&#xff09;细菌在全球氮循环和废水氮去除中发挥着至关重要的作用&#xff0c;由于anammox细菌生长缓慢、难以培养等特点&#xff0c;对其生态学和生物学特性知之甚少。近日&#xff0c;凌恩生物合作客户重庆大学陈猷鹏教授团队在《Science of t…

腾讯面向大众!普通人玩微信视频号,就能月入过万!

哈喽~我是电商月月 说起创业&#xff0c;电商这个赛道真的很适合普通人去闯&#xff0c;我为什么这样说&#xff0c;其实有两个原因 项目省钱&#xff1a;做电商不需要货物&#xff0c;没接触过电商的朋友应该不了解&#xff0c;每个电商平台都存在大批量的“无货源”商家 就…

【QT学习】15.数据库

一。安装数据库 1.判断数据库是否安装成功 方法一&#xff1a;命令行检测 1.进入命令行&#xff08;不需要管理员模式&#xff09; 2.结果说是欢迎进入mysql&#xff0c;表示mysql成功安装 方法二&#xff1a;navicat连接mysql 二。qt上配置MySQL 1.配置qt之前 1.点击mysql.p…

人工智能领域向量化技术加速多模态大模型训练与应用

目录 前言1、TextIn文档解析技术1.1、文档解析技术1.2、目前存在的问题1.2.1、不规则的文档信息示例 1.3、合合信息的文档解析1.3.1、合合信息的TextIn文档解析技术架构1.3.2、版面分析关键技术 Layout-engine1.3.3、文档树提取关键技术 Catalog-engine1.3.4、双栏1.3.5、非对称…

MHD、MQA、GQA注意力机制详解

MHD、MQA、GQA注意力机制详解 注意力机制详解及代码前言&#xff1a;MHAMQAGQA 注意力机制详解及代码 前言&#xff1a; 自回归解码器推理是 Transformer 模型的 一个严重瓶颈&#xff0c;因为在每个解码步骤中加 载解码器权重以及所有注意键和值会产生 内存带宽开销 下图为三…

安防视频汇聚/智能分析云平台EasyCVR调用localfile接口会返回日志的问题该如何解决?

视频汇聚/安防视频融合云平台EasyCVR视频监控系统支持多协议接入、兼容多类型设备&#xff0c;平台能在复杂的网络环境中&#xff08;专网、局域网、广域网、VPN、公网等&#xff09;将前端海量的设备进行统一集中接入与视频汇聚管理。视频监控/集中存储系统EasyCVR平台可支持国…

中青杯全国大学生数学建模竞赛纳入多所高校学科竞赛认定目录

2024年第六届中青杯全国大学生数学建模竞赛将于2024年5月23日17:00至5月26日17:00举行,中青杯全国大学生数学建模竞赛是中国高校学科竞赛中规模较大、影响较广的学科竞赛之一,并且纳入多所高校学科竞赛认定目录。 报名截止时间:2024年5月23日12:00 报名网站:http://www.c…

Hadoop 3.4.0 项目实战

1环境基于 上一篇搭建 高可用分布式集群 2 官方提供MapReduce程序 #评估圆周率 cd /data/hadoop/share/hadoop/mapreduce/ hadoop jar hadoop-mapreduce-examples-3.4.0.jar pi 2 6 3 实例项目分析1 #预分析的文件如&#xff0c;如单词统计 # #上传文件到hdfs hdfs …