参考RabbitMQ实现一个消息队列

文章目录

  • 前言
  • 小小消息管家
    • 1.项目介绍
    • 2. 需求分析
      • 2.1 API
      • 2.2 消息应答
      • 2.3 网络通信协议设计
    • 3. 开发环境
    • 4. 项目结构介绍
      • 4.1 配置信息
    • 5. 项目演示

前言

消息队列的本质就是阻塞队列,它的最大用途就是用来实现生产者消费者模型,从而实现解耦合以及削峰填谷

在分布式系统中不再是单个服务器而是服务器“集群”,如果我们我们直接A服务器给B服务器发送请求,B服务器给A服务器返回响应,这样的话我们AB的耦合较大,如果A或者B服务器挂了,我们业务也就崩溃了。引入消息队列之后我们将请求和响应都通过消息队列这个中间人来传递,就降低了耦合度。

同样的,如果我们AB服务器直接进行通信,如果A服务器突然发送许多请求,我们B服务器也会收到巨多请求的影响,AB由于硬件资源限制可能都会崩溃。如果我们引入消息队列,消息队列可以将许多请求接收存储下来,B服务器依然可以按照原有节奏取请求,不会一下子接收大量请求。这也就是我们所说的削峰填谷,也是类似的,就算请求过少,我们的服务器依然可以从消息队列中取出挤压得请求。

在分布式系统中,跨主机之间使用生产者消费者模型就显得非常重要了。 我们通常把阻塞队列封装成⼀个独立的服务器程序,并且新增一些功能。这样的程序我们就称为 消息队列

所以此次就仿照市面上得RabbitMQ实现一个简易的消息队列。

小小消息管家

1.项目介绍

将消息队列分成服务器模块、客户端模块、公共模块来实现。

  1. 服务器模块通过虚拟主机实现交换机、队列、绑定、消息等相关操作的隔离。虚拟主机主要负责对上述内容的数据进行硬盘(数据库和文件)以及内存管理、消息的三种转发方式如Direct、Fanout、Topic、为提供客户端API的实现。
  2. 客户端模块:实现连接管理提供建立连接、信道管理通过信道实现TCP连接的复用。在信道管理中实现客户端调用的API。
  3. 公共模块:约定客户端服务器的通信协议、数据传输过程中的序列化以及反序列化。

在这里插入图片描述

2. 需求分析

由于是实现一个生产者消费者模型,在消息队列中我们要实现的逻辑如下:我们的生产者客户端向服务器发送消息,消费者客户端向服务器订阅消息,服务器负责消息的存储和转发。

在这里插入图片描述

概念解读:

  • 虚拟主机 (VirtualHost):是⼀个逻辑上的集合,⼀个 BrokerServer 上可以存在多个 VirtualHost。
  • 交换机 (Exchange):生产者把消息先发送到 BrokerServer 的 Exchange 上。 再根据不同的规则把消息转发给不同的 Queue。
  • 队列 (Queue):真正用来存储消息,每个消费者决定自己从哪个 Queue 上读取消息。
  • 绑定 (Binding):Exchange 和 Queue 之间的关联关系。Exchange 和 Queue 可以理解成 “多对多” 关
    系。
  • 消息 (Message): 传递的内容。

2.1 API

我们的服务器提供以下API是西安消息队列的基本功能。

  1. 创建队列 (queueDeclare)
  2. 销毁队列 (queueDelete)
  3. 创建交换机 (exchangeDeclare)
  4. 销毁交换机 (exchangeDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)

我们的客户端提供以下API供客户使用消息队列,为了复用TCP连接,我们提供了一个Channel逻辑通道。所以在客户端我们还需要提供Channel的创建和关闭:

  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)
  13. 确认消息 (basicAck)

2.2 消息应答

被消费者消费的消息,需要进行应答来确定我们消费者正确消费了消息。我们设置两种应答模式:

  1. 自动应答:消费者只要消费了消息,就算应答完毕。Broker直接删除这个消息。
  2. 手动应答:消费者手动调用应答接口(确认消息),Broker收到应答请求后删除这个消息。

