问题分析
mysql和redis之间有数据同步问题,ES和mysql之间也有数据同步问题。
单体项目可以在crud时就直接去修改,但在微服务里面不同的服务不行。
方案一
方案二
方案三
总结
导入酒店管理项目
倒入完成功启动后可以看见数据成功获取到了
声明队列和交换机
发生增,删,改时要发消息,这里增和改可以合成一个业务。
在消费者中声明交换机和队列。
在hotel-demo项目中引入依赖
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置yaml文件
rabbitmq:
host:
port: 5672
username:
password:
virtual-host:
定义一个常量类
public class MqConstants {
/**
* 交换机
*/
public final static String HOTEL_EXCHANGE="hotel.topic";
/**
* 监听新增和修改的队列
*/
public final static String HOTEL_INSERT_QUEUE="hotel.insert.queue";
/**
* 监听删除的队列
*/
public final static String HOTEL_DELETE_QUEUE="hotel.delete.queue";
/**
* 新增和修改的RoutingKey
*/
public final static String HOTEL_INSERT_KEY="hotel.insert";
/**
* 删除的RoutingKey
*/
public final static String HOTEL_DELETE_KEY="hotel.delete";
}
基于Bean的方式
定义一个配置类并绑定关系
@Configuration
public class MqConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
}
@Bean
public Queue insertQueue(){
return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
}
@Bean
public Queue deleteQueue(){
return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
}
@Bean
public Binding insertQueueBinding(){
return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
}
@Bean
public Binding deleteQueueBinding(){
return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
}
}
发送消息
在生产者中进行发送。把上面的常量类复制到hotel-admin项目中,同时也要配置rabbit的配置信息
在hotel-admin中引入依赖
<!--amqp-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在Controller层中
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel){
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能为空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
}
监听消息
在消费者端hotel-demo项目进行修改
新建一个监听类
@Component
public class HotelListener {
@Autowired
private IHotelService hotelService;
/**
* 鉴定酒店新增或修改的业务
* @param id
*/
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){
hotelService.insertById(id);
}
/**
* 鉴定酒店删除的业务
* @param id
*/
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){
hotelService.deleteById(id);
}
}
对应在Service中
要对ES进行修改。
但是这里应该是不能访问数据库.......只能访问ES才对
@Override
public void deleteById(Long id) {
try {
//1.准备request
DeleteRequest request = new DeleteRequest("hotel", id.toString());
//2.发送请求
client.delete(request,RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void insertById(Long id) {
try {
//0.根据id查询酒店数据
Hotel hotel = getById(id);
//转换为文档类型
HotelDoc hotelDoc = new HotelDoc(hotel);
//1.准备Request对象
IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
//2.准备JSON文档
request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
//3.发送请求
client.index(request,RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
测试同步功能
.....有一点小小的问题,内存不够情况下es会莫名其妙删除数据,导致我只能重新创建索引库并且导入数据,但最后功能无误