数据同步-Mysql同步到ElasticSearch

Mysql同步到ElasticSearch

  • 数据同步
    • 1、定时任务
    • 2、双写
    • 3、MQ异步写入
    • 4、Logstash
    • 5、Canal

数据同步

一般情况下,如果做查询搜索功能,使用 ES 来模糊搜索,但是数据是存放在数据库 MySQL 里的,所以说我们需要把 MySQL 中的数据和 ES 进行同步,保证数据一致(以 MySQL 为主)。

数据同步包含:全量同步 (首次) + 增量同步(新数据)。

首次安装完 ES,把 MySQL 数据全量同步到 ES 里,写一个单次脚本。

public class FullSyncPostToEs implements CommandLineRunner {

    @Resource
    private PostService postService;

    @Resource
    private PostEsDao postEsDao;

    @Override
    public void run(String... args) {
        List<Post> postList = postService.list();
        if (CollectionUtils.isEmpty(postList)) {
            return;
        }
        List<PostEsDTO> postEsDTOList = postList.stream().map(PostEsDTO::objToDto).collect(Collectors.toList());
        final int pageSize = 500;
        int total = postEsDTOList.size();
        log.info("FullSyncPostToEs start, total {}", total);
        for (int i = 0; i < total; i += pageSize) {
            int end = Math.min(i + pageSize, total);
            log.info("sync from {} to {}", i, end);
            postEsDao.saveAll(postEsDTOList.subList(i, end));
        }
        log.info("FullSyncPostToEs end, total {}", total);
    }
}

增量同步有五种方式:

1、定时任务

  • 定时任务:比如1 分钟 1 次,找到 MySQL 中过去几分钟内(至少是定时周期的 2 倍)发生改变的数据,然后更新到 ES。
public class IncSyncPostToEs {

    @Resource
    private PostMapper postMapper;

    @Resource
    private PostEsDao postEsDao;

    /**
     * 每分钟执行一次
     */
    @Scheduled(fixedRate = 60 * 1000)
    public void run() {
        // 查询近 5 分钟内的数据
        Date fiveMinutesAgoDate = new Date(new Date().getTime() - 5 * 60 * 1000L);
        List<Post> postList = postMapper.listPostWithDelete(fiveMinutesAgoDate);
        if (CollectionUtils.isEmpty(postList)) {
            log.info("no inc post");
            return;
        }
        List<PostEsDTO> postEsDTOList = postList.stream()
                .map(PostEsDTO::objToDto)
                .collect(Collectors.toList());
        final int pageSize = 500;
        int total = postEsDTOList.size();
        log.info("IncSyncPostToEs start, total {}", total);
        for (int i = 0; i < total; i += pageSize) {
            int end = Math.min(i + pageSize, total);
            log.info("sync from {} to {}", i, end);
            postEsDao.saveAll(postEsDTOList.subList(i, end));
        }
        log.info("IncSyncPostToEs end, total {}", total);
    }
}

优点:简单易懂、占用资源少、不用引入第三方中间件;
缺点:有时间差;
应用场景:数据时间内不同步影响不大、或者数据几乎不发生修改;

2、双写

  • 双写:写数据的时候,必须去写入到ES,更新、删除都需要操作ES(加事务:可能存在写入某一方出现失败,形成脏数据)。

3、MQ异步写入

  • MQ异步写入:在写入数据库时,通过MQ异步写入ES,同样可能存在数据写入不一致问题。

4、Logstash

  • ES的Logstash数据同步管道:Logstash 事件处理管道有三个阶段:输入过滤器输出

下载地址:https://www.elastic.co/guide/en/logstash/7.17/installing-logstash.html
inputs 模块负责收集数据,filters 模块可以对收集到的数据进行格式化、过滤、简单的数据处理,outputs 模块负责将数据同步到目的地,Logstash的处理流程,就像管道一样,数据从管道的一端,流向另外一端。

inputs 和 outputs 支持编解码器,使您能够在数据进入或离开管道时对数据进行编码或解码,而无需使用单独的过滤器。
在这里插入图片描述
启动Logstash,添加一个conf配置文件,便可完成同步任务。

 C:\Windows\system32> cd C:\logstash-7.17.23\
 C:\logstash-7.17.23> .\bin\logstash.bat -f .\config\syslog.conf

syslog.conf:数据同步的配置文件。

举个例子:

输入事件:

input {
  jdbc {
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"  //数据库驱动
    jdbc_driver_class => "com.mysql.jdbc.Driver"                 //连接数据库
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"
     jdbc_password => "mysql"
     statement => "SELECT * from songs where artist = :favorite_artist"  //执行sql语句
    parameters => { "favorite_artist" => "Beethoven" }   //预编译  
    schedule => "* * * * *"    //corn表达式,多久进行同步
  }
}