2.3 网络通信协议设计

我们使用TCP 协议,来作为通信的底层协议。在这个基础上自定义应用层协议,实现客户端对服务器提供的功能远程调用。所以我们在协议中约定type标记调用的功能,length为消息的长度,payload为消息的具体内容。

请求和响应的格式如下:
在这里插入图片描述

其中 type取值如下:
• 0x1 创建 channel
• 0x2 关闭 channel
• 0x3 创建 exchange
• 0x4 销毁 exchange
• 0x5 创建 queue
• 0x6 销毁 queue
• 0x7 创建 binding
• 0x8 销毁 binding
• 0x9 发送 message
• 0xa 订阅 message
• 0xb 返回 ack
• 0xc 服务器给客户端推送的消息(被订阅的消息)

其中 payload 部分,会根据不同的 type,存在不同的格式。对于请求来说, payload 表示这次方法调用的各种参数信息,我们定义对应的类实现。
对于响应,payload 表示这次方法调用的返回值。

3. 开发环境

数据库:SQLite
开发语言:Java
技术框架:SpringBoot、SpringMVC、Mybatis
管理工具:Maven
开发工具:Intellij IDEA 2020.1.4
操作系统:Windows10

4. 项目结构介绍

在这里插入图片描述

  • common 包中约定通信协议包括请求响应格式以及不同的payload对应的数据格式;创建自定义异常类;实现序列化反序列化;定义消费者以及处理消息调用的函数接口。
  • demo 创建了一个消费者和生产者用于测试项目。
  • mqclient 客户端模块,提供创建连接的工厂类;定义完整连接的内容;定义channel实现客户端api
  • mqserver.core 定义了交换机、队列、消息、交换机类型、转发规则、消费消息的逻辑。
  • mqserver.datacenter 定义了交换机、队列、消息、绑定的存储以及管理。
  • mqserver.mapper:实现对sqlite数据库的操作。

4.1 配置信息

由于我们对于数据库的存储只涉及一小部分,所以此处我们利用sqlite进行数据管理。

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.14</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mq</name>
    <description>mq</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc -->
        <dependency>
            <groupId>org.xerial</groupId>
            <artifactId>sqlite-jdbc</artifactId>
            <version>3.41.0.1</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter-test</artifactId>
            <version>2.3.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

application.yml: 配置数据库信息
在这里插入图片描述

5. 项目演示

创建一个生产者客户端向服务器发送一个消息:

package com.example.mq.demo;

import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.ExchangeType;

import java.io.IOException;

/**
 * 这个类用来表示一个生产者.
 *  通常这是一个单独的服务器程序.
 */
public class DemoProducer {
    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("启动生产者");
        //创建一个连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);

        //得到逻辑链接,这个就类似于socket
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 通过逻辑连接创建交换机和队列
        channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
        channel.queueDeclare("testQueue", true, false, false, null);

        // 创建一个消息并发送
        byte[] body = "hello 欢迎消费消息".getBytes();
        boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);
        System.out.println("消息投递完成! ok=" + ok);

        Thread.sleep(500);
        channel.close();
        connection.close();
    }
}

创建一个消费者客户端,订阅消费消息

package com.example.mq.demo;

import com.example.mq.common.Consumer;
import com.example.mq.common.MqException;
import com.example.mq.mqclient.Channel;
import com.example.mq.mqclient.Connection;
import com.example.mq.mqclient.ConnectionFactory;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;

import java.io.IOException;

/**
 * @author zq
 * @date 2023-08-05 19:29
 */

/*
 * 这个类表示一个消费者.
 * 通常这个类也应该是在一个独立的服务器中被执行
 */
public class DemoConsumer {
    public static void main(String[] args) throws IOException, MqException, InterruptedException {
        System.out.println("启动消费者!");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
        channel.queueDeclare("testQueue", true, false, false, null);

        //订阅消息,并定义如何消费消息
        channel.basicConsume("testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("[消费数据] 开始!");
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("basicProperties=" + basicProperties);
                String bodyString = new String(body, 0, body.length);
                System.out.println("body=" + bodyString);
                System.out.println("[消费数据] 结束!");
            }
        });

        // 通过这个循环模拟一直等待消费完生产者生产的所有消息
        while (true) {
            Thread.sleep(500);
        }
    }
}

