2024.2.29 模拟实现 RabbitMQ —— 项目展示

目录

项目介绍

核心功能

核心技术

演示直接交换机

演示扇出交换机

演示主题交换机


项目介绍

  • 此处我们模拟 RabbitMQ 实现了一个消息队列服务器

核心功能

  • 提供了 虚拟主机交换机队列绑定消息 概念的管理
  • 九大核心 API 创建队列销毁队列创建交换机销毁交换机创建绑定解除绑定发布消息订阅消息确认消息
  • 实现了三种典型的消息转换方式 直接交换机(Direct)扇出交换机(Fanout)主题交换机(Topic)
  • 交换机队列绑定 使用 SQLite 数据库持久化,消息 使用文件持久化
  • 基于 TCP + 自定义应用层协议 实现生产者/消费者和 Broker Server 之间的交互工作

核心技术

  • Spring Boot / MyBatis / Lombok
  • SQLite
  • TCP

  • 关于该项目的需求分析,可点击下方链接跳转

模拟实现 RabbitMQ —— 需求分析


  • 关于该项目的核心类,可点击下方链接跳转

模拟实现 RabbitMQ —— 实现核心类


  • 关于该项目的数据库操作,可点击下方链接跳转

模拟实现 RabbitMQ —— 数据库操作


  • 关于该项目的消息持久化,可点击下方链接跳转

模拟实现 RabbitMQ —— 消息持久化


  • 关于该项目的内存数据管理,可点击下方链接跳转

模拟实现 RabbitMQ —— 内存数据管理


  • 关于该项目的虚拟机设计,可点击下方链接跳转

模拟实现 RabbitMQ —— 虚拟主机设计


  • 关于该项目的交换机转发规则,可点击下方链接跳转

模拟实现 RabbitMQ —— 实现转发规则


  • 关于该项目的消费逻辑,可点击下方链接跳转

模拟实现 RabbitMQ —— 实现消费消息逻辑


  • 关于该项目网络通信设计,可点击下方链接跳转

模拟实现 RabbitMQ —— 网络通信设计(服务器)

模拟实现 RabbitMQ —— 网络通信设计(客户端)

演示直接交换机

  • 简单写一个 demo 模拟 跨主机的生产者消费者模型
  • 此处为了方便,就在本机演示

  • 此处我们创建的交换机类型为 直接交换机

1、在 Spring Boot 项目的启动类中创建 Broker Server,绑定端口并启动!

@SpringBootApplication
public class DemoApplication {
	public static ConfigurableApplicationContext context;

	public static void main(String[] args) throws IOException {
		context = SpringApplication.run(DemoApplication.class, args);

		BrokerServer brokerServer = new BrokerServer(9090);
		brokerServer.start();
	}
}

2、编写生产者代码

/*
* 这个类用来表示一个生产着
* 通常这是一个单独的服务器程序
* */
public class DemoProducer {
    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("启动生产者!");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(9090);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

//        创建交换机和队列
        channel.exchangeDeclare("testExchange", ExchangeType.DIRECT,true,false,null);
        channel.queueDeclare("testQueue",true,false,false,null);

//        创建一个消息并发送
        byte[] body = "hello".getBytes();
        boolean ok = channel.basicPublish("testExchange","testQueue",null,body);
        System.out.println("消息投递完成! ok = " + ok);

        Thread.sleep(500);
        channel.close();
        connection.close();
    }
}

3、编写消费者代码

/*
* 这个类表示一个消费者
* 通常这个类也应该是在一个独立的服务器中被执行
* */
public class DemoConsumer {
    public static void main(String[] args) throws IOException, InterruptedException, MqException {
        System.out.println("启动消费者!");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(9090);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("testExchange",ExchangeType.DIRECT,true,false,null);
        channel.queueDeclare("testQueue",true,false,false,null);

        channel.basicConsume("testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("[消费数据] 开始!");
                System.out.println("consumerTag = " + consumerTag);
                System.out.println("basicProperties = " + basicProperties);
                String bodyString = new String(body,0,body.length);
                System.out.println("body = " + bodyString);
                System.out.println("[消费数据] 结束!");
            }
        });

