1、环境准备
服务器:Centos7
Jdk版本:1.8
Mysql版本:5.7.44
Canal版本:1.1.7
Es版本:7.12.1
kibana版本:7.12.1
软件包下载地址:链接:https://pan.baidu.com/s/1jRpCJP0-hr9aIghC2ZbS4g 提取码:zzzz
IP地址 | 安装软件 |
---|---|
192.168.50.210 | Mysql,Canal |
192.168.50.211 | Es,Kibana |
2、安装es以及kibana
2.1 安装docker
# Configure the Docker CE yum repository (Aliyun mirror)
wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo -O /etc/yum.repos.d/docker-ce.repo
# Install prerequisites
yum install -y yum-utils device-mapper-persistent-data lvm2
# Install Docker
yum install -y docker-ce
# Verify the installation
docker -v
# Start the Docker daemon
systemctl start docker
设置容器镜像加速地址 登录 阿里云容器镜像服务 进入到 镜像工具 -> 镜像加速器
# Replace https://xxxxxxx.mirror.aliyuncs.com with your own accelerator address!
sudo mkdir -p /etc/docker
# Write the daemon configuration; the heredoc terminator EOF must be on its own line
sudo tee /etc/docker/daemon.json <<-'EOF'
{
  "registry-mirrors": ["https://xxxxxxx.mirror.aliyuncs.com"]
}
EOF
# Reload unit files and restart Docker so the mirror takes effect
sudo systemctl daemon-reload
sudo systemctl restart docker
设置开机启动
# Start Docker automatically at boot
systemctl enable docker
2.2 安装es
# Create the container network es-net (shared by Elasticsearch and Kibana)
docker network create es-net

# Run Elasticsearch as a single-node container
docker run -d \
  --name es \
  -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
  -e "discovery.type=single-node" \
  -v es-data:/usr/share/elasticsearch/data \
  -v es-plugins:/usr/share/elasticsearch/plugins \
  --privileged \
  --network es-net \
  -p 9200:9200 \
  -p 9300:9300 \
  elasticsearch:7.12.1

# Open the HTTP (9200) and transport (9300) ports
sudo iptables -A INPUT -p tcp --dport 9200 -j ACCEPT
sudo iptables -A INPUT -p tcp --dport 9300 -j ACCEPT
2.3 安装kibana
# Run Kibana in Docker; "es" resolves to the Elasticsearch container on es-net
docker run -d \
  --name kibana \
  -e ELASTICSEARCH_HOSTS=http://es:9200 \
  -e "I18N_LOCALE=zh-CN" \
  --network=es-net \
  -p 5601:5601 \
  kibana:7.12.1

# Open the Kibana port
sudo iptables -A INPUT -p tcp --dport 5601 -j ACCEPT
3、 安装mysql以及canal
3.1 安装Jdk
# Create the JDK install directory
mkdir -p /opt/java
# Move jdk-8u301-linux-x64.tar.gz into /opt/java
mv /youpath/jdk-8u301-linux-x64.tar.gz /opt/java
# Extract the archive
cd /opt/java
tar -zxvf jdk-8u301-linux-x64.tar.gz
# Add the environment variables
vi /etc/profile
# Append the following snippet to /etc/profile
JAVA_HOME=/opt/java/jdk1.8.0_301
JRE_HOME=/opt/java/jdk1.8.0_301/jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH
# Save the file, then reload the environment variables
source /etc/profile
# Verify
java -version
3.2 安装Mysql
# Place mysql-5.7.44-linux-glibc2.12-x86_64.tar.gz under /opt, then unpack it
cd /opt
tar zxvf mysql-5.7.44-linux-glibc2.12-x86_64.tar.gz
mv mysql-5.7.44-linux-glibc2.12-x86_64 mysql
# Remove the archive
rm mysql-5.7.44-linux-glibc2.12-x86_64.tar.gz
# Add the environment variable
vi /etc/profile
# Append the following line to /etc/profile
export PATH=/opt/mysql/bin:$PATH
# Reload the environment variables
source /etc/profile
# Create the data directory
mkdir -p /opt/mysql/data
# Create the mysql user
useradd -m mysql
# Give the mysql user ownership of /opt/mysql (basedir, datadir and binlog location);
# without this, initialisation as user mysql fails with permission errors
chown -R mysql:mysql /opt/mysql
# Switch user
su mysql
# Initialise MySQL
mysqld --initialize --user=mysql --basedir=/opt/mysql --datadir=/opt/mysql/data
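初始化完成后,输出日志的最后一行会打印 root@localhost 的临时密码(形如 A temporary password is generated for root@localhost: xxxxxx),请记录该初始密码,下边要用到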
# Edit the configuration file
vi /etc/my.cnf
# Add or modify the following parameters
[mysqld]
# Disable symbolic links
symbolic-links=0
# OS user the server runs as
user=mysql
basedir=/opt/mysql
datadir=/opt/mysql/data
socket=/tmp/mysql.sock
lower_case_table_names=1
# Canal reads the binlog, so server-id, log-bin and ROW format are set here
server-id=1
port=3306
log-bin=/opt/mysql/mysql-bin
binlog-format=ROW
expire-logs-days=15
# Copy the startup script
cp /opt/mysql/support-files/mysql.server /etc/init.d/mysqld
# Start MySQL
/etc/init.d/mysqld start
# Change the root password
mysql -uroot -p
# Enter the initial default password recorded during initialisation
mysql>set password=password('root');
# Create the account Canal connects with
mysql>use mysql;
mysql>CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
mysql>GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
mysql>FLUSH PRIVILEGES;
# Create the database
mysql>create database canal_test character set utf8mb4 collate utf8mb4_bin;
mysql>use canal_test;
# Create the table
mysql>CREATE TABLE open_user (
    id bigint(21) not null auto_increment,
    user_name varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL,
    sex int(11) DEFAULT 1,
    portrait varchar(255) NOT NULL,
    create_time datetime DEFAULT NULL,
    update_time datetime DEFAULT NULL,
    primary key (id)
);
# Quit
mysql>exit;
3.3 安装canal.deployer
# Create the canal user
useradd -m canal
# Set its password (the password is "canal"; type it twice)
passwd canal
su canal
cd ~
mkdir canal-deployer
# Move canal.deployer-1.1.7.tar.gz into canal-deployer
mv /youpath/canal.deployer-1.1.7.tar.gz /home/canal/canal-deployer
cd /home/canal/canal-deployer
tar -zxvf canal.deployer-1.1.7.tar.gz
cd conf/example
vi instance.properties
# Change the following settings in instance.properties
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# Save the file, then go to the directory holding the startup script
cd /home/canal/canal-deployer/bin
sh ./startup.sh
# Watch the instance log
tail -f /home/canal/canal-deployer/logs/example/example.log
3.4 安装canal.adapter
# Continuing from the previous step (still as the canal user)
cd ~
mkdir canal-adapter
# Move canal.adapter-1.1.7.tar.gz into canal-adapter
mv /youpath/canal.adapter-1.1.7.tar.gz /home/canal/canal-adapter
cd canal-adapter
tar -zxvf canal.adapter-1.1.7.tar.gz
# Edit conf/application.yml so that it reads as follows
vi /home/canal/canal-adapter/conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/canal_test?useUnicode=true&useSSL=false
      username: canal
      password: canal
  canalAdapters:
  - instance: example # canal instance name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: http://192.168.50.211:9200 # Elasticsearch address
        properties:
          mode: rest # transport or rest
          # security.auth: test:123456 # only used for rest mode
          cluster.name: docker-cluster
处理日期格式化为 yyyy-MM-dd HH:mm:ss 需将 client-adapter.es7x-1.1.7-jar-with-dependencies.jar 替换掉 /home/canal/canal-adapter/plugin 中的 client-adapter.es7x-1.1.7-jar-with-dependencies.jar
# Configure the data source and Elasticsearch mapping
# Go to the es7 directory under the adapter's conf directory
cd /home/canal/canal-adapter/conf/es7
# Create the file open_user.yml with the following content:
dataSourceKey: defaultDS # key of the data source defined in application.yml
destination: example # name of the canal instance
groupId: g1
esMapping:
  _index: open_user
  _type: _doc
  _id: _id
  sql: "SELECT u.id AS _id,u.user_name AS userName,u.sex,u.portrait,u.create_time as createTime,u.update_time as updateTime FROM open_user u"
  commitBatch: 3000
# Start the adapter
# Go to the directory holding the startup script
cd /home/canal/canal-adapter/bin
sh startup.sh
# Watch the adapter log
tail -f /home/canal/canal-adapter/logs/adapter/adapter.log
3.5 初始数据
3.5.1 创建索引
进入kibana控制页面
打开 kibana
地址:http://192.168.50.211:5601
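进入开发工具菜单,在控制台中执行创建索引 open_user 的请求(PUT /open_user);索引的字段映射需与 open_user.yml 中 SQL 的别名保持一致:userName、sex、portrait、createTime、updateTime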
3.5.2 新增数据
在mysql中增加记录
-- Seed four sample rows into canal_test.open_user
INSERT INTO canal_test.open_user
    (id, user_name, sex, portrait, create_time, update_time)
VALUES
    (1, '张三', 2, '学生', '2023-11-02 16:31:21', '2023-11-02 16:39:20');

INSERT INTO canal_test.open_user
    (id, user_name, sex, portrait, create_time, update_time)
VALUES
    (2, '李四', 1, '美术组组长', '2023-11-03 08:57:32', '2023-11-03 08:57:34');

INSERT INTO canal_test.open_user
    (id, user_name, sex, portrait, create_time, update_time)
VALUES
    (3, '王五', 1, '班长', '2023-11-03 09:13:35', '2023-11-03 09:13:37');

INSERT INTO canal_test.open_user
    (id, user_name, sex, portrait, create_time, update_time)
VALUES
    (4, '赵六', 1, '劳动委员', '2023-11-03 09:44:45', '2023-11-03 09:44:46');
3.5.3 导入数据
登录安装 canal 的服务器,直接调用 canal-adapter 的 REST API 进行全量导入,如下:
# Trigger a full import for the open_user.yml mapping through the adapter's REST API
curl --request POST http://127.0.0.1:8081/etl/es7/open_user.yml
3.5.4 验证增量同步
1)查看 adapter.log 日志
# Follow the adapter log
tail --follow /home/canal/canal-adapter/logs/adapter/adapter.log
2)修改数据库 open_user 表中的数据
-- Modify one row so the change shows up in the adapter log
UPDATE open_user
   SET user_name = '章三'
 WHERE id = 1;
日志输出如下:
2023-11-03 16:12:02.477 [pool-3-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":1,"user_name":"章三","sex":2,"portrait":"学生","create_time":1698913881000,"update_time":1698914360000}],"database":"canal_test","destination":"example","es":1698999121000,"groupId":"g1","isDdl":false,"old":[{"user_name":"张三"}],"pkNames":["id"],"sql":"","table":"open_user","ts":1698999122129,"type":"UPDATE"} 2023-11-03 16:12:02.477 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.support.ESSyncUtil - typeConvert valClass:class java.lang.String val:章三 esType:text 2023-11-03 16:12:02.483 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":1,"user_name":"章三","sex":2,"portrait":"学生","create_time":1698913881000,"update_time":1698914360000}],"database":"canal_test","destination":"example","es":1698999121000,"groupId":"g1","isDdl":false,"old":[{"user_name":"张三"}],"pkNames":["id"],"sql":"","table":"open_user","ts":1698999122129,"type":"UPDATE"} Affected indexes: open_user
4、Spring-boot集成
4.1 创建springboot工程引入如下依赖
<!-- Spring Boot starter for Spring Data Elasticsearch -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
4.2 创建实体类
package com.example.demo.model;

import com.alibaba.fastjson.annotation.JSONField;
import com.fasterxml.jackson.annotation.JsonFormat;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.io.Serializable;
import java.util.Calendar;
import java.util.Date;

/**
 * Document mapped to the {@code open_user} Elasticsearch index.
 *
 * <p>Both date setters store the supplied instant shifted forward by eight hours,
 * so a getter does not return the exact value passed to its setter.
 * NOTE(review): the shift looks like a GMT+8 display workaround - confirm before
 * reusing this class anywhere that writes dates back.
 */
@Document(indexName = "open_user", type = "_doc")
public class OpenUser implements Serializable {

    /** Number of hours added to every date passed to a setter. */
    private static final int HOUR_SHIFT = 8;

    @Id
    private String id;

    @Field(type = FieldType.Text)
    private String userName;

    @Field(type = FieldType.Text)
    private String sex;

    @Field(type = FieldType.Text)
    private String portrait;

    @Field(type = FieldType.Date)
    @JSONField(format = "yyyy-MM-dd HH:mm:ss")
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;

    @Field(type = FieldType.Date)
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    @JSONField(format = "yyyy-MM-dd HH:mm:ss")
    private Date updateTime;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public String getPortrait() {
        return portrait;
    }

    public void setPortrait(String portrait) {
        this.portrait = portrait;
    }

    public Date getCreateTime() {
        return createTime;
    }

    /**
     * Stores {@code createTime} shifted forward by eight hours.
     *
     * @param createTime the source instant; {@code null} is stored as {@code null}
     */
    public void setCreateTime(Date createTime) {
        this.createTime = shiftHours(createTime);
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    /**
     * Stores {@code updateTime} shifted forward by eight hours.
     *
     * @param updateTime the source instant; {@code null} is stored as {@code null}
     */
    public void setUpdateTime(Date updateTime) {
        this.updateTime = shiftHours(updateTime);
    }

    /** Returns {@code source} plus {@link #HOUR_SHIFT} hours, or {@code null} when {@code source} is null. */
    private static Date shiftHours(Date source) {
        if (source == null) {
            // Documents may lack a date (the table columns default to NULL); avoid an NPE in Calendar.setTime.
            return null;
        }
        Calendar cal = Calendar.getInstance();
        cal.setTime(source);
        cal.add(Calendar.HOUR_OF_DAY, HOUR_SHIFT);
        return cal.getTime();
    }

    @Override
    public String toString() {
        return "OpenUser{"
                + "id='" + id + '\''
                + ", userName='" + userName + '\''
                + ", sex='" + sex + '\''
                + ", portrait='" + portrait + '\''
                + ", createTime=" + createTime
                + ", updateTime=" + updateTime
                + '}';
    }
}
4.3 创建接口
package com.example.demo.controller; import com.example.demo.model.OpenUser; import org.apache.commons.lang3.StringUtils; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; import org.springframework.data.elasticsearch.core.ElasticsearchTemplate; import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; import org.springframework.data.elasticsearch.core.query.SearchQuery; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class SearchController { @Autowired private ElasticsearchTemplate elasticsearchTemplate; @PostMapping("/findOpenUserByUserName") public Page<OpenUser> findOpenUserByUserName(@RequestParam(value = "userName") String userName, @RequestParam(value = "pageNum", required = false) Integer pageNum, @RequestParam(value = "pageSize", required = false) Integer pageSize) { if (StringUtils.isBlank(userName)) { return null; } if (pageNum == null || pageNum < 0) { pageNum = 0; // if page is null, page = 0 size default 1 } if (pageSize == null || pageSize < 0) { pageSize = 10; // if size is null, size default 10 } // 分页,根据时间倒序 Pageable pageable = PageRequest.of(pageNum, pageSize, Sort.Direction.DESC, "createTime"); // 查询姓名 QueryBuilder builder = null; if (userName.matches("^[A-Za-z0-9]+$")) { builder = QueryBuilders.boolQuery() .must(QueryBuilders.wildcardQuery("userName", ("*" + userName + "*").toLowerCase())); } else { builder = QueryBuilders.boolQuery() .must(QueryBuilders.matchPhraseQuery("userName", userName.toLowerCase())); } SearchQuery searchQuery = new 
NativeSearchQueryBuilder().withQuery(builder).withPageable(pageable).build(); return elasticsearchTemplate.queryForPage(searchQuery, OpenUser.class); } }