消息队列 (9)-消费者核心类的实现

目录

  • 前言
  • 消费者类设计思路
    • 核心API
    • 总体代码

前言

我们上一篇博客,写了虚拟主机的实现, 在虚拟主机中需要用到俩个未实现的类,分别是验证绑定关键字和消费者类,接下来我们实现消费者类的核心代码

消费者类设计思路

在这个类中,首先我们要持有virtualHost对象来操作数据, 然后我们指定一个线程池负责具体的回调函数,通过一个扫描队列来不停的扫描所有的队列,看那个队列有新的消息,如果有就放到阻塞队列中去,消费者每次从阻塞队列中取出一个消息来响应。如果是多个消费者都订阅了一个消息,那么就使用轮询的方式来获取消息
在这里插入图片描述

核心API

属性

虚拟主机
线程池
阻塞 队列
扫描线程

方法
①往阻塞队列中添加消息

// 往阻塞队列中添加消息
    public void notifyConsume(String queueName) throws InterruptedException {
        tokenQueue.put(queueName);
    }

②订阅消息

我们的思路是,先找到对应的队列,然后去查看队列中是否有消息,如果有就要消费掉这些消息

 //添加订阅者
    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
        // 1 找到对应的队列
        MSGQueue queue = virtuaHost.getMemoryDataCenter().getQueue(queueName);

        if (queue == null){
            throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);
        }
        ConsumEnv consumEnv = new ConsumEnv(consumerTag,queueName,autoAck,consumer);
        synchronized (queue){
            queue.addConsumEnv(consumEnv);
            // 如果当前队列中已经有了一些消息了, 需要立即就消费掉.
            int n = virtuaHost.getMemoryDataCenter().getMessageCount(queueName);
            for (int i = 0; i < n; i++) {
                // 这个方法调用一次就消费一条消息.
                consumeMessage(queue);
            }
        }
    }

③消费消息
关于消费消息,我们按照轮询的方式来依次消费

 // 消费消息
    private void consumeMessage(MSGQueue queue) {
        // 1. 按照轮询的方式, 找个消费者出来.
        ConsumEnv luckyDog = queue.chooseConsumEnv();
        if (luckyDog == null){
            // 说明没有消费者
            return;
        }
        // 2. 从队列中取出一个消息
        Message message = virtuaHost.getMemoryDataCenter().pollMessage(queue.getName());
        if (message == null){
            // 说明没有消息,不能消费
            return;
        }
        // 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.
        workPool.submit(()->{
            try {
                //1,将消息放到待确认的集合中, 这个操作在回调函数之前
                virtuaHost.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);
                //2. 执行回调函数
                luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());
                System.out.println("[ConsumerManager] 消息被成功消费, queueName = "+queue.getName());
                //3. 如果是自动应答, 就可以之间删除消息
                // 如果是手动应答,  就先什么也不做
                if (luckyDog.isAutoAck()){
                    // 1删除硬盘上的消息
                    if (message.getDeliverMode() == 2){
                        virtuaHost.getDiskDataCenter().deleteMessage(queue,message);
                    }
                    //2 删除待确认的消息
                    virtuaHost.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
                    // 3 删除内存中的消息
                    virtuaHost.getMemoryDataCenter().removeMessage(message.getMessageId());
                    System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());
                }
            } catch (IOException | ClassNotFoundException | MqException e) {
                e.printStackTrace();
            }
        });
    }

总体代码

package com.example.demo.mqServer.core;

import com.example.demo.Common.ConsumEnv;
import com.example.demo.Common.Consumer;
import com.example.demo.Common.MqException;
import com.example.demo.mqServer.VirtuaHost;

import java.io.IOException;
import java.util.concurrent.*;