//        由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费
        while (true) {
            Thread.sleep(500);
        }
    }
}

4、启动 Spring Boot 项目(启动 Broker Server)


5、运行消费者代码


6、运行生产者代码


7、继续观察消费者的控制台

演示扇出交换机

  • 此处我们创建的交换机类型为 扇出交换机

 1、编写生产者代码

/*
 * 这个类用来表示一个生产着
 * 通常这是一个单独的服务器程序
 * */
public class DemoProducer {
    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("启动生产者!");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(9090);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
//        创建交换机
        channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null);
//        创建一个消息并发送
        byte[] body = "hello".getBytes();
        boolean ok = channel.basicPublish("testExchange","",null,body);
        System.out.println("消息投递完成! ok = " + ok);

        Thread.sleep(500);
        channel.close();
        connection.close();
    }
}

3、编写消费者A 代码

/*
 * 这个类表示一个消费者A
 * 通常这个类也应该是在一个独立的服务器中被执行
 * */
public class DemoConsumerA {
    public static void main(String[] args) throws IOException, InterruptedException, MqException {
        System.out.println("启动消费者!");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(9090);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
//        创建交换机
        channel.exchangeDeclare("testExchange",ExchangeType.FANOUT,true,false,null);
//        创建队列
        channel.queueDeclare("testQueue1",true,false,false,null);
//        设置绑定
        channel.queueBind("testQueue1","testExchange","");
//        订阅消息
        channel.basicConsume("testQueue1", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("[testQueue1 消费数据] 开始!");
                System.out.println("consumerTag = " + consumerTag);
                System.out.println("basicProperties = " + basicProperties);
                String bodyString = new String(body,0,body.length);
                System.out.println("body = " + bodyString);
                System.out.println("[testQueue1 消费数据] 结束!");
            }
        });

//        由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费
        while (true) {
            Thread.sleep(500);
        }
    }
}

4、编写消费者B 代码

/*
 * 这个类表示一个消费者B
 * 通常这个类也应该是在一个独立的服务器中被执行
 * */
public class DemoConsumerB {
    public static void main(String[] args) throws IOException, InterruptedException, MqException {
        System.out.println("启动消费者!");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(9090);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
//        创建交换机
        channel.exchangeDeclare("testExchange", ExchangeType.FANOUT,true,false,null);
//        创建队列
        channel.queueDeclare("testQueue2",true,false,false,null);
//        设置绑定
        channel.queueBind("testQueue2","testExchange","");
//        订阅消息
        channel.basicConsume("testQueue2", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("[testQueue1 消费数据] 开始!");
                System.out.println("consumerTag = " + consumerTag);
                System.out.println("basicProperties = " + basicProperties);
                String bodyString = new String(body,0,body.length);
                System.out.println("body = " + bodyString);
                System.out.println("[testQueue1 消费数据] 结束!");
            }
        });

//        由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费
        while (true) {
            Thread.sleep(500);
        }
    }
}

5、启动 Spring Boot 项目(启动 Broker Server)


6、运行消费者A 代码


7、运行消费者B 代码


8、运行生产者代码


9、继续观察消费者A 的控制台


10、继续观察消费者B 的控制台

演示主题交换机

  • 此处我们创建的交换机为 主题交换机

 1、编写生产者代码

/*
 * 这个类用来表示一个生产着
 * 通常这是一个单独的服务器程序
 * */
public class DemoProducer {
    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("启动生产者!");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(9090);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

//        创建交换机和队列
        channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null);
        channel.queueDeclare("testQueue",true,false,false,null);

//        创建消息A 并发送
        byte[] body = "hello".getBytes();
        boolean ok = channel.basicPublish("testExchange","ccc.aaa.bbb",null,body);
        System.out.println("消息投递完成! ok = " + ok);

//        创建消息B 并发送
        body = "hi".getBytes();
        ok = channel.basicPublish("testExchange","aaa.bbb",null,body);
        System.out.println("消息投递完成! ok = " + ok);

        Thread.sleep(500);
        channel.close();
        connection.close();
    }
}

