一、假设需求:
- 某系统在MySQL某表中操作了一条数据
- 在其他系统中,实时获取最新被操作数据的数据库名、数据表名、操作类型、数据内容
应用场景:
按最近项目的一个需求来说:
1.当某子系统向报警表中新增了一条报警数据;
2.项目中各个子系统需要获取刚刚新增的报警数据;
3.如果使用传统入库查库方式:
- 大批量插入时获取最新的报警数据需要新增查询逻辑
- 频繁获取最新新增数据效率较低
二、实现思路
- 使用ApplicationListener监听数据库
- 将监听到的数据同步并发布到Redis消息队列中
- 其他系统订阅Redis消息队列频道获取新增的最新数据
三、代码实现
- 引入redis客户端依赖(SpringBoot并未集成)
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>5.0.0</version>
</dependency>
- 创建数据同步事件
public class MessageEvent extends ApplicationEvent {
private CdcMessage message;
/**
* 初始化对象
*
* @param source
*/
public MessageEvent(Object source, CdcMessage message) {
super(source);
this.message = message;
}
@Override
public Object getSource() {
return super.getSource();
}
public CdcMessage getMessage() {
return this.message;
}
public void setMessage(CdcMessage message) {
this.message = message;
}
}
- 创建数据信息类CdcMessage
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CdcMessage implements Serializable {
/**
* 数据
*/
private JSONObject data;
/**
* 数据库类型
*/
private String dbType;
/**
* 处理类型(UPDATE DELETE CREATE)
*/
private String handleType;
/**
* 数据库名
*/
private String database;
/**
* 表名
*/
private String table;
/**
* JSON 转对象
*
* @param clazz 转换类型
* @param <T> 泛型
* @return 集合结果
*/
public <T> List<T> toBean(Class<T> clazz) {
List<T> rst = new LinkedList<>();
rst.add(JSON.toJavaObject(data, clazz));
return rst;
}
}
- 创建数据同步方法(实现ApplicationListener数据监听接口,实现onApplicationEvent方法)
@Slf4j
@Component
public class Process implements ApplicationListener<MessageEvent> {
@Override
public void onApplicationEvent(MessageEvent event) {
CdcMessage message = event.getMessage();
// 当TableName表进行新增操作时,执行数据同步操作
if ("TableName".equalsIgnoreCase(message.getTable()) && "CREATE".equals(message.getHandleType())) {
// 创建Jedis对象,连接到Redis服务器
Jedis jedis = new Jedis("ip", 6379);
// 设置认证密码
jedis.auth("psssword");
JSONObject messageData = message.getData();
// 发布消息给消费者
jedis.publish("频道名称", JSON.toJSONString(messageData ));
// 关闭Jedis连接
jedis.close();
}
}
}
四、测试
- 编写测试代码(消息订阅)
@Test
public void test() {
// 创建Jedis对象,连接到Redis服务器
Jedis jedis = new Jedis("ip", 6379);
// 设置认证密码
jedis.auth("password");
// 创建消息订阅器对象
JedisPubSub jedisPubSub = new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
// 在接收到消息时执行的逻辑,可以根据实际需求进行编写
System.out.println(message);
}
};
// 订阅指定频道
jedis.subscribe(jedisPubSub, "频道名称");
// 关闭Jedis连接
jedis.close();
}
- 新增数据
- 获取消息订阅数据
五、总结
该功能主要实现方式为传统数据监听+MQ消息发布/订阅。由于该项目系统MQ只集成了Redis,所以未使用四大MQ从而使用Redis。