/*
* 通过这个类, 来实现来实现消费者消费消息的核心功能
* */
public class ConsumerManager {
    // 持有上层对象 VirtualHost 调用 ,来操作数据
    private VirtuaHost virtuaHost;
    // 指定一个线程池, 负责执行具体的回调函数
    private ExecutorService workPool = Executors.newFixedThreadPool(4);
    //  存放令牌的队列  - 阻塞队列
    private BlockingDeque<String> tokenQueue = new LinkedBlockingDeque<>();
    //  扫描线程
    private Thread scannerThread = null;
    //
    public ConsumerManager(VirtuaHost virtuaHost) {
        this.virtuaHost = virtuaHost;
        scannerThread =new Thread(()->{
            while (true){
                try {
                    String queueName = tokenQueue.take();
                    MSGQueue queue = virtuaHost.getMemoryDataCenter().getQueue(queueName);
                    if (queue == null){
                        throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName);
                    }
                    synchronized (queue){
                        consumeMessage(queue);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (MqException e) {
                    e.printStackTrace();
                }
            }
        });
        scannerThread.setDaemon(true);
        scannerThread.start();
    }
    // 往阻塞队列中添加消息
    public void notifyConsume(String queueName) throws InterruptedException {
        tokenQueue.put(queueName);
    }
    // 增加订阅
    public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
       // 先找到对应的队列
        MSGQueue queue = virtuaHost.getMemoryDataCenter().getQueue(queueName);
        if (queue == null){
            throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName);
        }
        ConsumEnv consumEnv = new ConsumEnv(consumerTag,queueName,autoAck,consumer);
        synchronized (queue){
            queue.addConsumEnv(consumEnv);
            // 如果当前队列中已经有了一些消息了, 需要立即就消费掉.
            int n = virtuaHost.getMemoryDataCenter().getMessageCount(queueName);
            for (int i = 0; i < n; i++) {
                // 这个方法调用一次就消费一条消息.
                consumeMessage(queue);
            }
        }
    }
    private void consumeMessage(MSGQueue queue) {
        // 1. 按照轮询的方式, 找个消费者出来.
        ConsumEnv luckyDog = queue.chooseConsumEnv();
        if (luckyDog == null){
            // 说明没有消费者
            return;
        }
        // 2. 从队列中取出一个消息
        Message message = virtuaHost.getMemoryDataCenter().pollMessage(queue.getName());
        if (message == null){
            // 说明没有消息,不能消费
            return;
        }
        // 3. 把消息带入到消费者的回调方法中, 丢给线程池执行.
        workPool.submit(()->{
            try {
                //1,将消息放到待确认的集合中, 这个操作在回调函数之前
                virtuaHost.getMemoryDataCenter().addMessageWaitAck(queue.getName(),message);
                //2. 执行回调函数
                luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(),message.getBasicProperties(),message.getBody());
                System.out.println("[ConsumerManager] 消息被成功消费, queueName = "+queue.getName());
                //3. 如果是自动应答, 就可以之间删除消息
                // 如果是手动应答,  就先什么也不做
                if (luckyDog.isAutoAck()){
                    // 1删除硬盘上的消息
                    if (message.getDeliverMode() == 2){
                        virtuaHost.getDiskDataCenter().deleteMessage(queue,message);
                    }
                    //2 删除待确认的消息
                    virtuaHost.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());
                    // 3 删除内存中的消息
                    virtuaHost.getMemoryDataCenter().removeMessage(message.getMessageId());
                    System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName());
                }
            } catch (IOException | ClassNotFoundException | MqException e) {
                e.printStackTrace();
            }
        });
    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/65777.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Vue.js2+Cesium1.103.0 六、标绘与测量

Vue.js2Cesium1.103.0 六、标绘与测量 点&#xff0c;线&#xff0c;面的绘制&#xff0c;可实时编辑图形&#xff0c;点击折线或多边形边的中心点&#xff0c;可进行添加线段移动顶点位置等操作&#xff0c;并同时计算出点的经纬度&#xff0c;折线的距离和多边形的面积。 De…

问世28年经久不衰,大厂为何独爱这门技术?(文末送书5本)

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

聚焦AIGC与大模型,和鲸ModelWhale荣登“2023数字生态500强”优秀案例解决方案榜单

8月4日&#xff0c;2023 数字生态大会在北京盛大举行&#xff0c;大会聚焦并锁定 AIGC 及大模型热点&#xff0c;以“ AIGC 新生态 数智新时代”为主题&#xff0c;由 B.P 商业伙伴联合盛景网联共同举办。 为深入发挥在产业领域的启迪借鉴价值作用&#xff0c;本次大会重磅发布…

专业商城财务一体化-线上商城+进销存管理软件,批发零售全行业免费更新

订货流程繁琐&#xff1f;订单处理效率低&#xff1f;小程序商城与进销存系统不打通&#xff1f;数据需要手动输入同步&#xff1f;财务与的结算对账需要大量手工处理&#xff1f;零售批发从业者&#xff0c;如何你也有以上烦恼&#xff0c;可以看看进销存小程序订货商城&#…

如何调教让chatgpt读取自己的数据文件(保姆级图文教程)

提示&#xff1a;如何调教让chatgpt读取自己的数据文件(保姆级图文教程) 文章目录 前言一、如何投喂自己的数据&#xff1f;二、调教步骤总结 前言 chatgpt提示不能读取我们提供的数据文件&#xff0c;我们应该对它进行调教。 一、如何投喂自己的数据&#xff1f; 让chatgpt读…

数据结构--BFS求最短路

数据结构–BFS求最短路 BFS求⽆权图的单源最短路径 注&#xff1a;⽆权图可以视为⼀种特殊的带权图&#xff0c;只是每条边的权值都为1 以 2 为 b e g i n 位置 以2为begin位置 以2为begin位置 代码实现 //求顶点u到其他顶点的最短路径 void BFS_MIN_Distance(Graph G, int u…

SDU Crypto School - 计算不可区分性1

Encryption: Computational security 1-4 主讲人&#xff1a;李增鹏&#xff08;山东大学&#xff09; 参考教材&#xff1a;Jonathan Katz, Yehuda Lindell, Introduction to Modern Cryptography - Principles and Protocols. 什么是加密 首先&#xff0c;加密方案的目的在于…

Electron+vue3项目使用SQLite3数据库