3、编写消费者代码

/*
 * 这个类表示一个消费者
 * 通常这个类也应该是在一个独立的服务器中被执行
 * */
public class DemoConsumer {
    public static void main(String[] args) throws IOException, InterruptedException, MqException {
        System.out.println("启动消费者!");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(9090);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
//        创建交换机
        channel.exchangeDeclare("testExchange", ExchangeType.TOPIC,true,false,null);
//        创建队列
        channel.queueDeclare("testQueue",true,false,false,null);
//        设置绑定
        channel.queueBind("testQueue","testExchange","*.aaa.bbb");
//        订阅消息
        channel.basicConsume("testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("[消费数据] 开始!");
                System.out.println("consumerTag = " + consumerTag);
                System.out.println("basicProperties = " + basicProperties);
                String bodyString = new String(body,0,body.length);
                System.out.println("body = " + bodyString);
                System.out.println("[消费数据] 结束!");
            }
        });
//        由于消费者也不知道生产者要生产多少消息,就在这里通过这个循环模拟一直等待消费
        while (true) {
            Thread.sleep(500);
        }
    }
}

4、启动 Spring Boot 项目(启动 Broker Server)


5、运行消费者代码


6、运行生产者代码


7、继续观察消费者的控制台

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/412302.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

react useMemo 用法

1,useCallback 的功能完全可以由 useMemo 所取代,如果你想通过使用 useMemo 返回一个记忆函数也是完全可以的。 usecallback(fn,inputs)is equivalent to useMemo(()> fn, inputs). 区别是:useCallback不会执行第一个参数函数,而是将它返…

自定义Chrome的浏览器开发者工具DevTools界面的字体和样式

Chrome浏览器开发者工具默认的字体太小,想要修改但没有相关设置。 外观——字体可以自定义字体,但大小不可以调整。 github上有人给出了方法 整理为中文教程: 1.打开浏览器开发者工具,点开设置——实验,勾上红框设…

网络技术ensp 一个简单的交换机配置案例

由于工作调岗,转战网络运维了,第一次网络笔记 1.,目的:2台主机相互可以ping通,并且可以ping通网关地址,设备:2台主机,2台交换机 2网络拓扑图如下 3.主机pc1的配置信息 ip&#xff…

轻量级模型,重量级性能,TinyLlama、LiteLlama小模型火起来了,针对特定领域较小的语言模型是否与较大的模型同样有效?

轻量级模型,重量级性能,TinyLlama、LiteLlama小模型火起来了,针对特定领域较小的语言模型是否与较大的模型同样有效? 当大家都在研究大模型(LLM)参数规模达到百亿甚至千亿级别的同时,小巧且兼具高性能的小…

Springboot应用执行器Actuator源码分析

文章目录 一、认识Actuator1、回顾Actuator2、Actuator重要端点 二、源码分析1、Endpoint自动装配(1)自动配置入口(2)普通Endpoint自动装配(3)配置Web - Endpoint(4)注册Endpoint为M…

微信小程序-全局配置

