Elastic Search(ES)Java 入门实操(3)数据同步

基本概念和数据查询代码:

Elastic Search (ES)Java 入门实操(1)下载安装、概念-CSDN博客

Elastic Search(ES)Java 入门实操(2)搜索代码-CSDN博客

想要使用 ES 来查询数据,首先得要 ES 里有数据,但是如果是后来引入的 ES,数据库上万条的数据肯定不能通过手动进行同步,需要使用其他方法进行同步。

数据同步分为全量同步和增量同步。

所谓全量同步,就是引入 ES 时将 MySQL 里的数据全部同步到 ES 里。增量同步就是当数据库的数据发生变化时,将变化的数据同步到 ES 里。

同步方法

定时任务

通过定时任务的方式,每隔一段时间进行同步。比如每一分钟同步一次。

优点:简单,占用资源少,不用引入第三方中间件

缺点:有时间差,数据一致性要求高的场景不适用

全量同步通过实现 CommandLineRunner 接口,在程序启动时执行。

/**
 * CommandLineRunner 接口,当spring启动时就执行方法
 */
@Component
public class FullSycnToEs implements CommandLineRunner {

    @Resource
    private ArticleService articleService;

    @Resource
    private ArticleEsDao articleEsDao;
    @Override
    public void run(String... args) throws Exception {
        //spring 启动就执行方法进行全量同步
        //1.从MySQL获取数据
        List<Article> articleList = articleService.list();
        if(CollectionUtils.isEmpty(articleList)){
            return;
        }
        //2.将数据转换为DTO
        List<ArticleEsDto> articleDtoList = articleList.stream().map(ArticleEsDto::toDto).collect(Collectors.toList());
        //3.将数据同步到ES
        articleEsDao.saveAll(articleDtoList);
        System.out.println("全量同步完成");
    }
}

增量同步使用 @ Scheduled 定时任务监控更新时间

注意启动类要加上注解 @EnableScheduling

/**
 * 定时任务执行数据同步
 */
@Component
public class InSyncToEs {


    @Resource
    private ArticleMapper articleMapper;

    @Resource
    private ArticleEsDao articleEsDao;

    @Scheduled(fixedRate = 100)
    public void run(){
        // 定时任务,将数据同步到es,根据更新时间来判断
        //假定3分钟内,如果更新时间大于3分钟之前的时间,就是更新了,获取这个数据存入到ES 中
        Date minUpdateTime = new Date(new Date().getTime() - 5* 60*1000L);
        List<Article> newArticles = articleMapper.getNewArticles(minUpdateTime);
        //判断是否有数据更新
        if(CollectionUtils.isEmpty(newArticles)){
            //没有数据更新
            System.out.println("没有数据更新");
            return;
        }
        //有数据更新,将数据转换成dto格式
        List<ArticleEsDto> articleEsDtoList = newArticles.stream().map(ArticleEsDto::toDto).collect(Collectors.toList());
        //将数据存入到ES中
        articleEsDao.saveAll(articleEsDtoList);
        System.out.println("数据同步完成");
    }
}

双写

写入数据库时同时同步到 ES 中,需要考虑 ES 同步失败了怎么办。

使用事务来保证一致性,如果 ES 同步失败了,可以通过定时任务 + 日志 + 告警进行检测和修复(补偿)

Logstash 数据同步管道

传输和处理数据的管道

下载地址:https://artifacts.elastic.co/downloads/logstash/logstash-7.17.21-windows-x86_64.zip

官方文档:Jdbc input plugin | Logstash Reference [7.17] | Elastic

同样的,需要注意版本,下载解压之后在 config 文件夹创建新的同步文件,建议不同的同步脚本创建不同的文件,不要在同一个文件下配置。

文件配置根据官方文档修改,MySQL jar包使用绝对路径即可,否则可能找不到 jar 包,jar 包可以自行准备,也可以从项目的 maven 仓库获取。

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  jdbc {
    // MySQL jar包路径
    jdbc_driver_library => "D:\software\logstash-7.17.21\config\mysql-connector-java-8.0.29.jar"
    // MySQL 驱动
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    // MySQL 连接地址
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    //账号密码
    jdbc_user => "root"
    jdbc_password => "1234"
    //动态 SQL
    statement => "SELECT * from article where 1=1"
    parameters => { "favorite_artist" => "Beethoven" }
    //定时执行,core 表达式
    schedule => "*/5 * * * * *"
  }
}

output {
    stdout { codec => rubydebug }
}

 配置好之后在 logstash 目录下执行下面的命令,完成初步从数据库获取数据

.\bin\logstash.bat -f .\config\my-task.conf

 成功获取数据

增量同步配置,使用 updateTime 来进行同步更新的数据。

完整 input 配置如下。

input {
  jdbc {
    jdbc_driver_library => "D:\software\logstash-7.17.21\config\mysql-connector-java-8.0.29.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "root"
    jdbc_password => "1234"
    // 动态查询语句,保证最后一条是最大的
    statement => "SELECT * from article where updateTime > :sql_last_value and updateTime < now() order by updateTime asc"
    // 查询参数的 hash,不用更改
    parameters => { "favorite_artist" => "Beethoven" }
    // 查询参数的类型,updatetime 是 timestamp 类型的
    tracking_column_type => "timestamp"
    // 查询参数
    tracking_column => "updatetime"
    // 设置为 true 时,将定义的查询参数值用作动态 SQL 中sql_last_value,false 时:sql_last_value 是上次查询时间
    use_column_value => true
    // 时区设置为上海,否则存在 8小时时差
    jdbc_default_timezone => "Asia/Shanghai"
    // core 表达式
    schedule => "*/5 * * * * *"
  }
}

配置好从 MySQL 获取的数据之后,就可以同步到 ES 中了。同样需要书写配置。

官方文档:Elasticsearch output plugin | Logstash Reference [7.17] | Elastic

output {
    stdout { codec => rubydebug }
    elasticsearch {
        //访问地址,就是本地 ES 端口
        hosts => "127.0.0.1:9200"
        // ES 索引
	    index =>"article_1"
        // 数据 id,从数据库获取
	    document_id => "%{id}"
    }

最终配置

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  jdbc {
    jdbc_driver_library => "D:\software\logstash-7.17.21\config\mysql-connector-java-8.0.29.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "root"
    jdbc_password => "1234"
    statement => "SELECT * from article where updateTime > :sql_last_value and updateTime < now() order by updateTime asc "
    parameters => { "favorite_artist" => "Beethoven" }
    tracking_column_type => "timestamp"
    tracking_column => "updatetime"
    use_column_value => true
    jdbc_default_timezone => "Asia/Shanghai"
    schedule => "*/5 * * * * *"
  }
}
// 筛选
filter{
	mutate{
        //重命名
		rename => {
			"updatetime" =>"updateTime"
			"createtime" => "createTime"
			"isdetele" => "isDetele"
		}
	}
}

output {
    stdout { codec => rubydebug }
    elasticsearch {
        hosts => "127.0.0.1:9200"
	index =>"article_1"
	document_id => "%{id}"
    }
}

同步成功! 

logstash 的优点:配置完成后使用比较方便,插件多

                缺点:要多维护组件,一般需要配合其他中间件,比如(kafka)

Canal

下载地址:Releases · alibaba/canal (github.com)

文档:QuickStart · alibaba/canal Wiki (github.com)

实时同步数据,通过监控 MySQL 的 binlog,当数据库发生修改时,会修改 binlog 文件,然后 canal 监听到就可以同步到 ES 中。

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,在 MySQL 目录下新建一个my.ini,配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant,直接在查询控制台执行如下命令

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

bin 目录下 startup 启动即可。

然后 Java 需要一个客户端,首先引入依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

客户端代码

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


public static void main(String args[]) {
    // 创建链接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try {
        connector.connect();
        connector.subscribe(".*\\..*");
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) {
            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                printEntry(message.getEntries());
            }

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        }

        System.out.println("empty too many times, exit");
    } finally {
        connector.disconnect();
    }
}

private static void printEntry(List<Entry> entrys) {
    for (Entry entry : entrys) {
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            continue;
        }

        RowChange rowChage = null;
        try {
            rowChage = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        }

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) {
            if (eventType == EventType.DELETE) {
                printColumn(rowData.getBeforeColumnsList());
            } else if (eventType == EventType.INSERT) {
                printColumn(rowData.getAfterColumnsList());
            } else {
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            }
        }
    }
}

private static void printColumn(List<Column> columns) {
    for (Column column : columns) {
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    }
}

    
  

}

正在监听,修改数据之后可以把修改前和修改后的数据查询出来,之后只需要将修改后的数据同步到 ES 即可,比如通过 ES 的 save 方法。 

过程出现的问题

1. 在执行命令.\bin\logstash.bat -f .\config\my-task.conf  时报错

只需要更改 bin 目录下的 setup.bat 文件中的双引号去掉即可。 

2. canal 启动 报错 canal 1.1.8版本

不知道什么原因,是和 MySQL 8不兼容还是其他原因,报 druid 错误。

解决方法:简单粗暴,下载 1.1.7 版本,实测有效

3. 找不到 JAVA_HOME

修改变量或者修改启动项

编辑 startup.bat,在文件中添加如下配置:

// 自己的 jdk 路径
set JAVA_HOME=C:\Users\p'b\.jdks\corretto-1.8.0_392
// 覆盖环境变量
set PATH=%JAVA_HOME%\bin;%PATH%

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/692138.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

为什么会有虚像

本来我就打算写虚像相关的内容&#xff0c;实际上我看不懂光学的内容&#xff0c;我只是发觉书上没有使用变分法来做&#xff0c;而只是解析几何的变换&#xff0c;这个做法完全脱离实际&#xff0c;物理书为什么会这样写不知道原因&#xff0c;但是很明显这样的内容也非常的复…

操作系统复习-存储管理之段页式存储管理

存储管理之段页式存储管理 页式存储管理(等分划分) 字块是相对物理设备的定义页面则是相对逻辑空间的定义指的都是大小一样的一块内存页式存储管理是将进程逻辑空间等分成若干大小的页面相应的把物理内存空间分成与页面大小的物理块以页面为单位把进程空间装进物理内存中分散的…

【MySQL】常见可执行程序

本文使用的版本是MySQL8&#xff0c;5.7可能会有所不同。 MySQL提供了一些重要的程序用来管理和操作数据库。这里会介绍一些常用的程序及其使用。对于MySQL程序的使用&#xff0c;可以查看官方帮助手册来学习。 MySQL :: MySQL 8.0 Reference Manual :: 6 MySQL Programs 程序…

normalizing flows vs 直方图规定化

normalizing flows名字的由来 The base density P ( z ) P(z) P(z) is usually defined as a multivariate standard normal (i.e., with mean zero and identity covariance). Hence, the effect of each subsequent inverse layer is to gradually move or “flow” the da…

C# Maui 报错:程序“[15748] MauiApp1.exe”已退出,返回值为 2147942405 (0x80070005)

“MauiApp1.exe”(CoreCLR: DefaultDomain): 已加载“C:\Program Files\dotnet\shared\ Microsoft.NETCore.App\8.0.6\System.Private.CoreLib.dll”。 “MauiApp1.exe”(CoreCLR: clrhost): 已加载“E:\cDemo\MauiApp1\MauiApp1\bin\Debug\net8.0-windows10.0.19041.0\win10-x…

数智融通 创新发展|亚信科技携AntDB、Data OS与隐私计算产品,赋能企业高质量发展

5月21日&#xff0c;亚信科技在云端举办了一场别开生面的研讨会——“数智融通 创新发展”&#xff0c;聚焦企业数智化升级的前沿话题。资深产品经理和技术架构师们面对面深入交流&#xff0c;分享创新成果与实战案例&#xff0c;共同探索企业数智化转型的新路径。 图1&#xf…

重构某测试站点

一、计算校验值 校验值结果&#xff1a; 文件名称&#xff1a;培训用centos.rar&#xff0c;文件大小&#xff1a;1,335,759,953&#xff0c;MD5&#xff1a;534EC38CDA7DA2196C84AC8F6092514B&#xff0c;SHA1&#xff1a;FD35D86A27A007AE10872980C48653A110DF6067&#xf…

【Ardiuno】ESP32单片机初试点亮LED小灯

之前用的Ardiuno的主板做过一些简单的开发实验&#xff0c;按照相关说明还是很容易进行操作的。最近看了ESP32可以有wifi的功能&#xff0c;也就买来实验一下。 ESP32的主板开发环境安装&#xff0c;按照说明的安装下载程序总是报错&#xff0c;又上网搜索半天最后按照CSDN上某…

算法006:查找总价格为目标值的两个商品

. - 力扣&#xff08;LeetCode&#xff09;. - 备战技术面试&#xff1f;力扣提供海量技术面试资源&#xff0c;帮助你高效提升编程技能,轻松拿下世界 IT 名企 Dream Offer。https://leetcode.cn/problems/he-wei-sde-liang-ge-shu-zi-lcof/ 题干说的很复杂&#xff0c;简化一…

IDEA使用阿里通义灵码插件

在这个AI火热的时代&#xff0c;纯手工写代码已经有点out了&#xff0c;使用AI插件可以帮我们快速写代码&#xff0c;起码能省去写那些简单、重复性的代码&#xff0c;大大提高编码效率&#xff0c;在这里我推荐使用阿里的通义灵码 注册安装 安装注册好后&#xff0c;打开我们…

前端技术探索:从基础到进阶

前端技术作为现代Web开发中不可或缺的一部分&#xff0c;其重要性不言而喻。随着技术的快速发展&#xff0c;前端领域涌现出了许多经典且值得深入探索的技术和框架。本文将带您领略前端技术的魅力&#xff0c;从基础到进阶&#xff0c;一起探讨前端开发的精髓。 一、前端技术基…

【AI时代,生命修行】

今日分享&#x1f4d2;&#xff0c;AI时代&#xff0c; 生命 与 修行&#xff1a; 不要用太多时间去工作&#xff0c;尤其是在人工智能时代。如果谁还在用传统的线性的费时间的这种努力的工作方式&#xff0c;只能说太落伍了。 我只说给同频的朋友们无关的人请划走。因为很多…

AddressSanitizer理论及实践:heap-use-after-free、free on not malloc()-ed address

AddressSanity&#xff1a;A Fast Address Sanity Checker 摘要 对于C和C 等编程语言&#xff0c;包括缓冲区溢出和堆内存的释放后重用等内存访问错误仍然是一个严重的问题。存在许多内存错误检测器&#xff0c;但大多数检测器要么运行缓慢&#xff0c;要么检测到的错误类型有…

AndroidStudio无法识别连接夜神模拟器

方法一(无法从根本上解决) ①进入夜神模拟器安装路径下的bin路径(安装路径可以带有中文路径) ②打开cmd窗口,输入以下代码(一定要打开模拟器) nox_adb.exe connect 127.0.0.1:62001 方法二(根本上解决) 原因:Android Studio的adb版本与夜神模拟器的adb版本不一致 ①打开And…

如何使用ERC-20与Sui Coin标准创建Token

区块链使用tokens作为传递价值的基本手段。它们可以是区块链的原生交换单位&#xff0c;也可以是应用中的交换单位&#xff0c;甚至可以在游戏世界中用作货币。tokens还支持Sui和其他区块链上的强大DeFi活动。 以太坊使用ERC-20标准来创建tokens&#xff0c;借用智能合约&…

大数据环境搭建@Hive编译

Hive3.1.3编译 1.编译原因1.1Guava依赖冲突1.2开启MetaStore后运行有StatsTask报错1.3Spark版本过低 2.环境部署2.1jdk安装2.2maven部署2.3安装图形化桌面2.4安装Git2.5安装IDEA 3.拉取Hive源码4.Hive源码编译4.1环境测试1.测试方法——编译2.问题及解决方案&#x1f4a5;问题1…

全网最强下载神器IDM之如何用IDM下载百度网盘文件不限速 如何用IDM下载百度云资源 IDM激活码免费版下载安装

百度网盘是比较早的网盘类应用&#xff0c;用户群体比较多&#xff0c;但百度网盘对于非会员用户限速比较严重。IDM是非常好用的下载工具&#xff0c;那么我们如何用IDM下载百度网盘文件不限速&#xff1f;我们可以通过多种方法使用IDM下载百度网盘文件。下面我们就来看如何用I…

RabbitMQ python第三方库pika应用入门实践

1. RabbitMQ简介 RabbitMQ是一个可靠、高效的开源消息代理服务器&#xff0c;基于AMQP协议。它具备以下特点&#xff1a; 可以支持多种消息协议&#xff0c;如AMQP、STOMP和MQTT等。提供了持久化、可靠性和灵活的路由等功能。支持消息的发布和订阅模式。具备高可用性和可扩展…

天才程序员周弈帆 | Stable Diffusion 解读(一):回顾早期工作

本文来源公众号“天才程序员周弈帆”&#xff0c;仅用于学术分享&#xff0c;侵权删&#xff0c;干货满满。 原文链接&#xff1a;Stable Diffusion 解读&#xff08;一&#xff09;&#xff1a;回顾早期工作 在2022年的这波AI绘画浪潮中&#xff0c;Stable Diffusion无疑是最…

pdf怎么编辑修改内容?3个实用软件!

在当今数字化时代&#xff0c;PDF文件因其跨平台、格式固定的特性&#xff0c;成为我们日常工作和生活中不可或缺的一部分。然而&#xff0c;PDF文件的修改和编辑往往成为许多人的难题。本文将为您详细介绍如何编辑修改PDF文件的内容&#xff0c;并推荐几款实用的编辑软件&…