实现结果如下 :

在这里插入图片描述
在这里插入图片描述

通过结果我们可以看出,我们消费者成功取出订阅的队列中的消息进行消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/64348.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

docker安装neo4j

参考文章&#xff1a; 1、Mac 本地以 docker 方式配置 neo4j_neo4j mac docker_Abandon_first的博客-CSDN博客 2、https://www.cnblogs.com/caoyusang/p/13610408.html 安装的时候&#xff0c;参考了以上文章。遇到了一些问题&#xff0c;记录下自己的安装过程&#xff1a; …

在centos7下通过docker 安装onlyoffice

因为需要调试网盘&#xff0c;所以今天安装一下centos7的onlyoffice 官方介绍如下&#xff1a; 为了方便&#xff0c;还是通过docker方式来安装onlyoffice了&#xff0c;这里我们采用社区版本了。 1、下载docker安装包 如下&#xff1a; docker pull onlyoffice/documentserv…

桌面端UI自动化测试如何让SplitButtonControl展开

原始SplitButtonControl图 从图中鼠标所指的控件属性为&#xff1a; ControlType&#xff08;控件类型&#xff09;: SplitButtonControl ClassName&#xff08;类名&#xff09;: SplitButton AutomationId&#xff08;自动化ID&#xff09;: esri_geoprocessing_Pyt…

m3u8怎么变成本地视频?一个小妙招教你轻松搞定

m3u8视频是一种流媒体视频格式&#xff0c;它将整个视频分成多个小文件&#xff0c;每个小文件的长度通常为几秒钟。这些小文件存储在服务器上&#xff0c;并通过网络传输到观众的设备上。当观众观看视频时&#xff0c;视频播放器会按照正确的顺序下载和播放这些小文件&#xf…

【Hystrix技术指南】(4)故障切换的运作流程

[每日一句] 也许你度过了很糟糕的一天&#xff0c;但这并不代表你会因此度过糟糕的一生。 [背景介绍] 分布式系统的规模和复杂度不断增加&#xff0c;随着而来的是对分布式系统可用性的要求越来越高。在各种高可用设计模式中&#xff0c;【熔断、隔离、降级、限流】是经常被使…

uni-app离线打包高德地图导入android studio不能正常显示

本人使用的uni-app SDK版本&#xff1a;Android-SDK3.8.7.81902_20230704 1.导入以上文件&#xff0c;依赖已经自动添加了 2.确保这个正常引入 3.修改AndroidMainifest.xml,添加自己的密钥

flask------消息闪现 flash

1介绍 flask提供了一个非常有用的flash()函数&#xff0c;它可以用来“闪现”需要提示给用户的消息&#xff0c;比如当用户登录成功后显示“欢迎回来&#xff01;”。在视图函数调用flash()函数&#xff0c;传入消息内容&#xff0c;flash&#xff08;&#xff09;函数把消息存…

win10 2022unity设置中文

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言解决方法 前言 在Edit->preferences里找不到language选项。 解决方法 【1】打开下面地址 注意 :把{version}换成你当前安装的版本&#xff0c;比如说如果…

textarea 标签如何创建多行文本输入框?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ textarea 的写法⭐ 代码含义⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏是为那些对Web开发感兴趣、…

Harbor企业镜像仓库部署(本地)

简述&#xff1a; Docker 官方镜像仓库是用于管理公共镜像的地方&#xff0c;大家可以在上面找到想要的镜像&#xff0c;也可以把自己的镜像推送上去。但是有时候服务器无法访问互联网&#xff0c;或者不希望将自己的镜像放到互联网上&#xff0c;那么就需要用到 Docker Regis…

通过easyui实现动态控制表格字段显示、导出表格数据

