基于Canal同步MySQL数据到Elasticsearch

基于Canal同步MySQL数据到Elasticsearch

基于 canal 同步 mysql 的数据到 elasticsearch 中。

1、canal-server

相关软件的安装请参考:《Canal实现数据同步》

1.1 pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>canal-to-elasticsearch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>canal-to-elasticsearch</name>
    <description>canal to elasticsearch</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

1.2 SimpleCanalClientExample编写

package com.example.canatest.config;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * 说明:用于测试canal是否已经连接上了mysql
 */
public class SimpleCanalClientExample {
    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.94.186",
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

在这里插入图片描述

在这里插入图片描述

注意当后面canal-adapter也连接上canal-server后,程序就监听不到数据变化了。

这个类只是测试,下面不使用。

2、canal-adapter

由于目前canal-adapter没有官方docker镜像,所以拉去一个非官方的。

canal-adapter安装:

搜索镜像

$ docker search canal-adapter

在这里插入图片描述

拉取镜像

$ docker pull slpcat/canal-adapter:v1.1.5

在这里插入图片描述

启动

$ docker run -p 8081:8081 --name canal-adapter -d slpcat/canal-adapter:v1.1.5

在这里插入图片描述

修改配置

$ docker exec -it 89ef714d3a0e /bin/bash
$ cd conf/
$ vi application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    # canal.tcp.server.host需要修改
    canal.tcp.server.host: 192.168.94.186:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      # url,username,password需要修改
      url: jdbc:mysql://192.168.94.186:3306/canal_test?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      # name需要修改
      - name: es7
        # hosts需要修改
        hosts: 192.168.94.186:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest
          # security.auth: test:123456 #  only used for rest mode
          # cluster.name需要修改
          cluster.name: my-es
$ cd conf/es7
$ cp -v mytest_user.yml canal_test_collect.yml
# 删除其他多余的
$ rm -rf biz_order.yml customer.yml mytest_user.yml
$ vi dailyhub_collect.yml
dataSourceKey: defaultDS
# 需要修改
destination: example
# 需要修改
groupId: g1
esMapping:
  # 需要修改
  _index: canal_test
  _id: _id
  _type: _doc
  upsert: true
#  pk: id
  # 需要修改
  sql: "
SELECT
        c.id AS _id,
        c.user_id AS userId,
        c.title AS title,
        c.url AS url,
        c.note AS note,
        c.collected AS collected,
        c.created AS created,
        c.personal AS personal,
        u.username AS username,
        u.avatar AS userAvatar
FROM
        m_collect c
LEFT JOIN m_user u ON c.user_id = u.id

"
#  objFields:
#    _labels: array:;
#   etlCondition: "where c.c_time>={}"
  commitBatch: 3000

也可以在外面编辑好,通过docker命令传输到docker容器中:

$ docker cp canal_test_collect.yml canal-adapter:/opt/canal-adapter/conf/es7/canal_test_collect.yml
$ docker cp application.yml canal-adapter:/opt/canal-adapter/conf/application.yml

重启容器

$ docker restart 89ef714d3a0e

验证是否启动成功

$ docker logs -f 89ef714d3a0e

在这里插入图片描述

注意对于时间类型,在后端一定要使用LocalDateTime或者LocalDate类型,如果是Date类型,需要自己手动

设置格式。

3、测试

准备测试条件:

1、首先在数据库中生成表和字段

CREATE TABLE `m_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `avatar` varchar(255) DEFAULT NULL,
  `created` date DEFAULT NULL,
  `lasted` date DEFAULT NULL,
  `open_id` varchar(255) DEFAULT NULL,
  `statu` int(11) DEFAULT NULL,
  `username` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

CREATE TABLE `m_collect` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `collected` date DEFAULT NULL,
  `created` date DEFAULT NULL,
  `note` varchar(255) DEFAULT NULL,
  `personal` int(11) DEFAULT NULL,
  `title` varchar(255) DEFAULT NULL,
  `url` varchar(255) DEFAULT NULL,
  `user_id` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`),
  KEY `FK6yx2mr7fgvv204y8jw5ubsn7h` (`user_id`),
  CONSTRAINT `FK6yx2mr7fgvv204y8jw5ubsn7h` FOREIGN KEY (`user_id`) REFERENCES `m_user` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;

在这里插入图片描述

2、然后在elasticsearch中生成索引

# 创建索引并添加映射字段
PUT /canal_test
{
  "mappings": {
    "properties": {
      "collected": {
        "type": "date",
        "format": "date_optional_time||epoch_millis"
      },
      "created": {
        "type": "date",
        "format": "date_optional_time||epoch_millis"
      },
      "note": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "personal": {
        "type": "integer"
      },
      "title": {
        "type": "text",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_smart"
      },
      "url": {
        "type": "text"
      },
      "userAvatar": {
        "type": "text"
      },
      "userId": {
        "type": "long"
      },
      "username": {
        "type": "keyword"
      }
    }
  }
}

在这里插入图片描述

3、插入数据

INSERT INTO `m_user` VALUES ('1', 'https://image-1300566513.cos.ap-guangzhou.myqcloud.com/upload../../images/5a9f48118166308daba8b6da7e466aab.jpg', '2022-01-05', '2022-01-06', 'ozWZ-uAOY2iecT-byynO382u01zg', '0', 'MarkerHub');

在这里插入图片描述

4、查看数据

GET /canal_test/_search

5、遇到的问题

如果看到canal-adapter一直出现这种异常,说明启动顺序不对,启动顺序应该是:mysqlescanal

adapar

2022-01-11 10:43:15.278 [Thread-2] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: Broken pipe Error sync but ACK!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/111182.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Android图形系统之HWComposer、ComposerHal、ComposerImpl、Composer、Hwc2::Composer实例总结(十四)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 人生格言: 人生从来没有捷径,只有行动才是治疗恐惧和懒惰的唯一良药. 更多原创,欢迎关注:Android…

利用远程IO模块,轻松驾驭食品包装生产的自动化

常见的自动化包装系统&#xff0c;它的核心部分通常由一系列高端设备组成&#xff0c;包括自动开箱机、自动封箱机、自动捆扎机、装箱机器人、码垛机器人等。这些设备协同工作&#xff0c;形成一条高效运转的生产线&#xff0c;从开箱到装箱&#xff0c;再到码垛&#xff0c;每…

多测师肖sir_高级金牌讲师_性能测试之badboy录制脚本02

性能测试之badboy录制脚本 一、下载安装包&#xff0c;点击安装 二、点击我同意 三、选择路径&#xff0c;点击install 打开以下界面&#xff0c;表示安装成功 第二步&#xff1a;录制流程 界面视图&#xff0c;模拟浏览器&#xff0c;能够进行操作 需要录制脚本的URL 点…

MLF - 麻辣粉

MLF全称中期借贷便利&#xff08;Medium-term lending Facility&#xff09;,理解为央行向商业银行、政策银行发放的贷款&#xff0c;但需要符合一定要求才可向央行申请。银行通过MLF向央行借款的时候&#xff0c;需要提供担保品。一般为国债、央行票据、政策性金融债、地方债、…

C++设计模式_19_Memento 备忘录(理解,目前多使用序列化方案来实现)

Memento 备忘录模式也属于“状态变化”模式&#xff0c;它是一个小模式&#xff0c;在今天来看有些过时&#xff0c;当今已经很少使用当前模式实现需求&#xff0c;思想却不变&#xff08;信息隐藏&#xff09;&#xff0c;目前多使用序列化方案来实现。本系列所介绍的模式&…

企业金蝶KIS软件服务器中了locked勒索病毒怎么办,勒索病毒解密

最近一段时间&#xff0c;网络上的locked勒索病毒又开始了新一波的攻击&#xff0c;给企业的正常生产生活带来了严重影响。经过最近一段时间云天数据恢复中心对locked勒索病毒的解密&#xff0c;为大家整理了以下有关locked勒索病毒的相关信息。近期locked勒索病毒主要攻击金蝶…

【httpd】 Apache http服务器目录显示不全解决

文章目录 1. 文件名过长问题1.1 在centos中文件所谓位置etc/httpd/conf.d/httpd-autoindex.conf1.2 在配置文件httpd-autoindex.conf中的修改&#xff1a;1.3 修改完成后重启Apache&#xff1a; 1. 文件名过长问题 1.1 在centos中文件所谓位置etc/httpd/conf.d/httpd-autoindex…

解决SQLServer占用80端口问题

在安装好了SQLServer之后&#xff0c;发现系统默认80端口被占用&#xff0c;导致很多默认用80端口的软件运行不起来。 解决办法 1、运行(快捷键:WINR) services.msc 2、找到SQL Server Reporting Services (MSSQLSERVER) 服务 3、先停止服务&#xff0c;然后再禁用服务

浅谈Redis的五大数据类型及其应用

前言 Redis是一种开源的内存数据结构存储系统,它支持多种数据类型,包括字符串String、列表list、集合、哈希表和有序集合。这些数据类型在Redis中有着广泛的应用场景,可以满足不同的业务需求。本文将介绍Redis的五大数据类型及其应用。 一、string数据类型 常用命令: …