SQLite 是一个进程内的库&#xff0c;实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。它是一个零配置的数据库&#xff0c;这意味着与其他数据库不一样&#xff0c;我们不需要在系统中配置。 就像其他数据库&#xff0c;SQLite 引擎不是一个独立的进程&am…

leetcode 738. 单调递增的数字

2023.8.4 这题用暴力法会超时&#xff0c;我就没试了&#xff0c;采用了个挺巧的方法&#xff0c;为了方便需要先将整数n转换为字符串的形式&#xff0c;然后从后向前遍历&#xff0c;当两个数字非递增时&#xff0c;将前一个数字--&#xff0c;后一个数字的位置记录在index中&…

Hybrid技术的下一站是什么?

Hybrid这个词&#xff0c;在App开发领域&#xff0c;相信大家都不陌生。Hybrid App是指介于web-app、native-app这两者之间的app&#xff0c;它虽然看上去是一个Native App&#xff0c;但只有一个UI WebView&#xff0c;里面访问的是一个Web App。Hybrid在移动领域的发展&#…

基于 CentOS 7 构建 LVS-DR 群集以及配置nginx负载均衡

目录 一、基于 CentOS 7 构建 LVS-DR 群集 1、前期准备 1、关闭防火墙 2、安装ifconfig 3、准备四台虚拟机 2、在DS上 2.1、配置LVS虚拟IP 2.2、手工执行配置添加LVS服务并增加两台RS 2.3、查看配置 3、在RS端&#xff08;第三台、第四台&#xff09; 上 3.1、配置W…

C数据结构与算法——二叉树 应用一

实验任务 (1) 掌握二叉树的二叉链表存储结构定义&#xff1b; (2) 掌握该存储方式下的二叉树基本算法&#xff1b; (3) 掌握三种遍历的递归算法。 实验内容 实现二叉链表存储结构及其基本算法算法简单应用 创建一颗二叉树的二叉链表输出该二叉树的三种遍历序列&#xff08;前…

flutter-第三方组件

卡片折叠 stacked_card_carousel 扫一扫组件 qr_code_scanner 权限处理组件 permission_handler 生成二维码组件 pretty_qr_code 角标组件 badges 动画组件 animations app更新 app_installer 带缓存的图片组件 cached_network_image 密码输入框 collection 图片保存 image_g…

前端安全XSS和CSRF讲解

文章目录 XSSXSS攻击原理常见的攻击方式预防措施 CSRFCSRF攻击原理常见攻击情景预防措施&#xff1a; CSRF和XSS的区别 XSS 全称Cross Site Scripting&#xff0c;名为跨站脚本攻击。为啥不是单词第一个字母组合CSS&#xff0c;大概率与样式名称css进行区分。 XSS攻击原理 不…

ppt压缩文件怎么压缩最小?文件压缩技巧分享

在日常的工作和学习中&#xff0c;难免会遇到PPT太大&#xff0c;需要将其压缩变小的情况&#xff0c;但很多朋友还不知道怎么压缩PPT文件&#xff0c;下面就给大家分享几个简单的方法&#xff0c;分分钟缩小过大的PPT文件。 一、PowerPoint PowerPoint就是微软公司的演示文稿…

无涯教程-Perl - gethostent函数

描述 此函数遍历主机文件中的条目。它在列表context中返回以下内容-($name,$aliases,$addrtype,$length,addrs) 语法 以下是此函数的简单语法- gethostent返回值 此函数在错误时返回undef,否则在scalrcontext中返回主机名,在错误时返回空列表,否则在列表context中返回主机…

基于CAS的单点登录实践之路

前言 上个月我负责的系统SSO升级&#xff0c;对接京东ERP系统&#xff0c;这也让我想起了之前我做过一个单点登录的项目。想来单点登录有很多实现方案&#xff0c;不过最主流的还是基于CAS的方案&#xff0c;所以我也就分享一下我的CAS实践之路。 什么是单点登录 单点登录的…

怎么进行流程图制作?用这个工具制作很方便

怎么进行流程图制作&#xff1f;流程图是一种非常有用的工具&#xff0c;可以帮助我们更好地理解和展示各种复杂的业务流程和工作流程。它可以将复杂的过程简化为易于理解的图形和文本&#xff0c;使得人们更容易理解和跟踪整个流程。因此&#xff0c;制作流程图是在日常工作中…

抑郁症与肠道微生物群有何关联

谷禾健康 抑郁症肠道菌群 当一个人面临抑郁症时&#xff0c;一切看似平常的事都会变得很有挑战性。上班、与朋友社交&#xff0c;甚至只是起床都感觉很困难。 抑郁症是如今已是世界上最普遍的精神障碍之一&#xff0c;一直是心理学和医学领域的研究热点。抑郁症是一种需要预防和…

探索 TypeScript 元组的用例

元组扩展了数组数据类型的功能。使用元组&#xff0c;我们可以轻松构造特殊类型的数组&#xff0c;其中元素相对于索引或位置是固定类型的。由于 TypeScript 的性质&#xff0c;这些元素类型在初始化时是已知的。使用元组&#xff0c;我们可以定义可以存储在数组中每个位置的数…