个人笔记,仅供参考。 1.entryPagePath 代码: "entryPagePath": "pages/index/index" 具体用法: 2.pages 小程序中新增/减少页面,都需要对 pages 数组进行修改。 代码: "pages": [&…

设计模式系列文章-7个创建型模式更新已完结

其实从2019年开始就有些一套关于设计模式的系列文章,但是因为种种原因一直搁置到现在。直到2024年才又恢复更新。 24年1月份上旬一直在弄博客站:https://jaune162.blog 的搭建 24年1月份下旬弄专题站:https://books.jaune162.blog 的搭建。…

设计模式(十) - 工厂方式模式

前言 在此前的设计模式(四)简单工厂模式中我们介绍了简单工厂模式,在这篇文章中我们来介绍下工厂方法模式,它同样是创建型设计模式,而且又有些类似,文章的末尾会介绍他们之间的不同。 1.工厂方法模式简介 …

每日五道java面试题之spring篇(七)

目录: 第一题. 什么是Spring beans?第二题. 一个 Spring Bean 定义 包含什么?第三题. 如何给Spring 容器提供配置元数据?Spring有几种配置方式?第四题. Spring基于xml注入bean的几种方式?第五题:你怎样定义类的作用域…

性能优化问题思考总结

INP 是什么? Interaction to Next Paint (INP) INP是一项指标,通过观察用户在访问网页期间发生的所有点击、点按和键盘互动的延迟时间,评估网页对用户互动的总体响应情况。 互动是指在同一逻辑用户手势期间触发的一组事件处理脚本。例如&a…

酷开科技,让酷开系统成为现代生活的变革者

电视,从问世就一直受到人们的追捧。还记得小时候一家人围坐在电视机前的场景,小小的黑白屏幕,牢牢的吸引着大家的目光。随着科技的不断进步,我们的生活也发生了翻天覆地的变化。而电视,也从笨重的黑白电视变成了轻薄的…

jenkins + gitlab + nginx 自动部署(webhook)

一、意义 当代码仓库被更新时,Jenkins会自动拉取代码进行构建。 适用于测试环境 二、jenkins gitlab nginx 自动部署(webhook) 1.准备服务器 ①安装Jenkins(Java17,tomcat9) ②安装gitlab (16) ③…

深入理解Python中的JSON模块:基础大总结与实战代码解析【第102篇—JSON模块】

深入理解Python中的JSON模块:基础大总结与实战代码解析 在Python中,JSON(JavaScript Object Notation)模块是处理JSON数据的重要工具之一。JSON是一种轻量级的数据交换格式,广泛应用于Web开发、API通信等领域。本文将…

2023 re:Invent 用 Amazon Q 打造你的知识库

前言 随着 ChatGPT 的问世,我们迎来了许多创新和变革的机会。一年一度的亚马逊云科技大会 re:Invent 也带来了许多前言的技术,其中 Amazon CEO Adam Selipsky 在 2023 re:Invent 大会中介绍 Amazon Q 让我印象深刻,这预示着生成式 AI 的又一…

【wu-acw-client 使用】案例

wu-acw-client 使用 项目介绍,使用acw-client,创建对应Java项目的增删改查(ORM:Lazy ORM、mybatis),项目模块架构:mvc、feign、ddd 演示项目环境:idea 、mac、mysql、jdk17 spring …

geotools解析shp 提示 opengis.*.SimpleFeatureType‘ 不在其界限内

问题:( geotools.version:31-SNAPSHOT) 解析shp文件时提示类型SimpleFeatureType不在其界限内 解决: 在引用处将org.opengis.feature.simple.SimpleFeatureType 改为 org.geotools.api.feature.simple.SimpleFeatureType

qt-C++笔记之使用QProcess去执行一个可执行文件时指定动态库所存放的文件夹lib的路径

qt-C笔记之使用QProcess去执行一个可执行文件时指定动态库所存放的文件夹lib的路径 参考博文: 1.C笔记之执行一个可执行文件时指定动态库所存放的文件夹lib的路径 2.Linux笔记之LD_LIBRARY_PATH详解 3.qt-C笔记之使用QProcess去执行一个可执行文件时指定动态库所存放…

LeetCode 0938.二叉搜索树的范围和:深度优先搜索(可中序遍历)

【LetMeFly】938.二叉搜索树的范围和:深度优先搜索(可中序遍历) 力扣题目链接:https://leetcode.cn/problems/range-sum-of-bst/ 给定二叉搜索树的根结点 root,返回值位于范围 [low, high] 之间的所有结点的值的和。…

数一满分150分总分451东南大学920电子信息通信考研Jenny老师辅导班同学,真题大纲,参考书。

记录用来打破的,信息通信考研Jenny老师2024级辅导班同学,数一满分150分,专业课920专业基础综合143,总分451分,一位及其优秀的本科985报考东南大学信息学院的学生,东南大学920考研,东南大学信息科…

查看NGINX版本

查看Nginx版本有几种常用方法: 命令行: 在Linux或macOS系统中打开终端,然后输入以下命令之一: nginx -v 或者 nginx -V -v 参数将输出简短的nginx版本信息。 -V 参数(大写)将输出更详细的版本和配置信息&am…