:sql_last_value 可以设置每次查询结果中updatetime为最后的时间,作为下次增量同步的开始时间(需要对时间进行排序才能保证最后一条数据为时间最大的)。

input {
  jdbc {
    statement => "SELECT id, mycolumn1, mycolumn2 FROM my_table WHERE updatetime > :sql_last_value order  by updatetime desc"
    use_column_value => true
    tracking_column => "updatetime "
    # ... other configuration bits
  }
}

输出事件:

output {
stdout { codec => rubydebug }
elasticsearch {
hosts => "127.0.0,1:9200"  //写入到ES
index => "post_v1"        //ES对应的索引
document_id => "%{id)"	    //取数据库查询出的id作为ES中的唯一id
}
}

过滤事件:

filter {
mutate {
rename => {
"updatetime" =>"updateTime"    //给字段重命名
"userid"     => "userId"
"createtime" =>"createTime"
"isdelete"   =>"isDelete"
remove_field =>["thumbnm""favournum"]   //移除不需要同步到ES中的字段

更多参数,可参考官方文档进行配置:https://www.elastic.co/guide/en/logstash/7.17/output-plugins.html。

5、Canal

  • Canal

优点:实时同步,实时性非常强;
原理:数据库每次修改时,会修改 binlog 文件,只要监听该文件的修改,就能第一时间得到消息并处理;
canal: 帮你监听 binlog,并解析 binlog 为你可以理解的内容,它伪装成了 mysql 的从节点,获取主节点给的 binlog。

在这里插入图片描述

参考文档:https://github.com/alibaba/canal/wiki/QuickStart

后记
👉👉💕💕美好的一天,到此结束,下次继续努力!欲知后续,请看下回分解,写作不易,感谢大家的支持!! 🌹🌹🌹

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/874865.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

科技之光,照亮未来之路“2024南京国际人工智能展会”

全球科技产业的版图正以前所未有的速度重构&#xff0c;而位于中国东部沿海经济带的江浙沪地区&#xff0c;作为科技创新与产业升级的高地&#xff0c;始终站在这一浪潮的最前沿。2024年&#xff0c;这一区域的科技盛宴——“2024南京人工智能展会”即将在南京国际博览中心盛大…

Golang | Leetcode Golang题解之第390题消除游戏

题目&#xff1a; 题解&#xff1a; func lastRemaining(n int) int {a1 : 1k, cnt, step : 0, n, 1for cnt > 1 {if k%2 0 { // 正向a1 step} else { // 反向if cnt%2 1 {a1 step}}kcnt >> 1step << 1}return a1 }

基于zabbix实现监控Jenkins过程---超详细

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:Linux运维老纪的首页…

【ArcGIS】栅格计算器原理及案例介绍

ArcGIS&#xff1a;栅格计算器原理及案例介绍 栅格计算器&#xff08;Raster Calculator&#xff09;原理介绍案例案例1&#xff1a;计算栅格数据平均值 参考 栅格计算器&#xff08;Raster Calculator&#xff09;原理介绍 描述&#xff1a;在类似计算器的界面中&#xff0c;…

learn C++ NO.13——list

前言 本文将从list的使用&#xff0c;再到根据sgi库对于list实现作为参考模拟实现一下list。通过模拟实现来增加对它的理解。 介绍list list是一个由带头双向循环链表实现的STL容器&#xff0c;它提供常规时间内对数据进行插入和删除操作。 list在内存中存储不连续的空间存储…

【HarmonyOS NEXT】实现网络图片保存到手机相册

【问题描述】 给定一个网络图片的地址&#xff0c;实现将图片保存到手机相册 【API】 phAccessHelper.showAssetsCreationDialog【官方文档】 https://developer.huawei.com/consumer/cn/doc/harmonyos-references-V5/js-apis-photoaccesshelper-V5#showassetscreationdialog…

ISO27001信息安全管理体系认证怎么做?

ISO27001认证是关于信息安全管理体系认证&#xff0c;ISO27001将有效保证企业在信息安全领域的可靠性&#xff0c;降低企业泄密风险&#xff0c;更好的保存核心数据。下面给大家详细讲讲ISO27001信息安全管理体系认证详细办理流程。 一、ISO27001信息安全管理体系认证的办理条…

【Centos】Centos系统换yum源

【Centos】Linux&#xff0c;Centos系统换yum源 1、备份 mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.bak/etc/yum.repos.d/CentOS-Base.repo 是yum的配置文件 /etc/yum.repos.d/CentOS-Base.repo.bak 是我们备份的配置文件 2、下载yum源 这里…

【EI稳定,马来亚大学主办】2024年计算机与信息安全国际会议(WCCIS 2024,9月27-29)

2024年计算机与信息安全国际会议 (WCCIS 2024) 将于2024年9月27-29日召开。 会议旨在为从事计算机与信息安全的专家学者、工程技术人员、技术研发人员提供一个共享科研成果和前沿技术&#xff0c;了解学术发展趋势&#xff0c;拓宽研究思路&#xff0c;加强学术研究和探讨&…

【编译原理】编译器概述、编译器结构、编译器实例

编译器概述、编译器结构、编译器实例 编译器概述 1.编译器是一个程序 核心功能是把源代码翻译成目标代码 比如源代码&#xff1a;C/C&#xff0c;Java&#xff0c;C#&#xff0c;html 目标代码&#xff1a;X86&#xff0c;IA64,ARM,… 把一种源程序翻译成另外一种源程序&…

/bin/bash的作用

1、为啥使用不了很多命令&#xff1f; 我刚进入一个新系统&#xff1a; 我当时蒙蔽了&#xff0c;这是啥意思&#xff0c;为啥没命令? 原因是&#xff1a;当时进入的shell并没有初始化这些路径环境&#xff0c;所以正确的方法是&#xff1a; 2、/bin/bash运行的过程中执行…

Mac清理其他文件:释放存储空间的高效指南

每个Mac用户都可能遇到存储空间不足的问题&#xff0c;尤其是当“其他”文件积累到一定体积时。在Mac上&#xff0c;“其他”文件通常包括各种系统文件、缓存、文档以及不被归类为应用程序、照片、电影或音乐的其他类型的文件。这些文件往往不易被注意&#xff0c;但逐渐占用了…

C语言代码练习(第十八天)

今日练习&#xff1a; 48、猴子吃桃问题。猴子第1天摘下若干个桃子&#xff0c;当即吃了一半&#xff0c;还不过瘾&#xff0c;又多吃了一个。第2天早上又将剩下的桃子吃掉一半&#xff0c;又多吃了一个。以后每天早上都吃了前一天剩下的一半零一个。到第10天早上想再吃时&…

TMGM:欧元区储蓄率处于结构性增高

法国的家庭储蓄率进一步上升&#xff0c;从2024年第一季度的家庭总可支配收入(GDI)的17.6%上升到2024年第二季度的17.9%&#xff0c;这信息来自于法国国家统计和经济研究所&#xff08;INSEE&#xff09;。这也是欧元区正在进行的上升趋势的早期迹象。虽然第二季度数字还没出来…

【C++】C++ STL探索:容器适配器 Stack 与 Queue 的使用及模拟实现

C语法相关知识点可以通过点击以下链接进行学习一起加油&#xff01;命名空间缺省参数与函数重载C相关特性类和对象-上篇类和对象-中篇类和对象-下篇日期类C/C内存管理模板初阶String使用String模拟实现Vector使用及其模拟实现List使用及其模拟实现 本文将详细介绍如何使用容器适…

探索最佳 Shell 工具:全面测评 Bash、Zsh、Fish、Tcsh 和 Ksh

感谢浪浪云支持发布 浪浪云活动链接 &#xff1a;https://langlangy.cn/?i8afa52 文章目录 1. 简介2. 测评工具3. 测评标准4. Bash 测评4.1 易用性4.2 功能特性4.3 性能4.4 可定制性4.5 社区和支持 5. Zsh 测评5.1 易用性5.2 功能特性5.3 性能5.4 可定制性5.5 社区和支持 6. F…

3款数据恢复免费版软件评测:帮你轻松解决数据丢失问题

如今数字化时代&#xff0c;数据至关重要&#xff0c;珍贵照片、重要文档、成长记录的视频音频&#xff0c;承载回忆与努力。但数据丢失风险常伴&#xff0c;误删、格式化、病毒攻击等意外频发&#xff0c;令人陷入绝望&#xff0c;如坠数据黑洞。所幸科技发展&#xff0c;数据…

精益生产现场管理和改善的“蜕变”之旅:从理念到落地的全方位指南

精益生产现场管理和改善&#xff0c;正逐步成为众多企业转型升级的必经之路。今天&#xff0c;就让我们&#xff08;深圳天行健企业管理咨询公司&#xff09;带大家一起踏上这场从理念到落地的“蜕变”之旅&#xff0c;探索精益生产现场管理与改善的方方面面&#xff0c;为企业…

API安全 | 发现API的5个小tips

在安全测试目标时&#xff0c;最有趣的测试部分是它的 API。API 是动态的&#xff0c;它们比应用程序的其他部分更新得更频繁&#xff0c;并且负责许多后端繁重的工作。在现代应用程序中&#xff0c;我们通常会看到 REST API&#xff0c;但也会看到其他形式&#xff0c;例如 Gr…

游戏论坛网站|基于Springboot+vue的游戏论坛网站系统游戏分享网站(源码+数据库+文档)

游戏论坛|游戏论坛系统|游戏分享网站 目录 基于Springbootvue的游戏论坛网站系统游戏分享网站 一、前言 二、系统设计 三、系统功能设计 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&#xff1a; 博主介绍&#xff1a;✌️大…