分布式 SpringCloudAlibaba、Feign与RabbitMQ实现MySQL到ES数据同步

文章目录

  • ⛄引言
  • 一、思路分析
    • ⛅实现方式
    • ⚡框架选择
  • 二、实现数据同步
    • ⌚需求分析
    • ⏰搭建环境
    • ⚡核心源码
  • 三、测试
  • 四、源码获取
  • ⛵小结

⛄引言

本文参考黑马 分布式Elastic search

Elasticsearch是一款非常强大的开源搜索引擎,具备非常多强大功能,可以帮助我们从海量数据中快速找到需要的内容

一、思路分析

⛅实现方式

同步调用

方案一:同步调用

在这里插入图片描述

基本步骤如下:

  • hotel-demo对外提供接口,用来修改elasticsearch中的数据
  • 酒店管理服务在完成数据库操作后,直接调用hotel-demo提供的接口,

异步通知

方案二:异步通知

在这里插入图片描述

流程如下:

  • hotel-admin对mysql数据库数据完成增、删、改后,发送MQ消息
  • hotel-demo监听MQ,接收到消息后完成elasticsearch数据修改

监听binlong

方案三:监听binlog

在这里插入图片描述

流程如下:

  • 给mysql开启binlog功能
  • mysql完成增、删、改操作都会记录在binlog中
  • hotel-demo基于canal监听binlog变化,实时更新elasticsearch中的内容

⚡框架选择

方式一:同步调用

  • 优点:实现简单,粗暴
  • 缺点:业务耦合度高

方式二:异步通知

  • 优点:低耦合,实现难度一般
  • 缺点:依赖mq的可靠性

方式三:监听binlog

  • 优点:完全解除服务间耦合
  • 缺点:开启binlog增加数据库负担、实现复杂度高

本次实现方式我们选择 以RabbitMQ 异步方式 搭载 SpringCloud Alibaba + Feign 实现。

二、实现数据同步

⌚需求分析

需求

实现酒店管理增删改查业务,已提供页面。 完成其数据发生增删改查操作时 同步 ElasticSearch

分析

我们采用分布式技术的方式来实现

框架采用 SpringCloud Alibaba、Nacos 、OpenFeign 远程调用、RabbitMQ 作为消息承载体承载数据、 Elastic Search 搜索引擎

⏰搭建环境

以下为模块概览

在这里插入图片描述

主要分为两大模块

  • 完成酒店模块增删改查业务,引入MQ依赖,完成其向MQ的发送消息 此模块作为生产者
  • 完成ES-MQ模块,引入MQ、ES依赖,完成接受MQ的消息以及完成对ES的更新 此模块作为消费者

注意:Nacos需要自行下载,本项目依赖于Nacos注册中心, 运行起来后不影响后面的服务注册进nacos

本次所用到的 RabbitMQ、 ElasticSearch 均部署在 云服务器

MQ结构如图:

在这里插入图片描述

⚡核心源码

hotel-service 业务模块

导入hotel-service 核心代码,已完成基础的增删改查工作。 具体源码公众号搜索 程序员Bug终结者 回复 es 获取

ES模块引入依赖

<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- ES -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>

声明队列交换机名称

public class MqConstants {

    /**
     * 交换机
     */
    public static final String HOTEL_EXCHANGE = "hotel.topic";

    /**
     * 新增或修改的routing_key
     */
    public static final String HOTEL_INSERT_KEY = "hotel.insert";

    /**
     * 删除的 routing_key
     */
    public static final String HOTEL_DELETE_KEY = "hotel.delete";
}

hotel-service模块 发送消息

@RestController
@RequestMapping("hotel")
public class HotelController {

    @Autowired
    private HotelService hotelService;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/{id}")
    public Hotel queryById(@PathVariable("id") Long id){
        return hotelService.getById(id);
    }

    @GetMapping("/list")
    public PageResult hotelList(
            @RequestParam(value = "page", defaultValue = "1") Integer page,
            @RequestParam(value = "size", defaultValue = "1") Integer size
    ){
        Page<Hotel> result = hotelService.page(new Page<>(page, size));

        return new PageResult(result.getTotal(), result.getRecords());
    }

    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel){
        hotelService.save(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }

    @PutMapping()
    public void updateById(@RequestBody Hotel hotel){
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
    }

    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);
    }
}

ES模块接受消息

@Component
@Slf4j
public class MqConsumerListener {


    @Resource
    private HotelService hotelService;

    /**
     * 监听酒店新增或修改的业务
     * @param id
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = MqConstants.HOTEL_INSERT_KEY, durable = "true"),
            exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT), key = MqConstants.HOTEL_INSERT_KEY))
    public void listenHotelInsertOrUpdate(String id) throws IOException {
        hotelService.insertById(id);
    }

    /**
     * 监听酒店删除的业务
     * @param id
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = MqConstants.HOTEL_DELETE_KEY, durable = "true"),
            exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT), key = MqConstants.HOTEL_DELETE_KEY))
    public void listenHotelDelete(String id) throws IOException {
        hotelService.deleteById(id);
    }
}

核心方法实现

@Service
public class HotelService {

    @Resource
    private RestHighLevelClient client;

    @Resource
    private HotelClient hotelClient;

    public void insertById(String id) {
        try {
            //1. 根据id查询酒店数据
            Hotel hotel = hotelClient.findById(id);
            HotelDoc hotelDoc = new HotelDoc(hotel);
            //2. 准备Request
            IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());

            //3. 准备DSL
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            //4. 发送请求
            client.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void deleteById(String id) {
        try {
            //1. 准备 Request
            DeleteRequest request = new DeleteRequest("hotel", id);
            // 2.发送请求
            client.delete(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

三、测试

运行nacos

startup.cmd -m standalone

将hotel-service模块注册到nacos

在这里插入图片描述

访问页面,对酒店数据进行增删改查操作

在这里插入图片描述

将第一条信息价格修改为399

在这里插入图片描述

查看es中数据的变化

在这里插入图片描述

成功完成数据同步

四、源码获取

请联系 公众号 程序员Bug终结者 回复 es同步 获取源码及数据库文件

⛵小结

以上就是【Bug 终结者】对 分布式 SpringCloudAlibaba、Feign与RabbitMQ实现MySQL到ES数据同步 的简单介绍,ES搜索引擎无疑是最优秀的分布式搜索引擎,使用它,可大大提高项目的灵活、高效性! 通过本文已了解 MySQL数据同步ES基本过程以及核心实现 技术改变世界!!!

如果这篇【文章】有帮助到你,希望可以给【Bug 终结者】点个赞👍,创作不易,如果有对【后端技术】、【前端领域】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【Bug 终结者】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】💝💝💝!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/534481.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

R语言绘制一次和二次相关性热图

在数据探索的过程中&#xff0c;我们往往会对数据与数据的相关性进行分析&#xff0c;例如我们常用的corrplot包&#xff0c;或者psych包中的corr.test函数&#xff0c;对两两变量间的相关性进行分析。我们常常会看到这样的相关性热图&#xff1a; 但有时变量间的关系并非线性…

在线课程平台LearnDash评测 – 最佳 WordPress LMS插件

在我的LearnDash评测中&#xff0c;我探索了流行的 WordPress LMS 插件&#xff0c;该插件以其用户友好的拖放课程构建器而闻名。我深入研究了各种功能&#xff0c;包括课程创建、测验、作业、滴灌内容、焦点模式、报告、分析和管理工具。 我的评测还讨论了套餐和定价选项&…

Linux基础指令补全,权限问题分析—3

一、命令补全&#xff1a; 1.bc指令&#xff1a; 功能&#xff1a;命令行计算器&#xff0c;使用quit退出语法&#xff1a;bc 算式 2.uname指令&#xff1a; 语法&#xff1a;uname 选项功能&#xff1a;uname原来获取电脑或操作系统的相关信息选项&#xff1a; ①-a选项&am…

深拷贝总结

JSON.parse(JSON.stringify(obj)) 这行代码的运行过程&#xff0c;就是利用 JSON.stringify 将js对象序列化&#xff08;JSON字符串&#xff09;&#xff0c;再使用JSON.parse来反序列化&#xff08;还原&#xff09;js对象&#xff1b;序列化的作用是存储和传输。&#xff08…

自动驾驶_交通标志识别:各目标检测算法评测

自动驾驶|交通标志识别&#xff1a;各目标检测算法评测 论文题目&#xff1a;Evaluation of Deep Neural Networks for traffic sign detection systems 开源代码&#xff1a;https://github.com/aarcosg/traffic-sign-detection 附赠自动驾驶学习资料和量产经验&#xff1a;…

【Canvas与艺术】椭圆形五星环绕Premium Quality标志

【关键点】 绘制此标志最难在星星之间间隔整齐&#xff0c;我目前用的是类似四心定位法&#xff0c;用四条圆弧去拟近一个椭圆&#xff0c;这种方法需要不断调试&#xff0c;比较费工。 【成品】 【代码】 <!DOCTYPE html> <html lang"utf-8"> <me…

DCDC 5V2A电源升压芯片FP6276BXR-G1 FP6298XR-G1

一、FP6276BXR-G1 3.7v升5V2A同步升压输入电压:2.4V-4.5V FP6276B是一个具有PWM/PSM控制的电流模式增压直流-直流转换器。它的PWM电路内置40mΩ高侧开关和40mΩ低侧开关使该调节器高高效。内部补偿网络还将外部组件计数最小化到只有6个。一个内部的0.6V电压被连接到误差放大器…

你知道 Java 线程池的原理吗?

Java线程池是用于管理和复用线程的机制&#xff0c;它可以帮助开发者有效地管理线程的生命周期和资源&#xff0c;并提高应用程序的性能和稳定性。 1. 线程池概述 在计算机科学中&#xff0c;线程池是一种可用来执行异步任务的线程队列。它主要包含以下几个组成部分&#xff…

【YOLOv8】Yolov5和Yolov8网络结构的分析与对比

目录 一 YOLOv5 二 YOLOv8 yolo通常采用backbone-neck-head的网络结构。 Backbone 主要负责从输入图像中提取高层次的语义特征,常包含多个卷积层和池化层&#xff0c;构建了一个深层次的特征提取器。Neck通常用来进一步整合与调整backbone提取的特征&#xff0c;有利于将不同…

计算机网络——WEB服务器编程实验

实验目的 1. 处理一个 http 请求 2. 接收并解析 http 请求 3. 从服务器文件系统中获得被请求的文件 4. 创建一个包括被请求的文件的 http 响应信息 5. 直接发送该信息到客户端 具体内容 一、C 程序来实现 web 服务器功能。 二、用 HTML 语言编写两个 HTML文件&#xff0c;并…

深入OceanBase内部机制:系统架构与组件精讲

码到三十五 &#xff1a; 个人主页 心中有诗画&#xff0c;指尖舞代码&#xff0c;目光览世界&#xff0c;步履越千山&#xff0c;人间尽值得 ! 目录 1️⃣OceanBase 整体架构1.1 分区1.2 分片1.3 日志流1.4 对等节点1.5 多租户 2️⃣OceanBase 架构与组件详解2.1 存储层2.2 …

错误日志:解决在VScode中调试C++代码断点无效、断点错位的问题

问题可能原因有&#xff1a; 调试时断点无效&#xff0c;大概率是 CMakeLists.txt 设置成了 Release 模式&#xff1b;如果在 CMakeLists.txt 在设置成 Debug 以后&#xff0c;调试时能够停下来&#xff0c;但没在断点处停下&#xff0c;而是停在了别的地方&#xff0c;这就是…

解决solidworks electrical无法连接数据库

很多人在第一次安装电气软件的过程中&#xff0c;会遇到这样的一个问题&#xff1a;“无法连接至数据库&#xff0c;请检查连接参数”&#xff0c;相信很多人看到就感到非常的头疼。的确&#xff0c;对于我们专业人士来说&#xff0c;也是非常的难受&#xff0c;那怎么办呢&…

计算机毕业设计vue+PHP校园二手书交易系统_ij5dr

开发语言&#xff1a;php 后端框架&#xff1a;Thinkphp/Laravel 前端框架&#xff1a;vue.js 服务器&#xff1a;apache 数据库&#xff1a;mysql 运行环境:phpstudy/wamp/xammp等 基于vue框架的二手图书交易系统为当前传统管理模式提供了一个高效、便捷、信息化的解决方案&a…

【热门话题】OneFlow深度学习框架介绍

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 OneFlow深度学习框架介绍引言一、OneFlow概述1.1 定位与起源1.2 核心特性数据流…

TQZC706开发板教程:在ZC706上运行AD9361

首先需要在github上下载两个文件&#xff0c;本例程用到的文件以及最终文件&#xff0c;我都会放在网盘里面&#xff0c;地址在本文的末尾&#xff0c;需要自行提取 在github上搜索hdl选择第一个-->选择版本-->我所使用的vivado是2018.3版本&#xff0c;所以这里我下载的…

JAVA面试八股文之数据库

MySQL面试题 MySQL 存储引擎架构了解吗&#xff1f;CHAR 和 VARCHAR 的区别是什么&#xff1f;索引是越多越好嘛&#xff1f;MySQL数据库中空值&#xff08;null&#xff09;和空字符串&#xff08;&#xff09;的区别&#xff1f;SQL 中 on 条件与 where 条件的区别&#xff1…

【Linux系列】如何确定当前运行的是 RHEL 9 还是 RHEL 8?

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

mysql8主从配置报错Authentication plugin ‘caching_sha2_password‘ reported error

错误信息&#xff1a; Error connecting to source slave192.168.2.177:3306. This was attempt 2/86400, with a delay of 60 seconds between attempts. Message: Authentication plugin caching_sha2_password reported error: Authentication requires secure connection.…

C/C++的内存管理

栈帧最主要的作用就是存储局部数据 C语言中动态内存管理方式 C语言动态内存管理 该篇详细的讲述了C语言动态内存管理的使用&#xff0c;不太懂的小伙伴可以去了解一下 C中动态内存管理方式 首先&#xff0c;C语言内存管理的方式在C中可以继续使用。但有些地方就无能为力而且使用…