说明:缓存数据同步,以Redis为例,如何保证从Redis中取出来的数据与MySQL中的一致?在微服务架构下,通常可以用以下两种技术来实现:
- MQ:在修改数据的同时,发送一个消息修改缓存;
- Canal:监听数据库,数据库发生改变时,同步更新缓存;
本文介绍Canal的实现,以下操作均在云服务上,操作系统是CentOS
设置MySQL主从
Canal是基于MySQL的主从同步功能,使用前需要先开启MySQL的主从功能,操作如下:
第一步:开启binlog
找到MySQL容器所挂载的日志文件,添加以下内容:
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=cache
log-bin=/var/lib/mysql/mysql-bin
:指定库记录binary log events,取名为cache;
binlog-do-db=cache
:设置binary log文件的存放地址和文件名;
查看mysql容器所挂载的数据卷可使用下面这个命令
docker volume inspect 数据卷名
如果不知道数据卷名,可停掉mysql容器,并删掉。在/tmp目录下创建一个mysql目录,并进入到mysql目录下,执行下面的命令启动mysql容器;
# 进入/tmp目录
cd /tmp
# 创建文件夹
mkdir mysql
# 进入mysql目录
cd mysql
# 启动mysql容器,设置密码为123456
docker run \
-p 3306:3306 \
--name mysql \
-v $PWD/conf:/etc/mysql/conf.d \
-v $PWD/logs:/logs \
-v $PWD/data:/var/lib/mysql \
-e MYSQL_ROOT_PASSWORD=123456 \
--privileged \
-d \
mysql:5.7.25
第二步:设置用户权限
进入mysql命令行模式,如使用navicat或者使用CMD连接MySQL,敲以下命令(建议一行一行敲):
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
FLUSH PRIVILEGES;
第三部:重启容器
重启mysql容器
回到mysql命令行,敲下面的命令
show master status;
出现下面的内容,表示设置完成(如果报错了,可能是有延迟,可以等下再敲命令重试)
环境搭建
第一步:安装Canal
拉取Canal镜像,拉取前应该先去docker官方仓库查看可提供的版本号
docker pull canal:版本号
如果网络状态差()的话不推荐拉取,可使用本地加载的方式;
加载完成
第二步:创建网络
canal容器需要和mysql容器关联,创建一个网络,取名demo;
docker network create demo
把mysql容器加入到这个网络中;
docker network connect demo mysql
第三步:启动canal
输入下面的命令,启动canal容器,需要注意相关名称;
docker run -p 11111:11111 --name canal \
-e canal.destinations=cache \
-e canal.instance.master.address=mysql:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false \
-e canal.instance.filter.regex=db_user\\..* \
--network demo \
-d canal/canal-server:v1.1.5
canal.destinations=cache \
:binlog-do-db的名称;
canal.instance.master.address=mysql:3306
:数据库名称和端口;
--network demo
:上面创建的网络名称;
代码实现
本项目基于Redis缓存预热(参考:http://t.csdn.cn/rZ4En),是一个很简单的项目,部分代码如下:
controller层代码:
@RestController
@RequestMapping("/user")
public class UserController {
@Autowired
private UserService userService;
/**
* 查询所有用户信息
* @return
*/
@GetMapping("list")
public List<User> getUsers() {
return userService.list();
}
/**
* 根据ID查询用户信息
* @param id
* @return
*/
@GetMapping("{id}")
public User getUserById(@PathVariable Long id) {
return userService.getById(id);
}
/**
* 根据ID删除用户
* @param id
*/
@DeleteMapping("{id}")
public void deleteUserById(@PathVariable Long id){
userService.removeById(id);
}
}
Redis缓存预热代码
@Component
public class RedisHandler implements InitializingBean {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private UserService userService;
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void afterPropertiesSet() throws Exception {
// 初始化缓存
// 1.查询所有用户信息
List<User> userList = userService.list();
// 2.放入缓存
for (User user : userList) {
// 2.1.将user对象序列化为JSON
String json = MAPPER.writeValueAsString(user);
// 2.2.设置key前缀,存入redis
redisTemplate.opsForValue().set("user:id:" + user.getId(), json);
}
}
}
因为使用了Redis缓存预热,在项目启动时会使用全查方法,将所有用户的数据存入到redis中;
第一步:引入依赖
先引入canal的依赖
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
添加相关配置
canal:
destination: cache #binlog-do-db的名称;
server: 服务器IP:11111
第二步:修改实体类
修改User类,添加一些注解(@Id注解、@Colume注解、@Transient注解),分别用于表示主键,关键字段名,容易发生变动的字段;
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("tb_user")
public class User implements Serializable {
@TableId(type = IdType.AUTO)
@Id
private Integer id;
@Column(name = "username")
private String username;
private String password;
private String name;
private Integer gender;
private String image;
@Transient
private Integer job;
private String entrydate;
@Transient
private Integer deptId;
private String createTime;
private String updateTime;
@TableLogic(value = "1", delval = "0")
private Integer isDel;
}
第三步:修改Redis缓存代码
修改RedisHandler代码如下,添加新增、删除相关的方法;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.hzy.pojo.User;
import com.hzy.service.UserService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class RedisHandler implements InitializingBean {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private UserService userService;
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void afterPropertiesSet() throws Exception {
// 1.查询用户信息
List<User> UserList = userService.list();
// 2.放入缓存
for (User User : UserList) {
// 2.1.User序列化为JSON
String json = MAPPER.writeValueAsString(User);
// 2.2.设置key值,存入redis
redisTemplate.opsForValue().set("user:id:" + User.getId(), json);
}
}
public void saveUser(User User) {
try {
String json = MAPPER.writeValueAsString(User);
redisTemplate.opsForValue().set("user:id:" + User.getId(), json);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public void deleteUserById(Long id) {
redisTemplate.delete("user:id:" + id);
}
}
第四步:编写监听器类
写一个监听器类,当数据库中的数据发生变化时,会修改Redis中的缓存数据;
import cn.hutool.core.convert.Convert;
import com.github.benmanes.caffeine.cache.Cache;
import com.hzy.config.RedisHandler;
import com.hzy.pojo.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
@CanalTable("tb_user")
@Component
public class UserHandler implements EntryHandler<User> {
@Autowired
private RedisHandler redisHandler;
@Autowired
private Cache<Long, User> userCache;
@Override
public void insert(User user) {
// 写数据到JVM进程缓存
userCache.put(Convert.toLong(user.getId()), user);
// 写数据到redis
redisHandler.saveUser(user);
}
@Override
public void update(User before, User after) {
// 写数据到JVM进程缓存
userCache.put(Convert.toLong(after.getId()), after);
// 写数据到redis
redisHandler.saveUser(after);
}
@Override
public void delete(User user) {
// 删除数据到JVM进程缓存
userCache.invalidate(user.getId());
// 删除数据到redis
redisHandler.deleteUserById(Convert.toLong(user.getId()));
}
}
第五步:启动测试
项目启动后,会发现控制台在实时打印检测状态
让我们看下Redis中的数据,因为有Redis缓存预热,项目启动就会有所有用户的数据;
此时,让我们删掉一条用户信息,再查看Redis中的缓存数据,看有没有更新;
控制台报错了
百度了说是Druid连接池的问题,我试了下也没有解决
总之,以上就是缓存数据同步技术Canal的实现,我使用VM测试过,是可以跑通的,可能是云服务器带宽的原因或者是身份验证的原因,导致没有跑通。
另外说一句,即便删除了用户,在Redis缓存那边也还是有用户信息的,因为在User类中设置了“逻辑删除”的字段,所以并不会真的删除用户,但是Redis缓存中对应用户的逻辑删除字段应该是会发生改变的。
删除用户,数据库中对应用户的逻辑删除字段设置为0;
正常的话,Redis缓存中的该用户信息,逻辑删除字段的值应该要同步修改;
总结
使用Canal作为缓存数据同步,没有代码入侵,耦合低;