C 语言选择练习题(一): C语言基础

本文主要考察知识点范围为&#xff1a; C语言系统化精讲&#xff08;一&#xff09;&#xff1a;编程基础&#xff1a;https://blog.csdn.net/xw1680/article/details/134001443 C语言系统化精讲&#xff08;二&#xff09;&#xff1a;C语言初探&#xff1a;https://blog.csdn…

如何隐藏woocommerce 后台header,woocommerce-layout__header

如何隐藏woocommerce 后台header&#xff0c;woocommerce-layout__header WooCommerce |Products Store Activity| Inbox| Orders| Stock| Reviews| Notices| breadcrumbs 在 functions.php 里添加如下代码即可&#xff1a; // Disable WooCommerce Header in WordPress Admi…

Web3 React项目Dapp获取智能合约对象

上文Web3 整理React项目 导入Web3 并获取区块链信息中&#xff0c;我们在react搭建的dapp中简单拿到了我们区块链中的账号授权信息 那 我们继续 先终端运行 ganache -d将ganache环境起起来 然后 我们运行 dapp 拿到授权列表 回到上文结束的一个状态 然后 我们发布一下自己的…

提高小程序SEO 排名,9招优化技巧!

在当今移动互联网时代&#xff0c;小程序已经成为企业必不可少的一种营销手段&#xff0c;而如何让用户能够更容易地找到自己的小程序&#xff0c;就需要进行SEO优化&#xff0c;提升小程序的排名&#xff0c;本文将 为大家介绍9个小程序SEO优化技巧&#xff0c;帮助您的小程序…

双指针——快乐数

一、题目解析 由题目我们可以分析出无非就两种情况&#xff1a; 这个数一直变化最终能变到1这个数一直变化最终是无限循环 其实这两种情况我们也可以抽象成是一种情况&#xff0c;因为第一种情况虽然变到了1但是1再继续变下去也是形成一个环&#xff0c;只不过这个环的数都是…

视频下载软件 Downie4 mac中文介绍

Downie mac是一款Mac平台上非常实用的视频下载工具。它支持下载各种视频网站上的视频&#xff0c;并且具有快速、稳定、易于使用的特点。 Downie支持下载各种视频网站上的视频&#xff0c;包括YouTube、Vimeo、Netflix、Hulu、Amazon等等。它具有快速、稳定的下载速度&#xff…

css:transform实现平移、旋转、缩放、倾斜元素

目录 文档语法示例旋转元素 transform-rotate旋转过渡旋转动画 参考文章 文档 https://developer.mozilla.org/zh-CN/docs/Web/CSS/transform 语法 /* Keyword values */ transform: none;/* Function values */ transform: matrix(1, 2, 3, 4, 5, 6); transform: translate…

使用simple_3dviz进行三维模型投影

【版权声明】 本文为博主原创文章&#xff0c;未经博主允许严禁转载&#xff0c;我们会定期进行侵权检索。 更多算法总结请关注我的博客&#xff1a;https://blog.csdn.net/suiyingy&#xff0c;或”乐乐感知学堂“公众号。 本文章来自于专栏《Python三维模型处理基础》的系列文…

0基础学习PyFlink——用户自定义函数之UDTAF

大纲 UDTAFTableAggregateFunction的实现累加器定义创建累加 返回类型计算 完整代码 在前面几篇文章中&#xff0c;我们分别介绍了UDF、UDTF和UDAF这三种用户自定义函数。本节我们将介绍最后一种函数&#xff1a;UDTAF——用户自定义表值聚合函数。 UDTAF UDTAF函数即具备了…

ATECLOUD如何进行电源模块各项性能指标的测试?

ATECLOUD平台进行电源模块各项性能指标的测试是通过以下步骤实现的&#xff1a; 连接测试设备&#xff1a;将测试设备与云计算服务器连接&#xff0c;实现数据采集和远程控制。测试设备包括示波器、电子负载、电源、万用表等&#xff0c;这些设备通过纳米BOX连接到云测试平台上…

【Java 进阶篇】深入理解 Java Response:从基础到高级

HTTP响应&#xff08;Response&#xff09;是Web开发中的一个关键概念&#xff0c;它是服务器向客户端&#xff08;通常是浏览器&#xff09;返回数据的方式。理解如何在Java中处理和构建HTTP响应是开发Web应用程序的重要一部分。本文将从基础知识到高级技巧&#xff0c;详细介…