目录
一、开启binlog日志
1.首先查看是否开启了binlog
2、开启binlog日志,并重启mysql服务
二、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限
三、下载配置canal
1、下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.0.17 版本为例
2、 修改conf\example文件夹下instance.properties配置文件
3、启动canal服务(windows下是bat,linux是sh)
四、基于Canal通知原理
五、项目集成
1、pom依赖
2、编写依赖
3、修改Item实体类
4、编写监听器(Redis/ jvm缓存同步)
六、测试
1、数据库表数据和redis数据
2、添加数据
3、修改数据
编辑4、删除数据
一、开启binlog日志
1.首先查看是否开启了binlog
show variables like '%log_bin%';
如果是OFF说明位开启
2、开启binlog日志,并重启mysql服务
右键我的电脑——管理——服务——MYSQL——属性
这里是my.ini地址
在[mysqld]底下添加
log-bin= mysqlbinlog
binlog-format=ROW
配置好之后,要进行重启mysql服务
查看状态
show variables like '%log_bin%';
开启成功
二、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
三、下载配置canal
1、下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.0.17 版本为例
解压后
2、 修改conf\example文件夹下instance.properties配置文件
最终配置文件
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=msir1234
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
3、启动canal服务(windows下是bat,linux是sh)
点击startup.bat启动
查看日志 logs\canal\canal.log,如果启动成功则如下图
四、基于Canal通知原理
解读:
- 商品服务完成商品修改后,业务直接结束,没有任何代码侵入
- Canal监听MySQL数据库的变化,当发现变化后,立即通知缓存服务。
- 缓存服务接收到canal通知,更新缓存
五、项目集成
1、pom依赖
使用GitHub上的第三方开源的canal-starter客户端。地址:https://github.com/NormanGyllenhaal/canal-client
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
2、编写依赖
canal:
destination: example
server: localhost:11111 #canal的ip地址和端口
3、修改Item实体类
Canal推送给canal-client的是被修改的这一行数据(row),而我们引入的canal-client则会帮我们把行数据封装到Item实体类中。这个过程需要知道数据库与实体的映射关系。需要用到JPA的注解:
通过@Id、@Column等注解完成Item与数据库表字段的映射:
注意,如果和数据库名称不同必须用@Column
package com.springboot3.domain;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import lombok.Data;
import javax.persistence.Column;
import javax.persistence.Id;
/**
*
* @TableName course
*/
@TableName(value ="course")
@Data
public class Course implements Serializable {
/**
*
*/
@Id
@TableId
private String id;
/**
*
*/
private String name;
/**
*
*/
@Column(name = "teacher_id")
private String teacherId;
@TableField(exist = false)
private static final long serialVersionUID = 1L;
}
4、编写监听器(Redis/ jvm缓存同步)
通过实现EntryHandler<T>
接口编写监听器,监听Canal消息。注意两点:
- 实现类通过
@CanalTable("course")
指定监听的表信息。 - EntryHandler的泛型是与表对应的实体类
package com.springboot3.handler;
import com.springboot3.domain.Course;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
@CanalTable(value = "Course")
@Component
@Slf4j
public class CourseHandler implements EntryHandler<Course> {
@Autowired
private RedisTemplate redisTemplate;
@Override
public void insert(Course course) {
log.info("insert message {}", course);
redisTemplate.opsForValue().set(course.getId(),course);
}
@Override
public void update(Course before, Course after) {
log.info("update before {} ", before);
log.info("update after {}", after);
redisTemplate.opsForValue().set(after.getId(),after);
}
@Override
public void delete(Course course) {
log.info("delete {}", course);
redisTemplate.delete(course.getId());
}
}
六、测试
1、数据库表数据和redis数据
数据库:
redis:
2、添加数据
添加数据物理
控制台显示:
redis数据库:
3、修改数据
将物理修改为化学
控制台显示:
redis数据库:
4、删除数据
删除化学科目
控制台数据
redis数据库数据: