2024.2.23 模拟实现 RabbitMQ —— 实现消费消息逻辑

目录

引言

函数式接口

消费者订阅消息 实现思路

关于消息确认


引言

函数式接口

  • Lambda 表达式的本质是匿名函数
  • Java 函数无法脱离类而存在,所以 Java 通过引入函数式接口以支持 Lambda 表达式

特性:

  1. 函数式接口为一个 interface 类
  2. 该类中有且仅有一个方法
  3. 该类需加上 @FunctionalInterface 注解

注意:

  • 上述三点其实就是 Lambda 的本质,即底层实现

消费者订阅消息 实现思路

1、让 broker server 把有哪些消费者管理好

  • 消费者调用 basicConsume 方法就是订阅某个指定队列的消息

注意:

  • 消费者是以队列为纬度订阅的
  • 一个队列可以有多个消费者

  • 约定 消费者之间按照 轮询 的方式进行消费

代码编写:

  • 定义一个 ConsumerEnv 类,用来描述一个消费者
  • 该类中也会包含一些消费者消费过程中用到的数据
import lombok.Data;

/*
* 表示一个消费者(完整的执行环境)
* */
@Data
public class ConsumerEnv {
    private String consumerTag;
    private String queueName;
    private boolean autoAck;
//    通过这个回调来处理收到的消息
    private Consumer consumer;

    public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
        this.consumerTag = consumerTag;
        this.queueName = queueName;
        this.autoAck = autoAck;
        this.consumer = consumer;
    }
}
  • 给每个队列对象(MSGQueue 对象)添加属性 List,用于存储该队列的 消费者对象
//    当前队列都有哪些消费者订阅了
    private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
//    记录当前取到了第几个消费者,方便实现轮询策略
    private AtomicInteger consumerSeq = new AtomicInteger(0);
//    添加一个新的订阅者
    public void addConsumerEnv(ConsumerEnv consumerEnv) {
            consumerEnvList.add(consumerEnv);
    }
//    订阅者的删除暂时不考虑
//    挑选一个订阅者,用来处理当前的消息 (按照轮询的方式)
    public ConsumerEnv chooseConsumer() {
        if(consumerEnvList.size() == 0) {
//            该队列没有人订阅
            return null;
        }
//        计算一下当前要取的元素的下标
        int index = consumerSeq.get() % consumerEnvList.size();
        consumerSeq.getAndDecrement();
        return consumerEnvList.get(index);
    }

2、消费者 订阅队列消息,并使用该消息完成明确好的业务逻辑

  • 所谓 消费者 消费消息,其实就是让线程池 执行对应消费者中的回调函数
  • 通过回调函数,将消息的内容通过参数传递
  • 回调函数中的内容由消费者编写,具体里面要干啥,取决于消费者自己的业务逻辑

代码编写:

  • 此处我们使用 函数式接口 的方式,让消费者在订阅消息时,明确使用该消息进行的业务逻辑是什么
import com.example.demo.mqserver.core.BasicProperties;

/*
* 只是一个单纯的函数式接口(回调函数),收到消息之后要处理消息时调用的方法
* */
@FunctionalInterface
public interface Consumer {
//    Delivery 的意思是 "投递",这个方法预期是在每次服务器收到消息之后,来调用
//    通过这个方法把消息推送给对应的消费者
//    (注意!!这里的方法名和参数,也都是参考 RabbitMQ 展开的)
    void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body);
}
  • 在 VirtualHost 中实现消费者订阅某个队列的消息
//    订阅消息
//    添加一个队列的订阅者,当队列收到消息之后,就要把消息推送给对应的订阅者
//    consumerTag:表示消费者的身份标识
//    autoAck:消息被消费完成后,应答的方式,为 true 自动应答,为 false 手动应答
//    consumer:是一个回调函数,此处类型设定成函数式接口,这样后续调用 basicConsume 并且传实参的时候,就可以写作 lambda 样子
    public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
//        构造一个 ConsumerEnv 对象,把这个对应的队列找到,再把这个 Consumer 对象添加到该队列中
        queueName = virtualHostName + queueName;
        try {
            consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);
            System.out.println("[VirtualHost] basicConsume 成功! queueName = " + queueName);
            return true;
        }catch (Exception e) {
            System.out.println("[VirtualHost] basicConsume 失败! queueName = " + queueName);
            e.printStackTrace();
            return false;
        }
    }

3、队列收到消息,并将消息推送给订阅该队列的消费者

  • 为了能够让 线程池 知道要执行哪个回调函数及其参数中的 消息 来自于哪个队列
  • 我们定义一个单独的扫描线程,用于感知哪个队列收到了新消息

问题一:

  • 为啥搞了扫描线程,还要再搞个线程池呢?
  • 既让该扫描线程获取 消息和 消费者的回调函数,又让其执行回调函数不就行了?

回答:

  • 由于消费者编写的回调函数,具体是干啥的,我们并不知道
  • 如果是比较耗时的业务逻辑的话,此时仅由一个线程来完成上述这些操作,就可能周转不开了

问题二:

  • 当前有多个队列,但扫描线程就一个,扫描线程如何知道是哪个队列中 来了新消息呢?

方案一:

  • 直接让扫描线程不停的循环遍历所有对列,如果发现有新的元素就立即处理

方案二:

  • 引入一个阻塞队列,哪个队列新增了一个消息,就将哪个队列的名字放入 阻塞队列中
  • 此时 扫描线程 仅需要盯住这阻塞队列即可
  • 阻塞队列中队列名相当于 "令牌",扫描线程从阻塞队列中取队列名,进而再根据队列名,从对应的队列中取一个消息

回答:

  • 此处我们采用方案二!

代码编写:

  • 此处我们实现一个 ConsumerManager 类,用于实现消费消息的核心逻辑
import com.example.demo.common.Consumer;
import com.example.demo.common.ConsumerEnv;
import com.example.demo.common.MqException;
import com.example.demo.mqserver.VirtualHost;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/*
* 通过这个类来实现消费消息的核心逻辑
* */
public class ConsumerManager {
//    持有上层的 VirtualHost 对象的引用,用来操作数据
    private VirtualHost parent;
//    指定一个线程池,负责执行具体的回调任务
    private ExecutorService workerPool = Executors.newFixedThreadPool(4);
//    存放令牌的阻塞队列
    private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();
//    扫描线程
    private Thread scannerThread = null;

    public ConsumerManager(VirtualHost p) {
        this.parent = p;

        scannerThread = new Thread(() -> {
            while (true) {
                try {
//                    1、拿到令牌
                    String queueName = tokenQueue.take();
//                    2、根据令牌找到队列
                    MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
                    if(queue == null) {
                        throw new MqException("[ConsumerManager] 取令牌后发现,该队列名不存在!queueName = " + queueName);
                    }
//                    3、从队列中消费消费一个消息
                    synchronized (queue) {
                        consumeMessage(queue);
                    }
                } catch (InterruptedException | MqException e) {
                    e.printStackTrace();
                }
            }
        });
//        把线程设为后台线程
        scannerThread.setDaemon(true);
        scannerThread.start();
    }

//    这个方法的调用时机就是发送消息的时候
    public void notifyConsume(String queueName) throws InterruptedException {
        tokenQueue.put(queueName);
    }

    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
//        找到对应的队列
       MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName);
       if(queue == null) {
           throw new MqException("[ConsumerManager] 队列不存在! queueName = " + queueName);
       }
        ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);
       synchronized (queue) {
           queue.addConsumerEnv(consumerEnv);
//           如果当前队列中已经有一些消息了,需要立即就消费掉
           int n = parent.getMemoryDataCenter().getMessageCount(queueName);
           for (int i = 0;i < n;i++) {
//               这个方法调用一次就消费一条消息
               consumeMessage(queue);
           }
       }
    }

    private void consumeMessage(MSGQueue queue) {
//        1、按照轮询的方式,找个消费者出来
        ConsumerEnv luckyDog = queue.chooseConsumer();
        if(luckyDog == null) {
//            当前队列没有消费者,暂时不消费,等后面有消费者出现再说
            return;
        }
//        2、从队列中取出一个消息
        Message message = parent.getMemoryDataCenter().pollMessage(queue.getName());
        if(message == null) {
//            当前队列中还没有消息,也不需要消费
            return;
        }
//        3、把消息带入到消费者的回调方法中,丢给线程池执行
        workerPool.submit(() -> {
            try {
//            1) 把消息放入待确认的集合中,这个操作必须要在执行回调之前
                parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);
//            2) 真正执行回调函数
                luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());
//            3) 如果当前是 "自动应答",就可以直接把消息删除了
//               如果当前是 "手动应答",则先不处理,交给后续消费者调用 basicAck 方法来处理
                if(luckyDog.isAutoAck()) {
//                    a.删除硬盘上的消息
                    if(message.getDeliverMode() == 2) {
                        parent.getDiskDataCenter().deleteMessage(queue,message);
                    }
//                    b.删除上面的待确认集合中的消息
                    parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(),message.getMessageId());
//                    c.删除内存中消息中心里的消息
                    parent.getMemoryDataCenter().removeMessage(message.getMessageId());
                    System.out.println("[ConsumerManager] 消息被成功消费 queueName = " + queue.getName());
                }
            }catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

关于消息确认

  • 为了能够确保消息是被正确的消费掉了,我们需要引入 消息确认 机制
  • 即消费者的回调方法 顺利执行完(未抛异常啥的),那么这条消息的历史使命就算完成了,该消息也就可以被删除了
  • 消息确认机制 也就是为了保证 消息不丢失

具体思路

  1. 在真正执行回调之前,先将该消息放到 "待确认集合" 中,避免因为回调失败,导致消息的丢失
  2. 真正执行回调
  3. 当前消费者采取的是 autoAck=true 自动应答,就认为回调执行完毕不抛异常,就算消费成功,然后就可以删除消息了(硬盘、消息中心、待确认集合)
  4. 当前消费者采取的是 autoAck=false 手动应答,需要消费者这边,在自己的回调方法内部,显式调用 basicAck 这个核心 API

basicAck 代码编写:

public boolean basicAck(String queueName,String messageId) {
        queueName = virtualHostName + queueName;
        try {
//            1、获取到消息和队列
            Message message = memoryDataCenter.getMessage(messageId);
            if(message == null) {
                throw new MqException("[VirtualHost] 要确认的消息不存在!messageId = " + messageId);
            }
            MSGQueue queue = memoryDataCenter.getQueue(queueName);
            if(queue == null) {
                throw new MqException("[VirtualHost] 要确认的消息不存在!queueName = " + queueName);
            }
//            2、删除硬盘上的数据
            if(message.getDeliverMode() == 2) {
                diskDataCenter.deleteMessage(queue,message);
            }
//            3、删除消息中心的数据
            memoryDataCenter.removeMessage(messageId);
//            4、删除待确认的集合中的消息
            memoryDataCenter.removeMessageWaitAck(queueName,messageId);
            System.out.println("[VirtualHost] basicAck 成功!消息被确认成功!queueName = " + queueName
            + ", messageId = " + messageId);
            return true;
        }catch (Exception e) {
            System.out.println("[VirtualHost] basicAck 失败!消息确认失败!queueName = " + queueName
                    + ", messageId = " + messageId);
            e.printStackTrace();
            return false;
        }
    }

问题一:

  • 执行回调方法的过程中抛异常了会产生什么影响?

回答:

  • 当回调方法抛异常,后续逻辑便会执行不到,此时该消息就会始终在 待确认的集合中
  • RabbitMQ 的做法是另外搞一个扫描线程(其实 RabbitMQ 里面不叫线程,人家是叫线程,但是这个进程不是操作系统的进程,而是 erlang 中的概念)
  • 由该线程负责关注 待确认集合中,每个待确认的消息呆多久了,如果呆的时间超出了范围就会把这个消息放到一个特定的队列 "死信队列"
  • 当然,死信对列 也是程序员手动配置的,但此处我们并未实现 死信队列逻辑

问题二:

  • 执行回调过程中,broker server 崩溃了,其中的内存数据全没了,此时有什么影响?

回答:

  • 此时硬盘数据还是在的!
  • 正在消费的这个消息,在硬盘中仍然存在
  • broker server 重启之后,这个消息就又被加载回内存了,就像从来没有消费过一样
  • 消费者就有机会重新消费到这个消息
  • 当然重复消费的问题,应该由消费者的业务代码负责保证,broker server 管不了!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/408916.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Python笔记-设计模式】代理模式

一、说明 代理模式是一种结构型设计模式&#xff0c;提供对象的替代品或其占位符。代理控制着对于原对象的访问&#xff0c;并允许在将请求提交给对象前后进行一些处理。 (一) 解决问题 控制对对象的访问&#xff0c;或在访问对象前增加额外的功能或控制访问 (二) 使用场景…

统信UOS系统窗口特效设置

原文链接&#xff1a;统信UOS系统设置窗口特效 在今天的技术分享中&#xff0c;我们将探讨如何在统信UOS系统上充分利用窗口特效来美化和提升用户界面的交互体验。统信UOS作为一款注重视觉体验和用户友好性的操作系统&#xff0c;提供了丰富的窗口特效设置&#xff0c;让用户可…

R语言入门笔记2.6

描述统计 分类数据与顺序数据的图表展示 为了下面代码便于看出颜色参数所对应的值&#xff0c;在这里先集中介绍&#xff0c; col1是黑色&#xff0c;2是粉红&#xff0c;3是绿色&#xff0c;4是天蓝&#xff0c;5是浅蓝&#xff0c;6是紫红&#xff0c;7是黄色&#xff0c;…

Go 利用上下文进行并发计算

关注公众号【爱发白日梦的后端】分享技术干货、读书笔记、开源项目、实战经验、高效开发工具等&#xff0c;您的关注将是我的更新动力&#xff01; 在Go编程中&#xff0c;上下文&#xff08;context&#xff09;是一个非常重要的概念&#xff0c;它包含了与请求相关的信息&…

Bluejay电调固件修改自检音乐、自定义启动音乐旋律

Bluejay电调固件修改自检音乐、自定义启动音乐旋律 Bluejay电调固件基本介绍Bluejay电调固件特点修改自检音乐、启动音乐旋律准备材料修改过程 Bluejay固件旋律音乐格式开头部分音符部分 收集到的音乐代码 Bluejay电调固件基本介绍 Bluejay是一种数字电调固件&#xff0c;用于控…

Stable Diffusion 3 发布及其重大改进

1. 引言 就在 OpenAI 发布可以生成令人瞠目的视频的 Sora 和谷歌披露支持多达 150 万个Token上下文的 Gemini 1.5 的几天后&#xff0c;Stability AI 最近展示了 Stable Diffusion 3 的预览版。 闲话少说&#xff0c;我们快来看看吧&#xff01; 2. 什么是Stable Diffusion…

运维SRE-08 网络基础与进阶

今日内容 - **定时备份案例进阶.** - **定时巡检(检查系统基础指标),写入到文件中.** - 网络(抽象) 掌握与吸收时间: 直到课程结束.(第2阶段结束) - 网络基础: 网络概述,网络结构,网络设备. - 网络核心: OSI7层模型 ※※※※※※TCP/IP 3次握手 ※※※※※※TCP/IP 4…

Django入门指南:从环境搭建到模型管理系统的完整教程

环境安装&#xff1a; ​ 由于我的C的Anaconda 是安装在C盘的&#xff0c;但是没内存了&#xff0c;所有我将环境转在e盘&#xff0c;下面的命令是创建环境到指定目录中. conda create --prefixE:\envs\dj42 python3.9进入环境中&#xff1a; conda activate E:\envs\dj42…

【并发】CAS原子操作

1. 定义 CAS是Compare And Swap的缩写&#xff0c;直译就是比较并交换。CAS是现代CPU广泛支持的一种对内存中的共享数据进行操作的一种特殊指令&#xff0c;这个指令会对内存中的共享数据做原子的读写操作。其作用是让CPU比较内存中某个值是否和预期的值相同&#xff0c;如果相…

C#与VisionPro联合开发——串口通信

串口通信 串口通信是一种常见的数据传输方式&#xff0c;通过串行接口&#xff08;串口&#xff09;将数据以串行比特流的形式进行传输。在计算机和外部设备之间&#xff0c;串口通信通常是通过串行通信标准&#xff08;如RS-232&#xff09;来实现的。串口通信可以用于连接各…

AtCoder ABC342 A-D题解

华为出的比赛&#xff1f; 好像是全站首个题解哎&#xff01; 比赛链接:ABC342 Problem A: 稍微有点含金量的签到题。 #include <bits/stdc.h> using namespace std; int main(){string S;cin>>S;for(int i0;i<s.size();i){if(count(S.begin(),S.end(),S[i…

《穿越火线:枪战王者》手游客户端技术方案: 实时同步与手感优化 转载

一、项目背景 CF手游的团队有着相当丰富的FPS游戏制作经验&#xff0c;但是移动端开发经验相对匮乏。团队面对的挑战很大&#xff0c;我们需要在手机端完美还原CF十多个游戏模式&#xff0c;上百把枪械手感。 虽然我们有实时对战FPS游戏开发经验&#xff0c;但是手游网络质量…

H5获取手机相机或相册图片两种方式-Android通过webview传递多张照片给H5

需求目的&#xff1a; 手机机通过webView展示H5网页&#xff0c;在特殊场景下&#xff0c;需要使用相机拍照或者从相册获取照片&#xff0c;上传后台。 完整流程效果&#xff1a; 如下图 一、H5界面样例代码 使用html文件格式&#xff0c;文件直接打开就可以展示布局&#…

从源码学习单例模式

单例模式 单例模式是一种设计模式&#xff0c;常用于确保一个类只有一个实例&#xff0c;并提供一个全局访问点。这意味着无论在程序的哪个地方&#xff0c;只能创建一个该类的实例&#xff0c;而不会出现多个相同实例的情况。 在单例模式中&#xff0c;常用的实现方式包括懒汉…

【C语言】linux内核ipoib模块 - ipoib_send

一、中文注释 int ipoib_send(struct net_device *dev, struct sk_buff *skb,struct ib_ah *address, u32 dqpn) {struct ipoib_dev_priv *priv ipoib_priv(dev); // 获取IPoIB设备的私有数据struct ipoib_tx_buf *tx_req; // 发送请求结构体int hlen, rc; // 分别为头部长度…

安装 WSL 报错 Error code: Wsl/WININET_E_NAME_NOT_RESOLVED 问题解决

问题描述 在执行 wsl --install 安装Windows子系统Linux WSL (Windows Subsystem for Linux) 时报错&#xff1a; 无法从“https://raw.githubusercontent.com/microsoft/WSL/master/distributions/DistributionInfo.json”中提取列表分发。无法解析服务器的名称或地址 Error…

如何在本地电脑部署HadSky论坛并发布至公网可远程访问【内网穿透】

文章目录 前言1. 网站搭建1.1 网页下载和安装1.2 网页测试1.3 cpolar的安装和注册 2. 本地网页发布2.1 Cpolar临时数据隧道2.2 Cpolar稳定隧道&#xff08;云端设置&#xff09;2.3 Cpolar稳定隧道&#xff08;本地设置&#xff09;2.4 公网访问测试 总结 前言 经过多年的基础…

2000-2022年上市公司全要素生产率测算数据合计(原始数据+计算代码+结果)(LP法+OLS法+GMM法+固定效应法)

2000-2022年上市公司全要素生产率测算数据合计&#xff08;原始数据计算代码结果&#xff09;&#xff08;LP法OLS法GMM法固定效应法&#xff09; 1、时间&#xff1a;2000-2022年 2、范围&#xff1a;上市公司 3、指标&#xff1a;证券代码、证券简称、统计截止日期、固定资…

怎么自学python,大概要多久?python多久上手?

无限时长~~~~技术不断在更新&#xff0c;你的自学不也需要一直进行吗&#xff1f; 但如果是问&#xff1a;自学多长时间可以入门&#xff1f;或者可以找到工作&#xff1f;那我可以告诉你答案。 从零基础开始自学Python&#xff0c;依照每个人理解能力的不同&#xff0c;大致…

免费的WP模板网站推荐

免费wordpress模板下载 高端大气上档次的免费wordpress主题&#xff0c;首页大图全屏显示经典风格的wordpress主题。 https://www.wpniu.com/themes/289.html wordpress免费企业主题 深蓝色经典实用的wordpress网站模板&#xff0c;用wordpress免费企业主题搭建网站。 http…