前言 学过layui前端框架的都知道&#xff0c;layui默认帮我们实现了控制表格字段显示以及数据的导出功能。 1、控制表格字段显示 2、数据导出 3、导出为pdf&#xff1a;导出按钮的右边那个按钮就是打印pdf的 那么&#xff0c;easyui要怎么实现这些功能呢&#xff1f;这篇文章就…

深度学习中的优化算法

文章目录 前言一、优化和深度学习1.1 优化的目标1.2 深度学习中的优化挑战1.2.1 局部最小值1.2.2 鞍点1.2.3 梯度消失 二、梯度下降2.1 一维梯度下降2.1.1 学习率 2.2 多元梯度下降2.3 自适应方法2.3.1 牛顿法2.3.2 其他自适应方法 三、随机梯度下降3.1 随机梯度更新3.2 动态学…

了解Swarm 集群管理

Swarm 集群管理 简介 Docker Swarm 是 Docker 的集群管理工具。它将 Docker 主机池转变为单个虚拟 Docker 主机。 Docker Swarm 提供了标准的 Docker API&#xff0c;所有任何已经与 Docker 守护程序通信的工具都可以使用 Swarm 轻松地扩展到多个主机。 支持的工具包括但不限…

ubuntu服务器配置ftp服务

需求&#xff1a;配置ftp服务用于在windows电脑上直接浏览、下载、上传ubuntu服务器上的文件&#xff0c;用于文件共享&#xff0c;方便实用 效果&#xff1a;用户打开windows资源管理器后输入ftp://xxx.xxx.xxx.xxx &#xff08;公网IP地址&#xff09;后&#xff0c;即可浏览…

linux onlyOffice docker 离线部署

文章目录 前言1. 安装Docker容器2. 拉取镜像3. 验证 前言 docker 离线安装onlyoffice&#xff0c;如在线安装可直接跳过导出导入镜像步骤&#xff0c;拉取后直接运行。 1. 安装Docker容器 下载文件 wget https://download.docker.com/linux/static/stable/x86_64/docker-19…

在Linux中安装MySQL

在Linux中安装MySQL 检测当前系统中是否安装MySQL数据库 命令作用rpm -qa查询当前系统中安装的所有软件rpm -qa|grep mysql查询当前系统中安装的名称带mysql的软件rpm -qa | grep mariadb查询当前系统中安装的名称带mariadb的软件 RPM ( Red-Hat Package Manager )RPM软件包管理…

[golang gin框架] 45.Gin商城项目-微服务实战之后台Rbac微服务之角色权限关联

角色和权限的关联关系在前面文章中有讲解,见[golang gin框架] 14.Gin 商城项目-RBAC管理之角色和权限关联,角色授权,在这里通过微服务来实现角色对权限的授权操作,这里要实现的有两个功能,一个是进入授权,另一个是,授权提交操作,页面如下: 一.实现后台权限管理Rbac之角色权限关…

VSCode C/C++ 分目录编译配置

分目录编译配置记录 launch.json文件 注释处为修改内容 {// 使用 IntelliSense 了解相关属性。 // 悬停以查看现有属性的描述。// 欲了解更多信息&#xff0c;请访问: https://go.microsoft.com/fwlink/?linkid830387"version": "0.2.0","configur…

【JavaEE进阶】Spring创建与使用

文章目录 一. 创建 Spring 项目1.1 创建一个Maven项目1.2 添加Spring依赖1.4. 创建一个启动类 二. 将 Bean 对象存放至 Spring 容器中三. 从 Spring 容器中读取到 Bean1. 得到Spring对象2. 通过Spring 对象getBean方法获取到 Bean对象【DI操作】 一. 创建 Spring 项目 接下来使…

嵌入式开发学习(STC51-9-led点阵)

内容 点亮一个点&#xff1b; 显示数字&#xff1b; 显示图像&#xff1b; LED点阵简介 LED 点阵是由发光二极管排列组成的显示器件 通常应用较多的是8 * 8点阵&#xff0c;然后使用多个8 * 8点阵可组成不同分辨率的LED点阵显示屏&#xff0c;比如16 * 16点阵可以使用4个8 *…