【梦辛工作室】java实现简易消息队列处理器 可分区 分区顺序消费MxMQ

大家好哇,又是我,梦辛工作室的灵,最近在巩固JUC并发包,突然想到如果自己的应用体量不大,但有需要消息队列来实现应用解耦和削峰来缓解服务器突增压力,比如抢票时,突然有比较用户同时抢票,就容易造成服务器同时连接数较多,拒绝其他用户的使用,就想着可以用消息队列来缓解,但是体量有不大,还没必要用MQ框架,那就直接自己写一个,这样,抢票请求来了就直接丢给队列处理器,然后再延迟查询处理结果,这样能减轻不少压力,老样子,先看下实现效果:
在这里插入图片描述
然后看下测试代码:

public class TestOptional {
    @Test
    public void doTestOptional(){

        MxMQ<Message> mxMQ = MxMQ.getInstance();

        /**
         * 添加分区 无消息一直阻塞
         */
        mxMQ.addPartion("test", new MQHandler<Message>() {
            @Override
            public void hand(Message message) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(message.getMessage());
            }
        });
        /**
         * 添加分区 无消息且等待时长超过20秒自动移除该分区
         */
        mxMQ.addPartionAutoRemove("test2", new MQHandler<Message>() {
            @Override
            public void hand(Message message) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.println(message.getMessage());
            }
        });

        for(int index = 0;index < 20;index++){
            int finalIndex = index;
            Message message = new Message("test_" + finalIndex);
            Message message2 = new Message("test2_" + finalIndex);
            try {
                mxMQ.sendMessage("test",message);
                mxMQ.sendMessage("test2",message2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        while (true){}

    }
}

还可以自定义不同分区不同的处理器,逻辑自由定义,下面看下几个关键类:
MxMQRunnable:

package com.mx.mxmq;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MxMQRunnable<T> implements Runnable{

    boolean isRun = false;
    ArrayBlockingQueue<T> arrayBlockingQueue = null;
    MQHandler<T> mqHandler = null;
    int state = 0;

    MxMQ.QueueEmpty queueEmpty = null;

    public void setQueueEmpty(MxMQ.QueueEmpty queueEmpty) {
        this.queueEmpty = queueEmpty;
    }

    public MxMQRunnable(MQHandler<T> mqHandler){
        isRun = true;
        arrayBlockingQueue = new ArrayBlockingQueue(50);
        this.mqHandler = mqHandler;
        state = MxMQ.STATE_WAIT;
    }

    public MxMQRunnable(int number,MQHandler<T> mqHandler){
        arrayBlockingQueue = new ArrayBlockingQueue(number);
        this.mqHandler = mqHandler;
        state = MxMQ.STATE_WAIT;
    }

    public void setState(int state) {
        this.state = state;
    }

    @Override
    public void run() {
        while (isRun){
            try {
                T t = null;
                if(state == MxMQ.STATE_WAIT){
                   t = arrayBlockingQueue.take();
                } else {
                   t = arrayBlockingQueue.poll(20,TimeUnit.SECONDS);
                   if(t == null){
                       close();
                       queueEmpty.empty(this);
                       break;
                   }
                }
                if(mqHandler != null){
                    mqHandler.hand(t);
                }
            } catch (Exception e) {
                 e.printStackTrace();
            }
        }
    }

    public boolean sendMessage(T t) throws InterruptedException {
        return arrayBlockingQueue.offer(t,20, TimeUnit.SECONDS);
    }

    public boolean removeMessage(T t){
        return arrayBlockingQueue.remove(t);
    }

    public void close(){
        isRun = false;
    }

}

MxMQ:

package com.mx.mxmq;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MxMQ<T> {

    public static final int STATE_WAIT = 0;
    public static final int STATE_REMOVE = 1;

    private MxMQ(){
        executors = Executors.newCachedThreadPool();
        partionRunMap = new ConcurrentHashMap<>();
    }

    public static MxMQ getInstance() {
        if(instance == null){
            synchronized (MxMQ.class){
                if(instance == null){
                    instance = new MxMQ();
                }
            }
        }
        return instance;
    }

    private static volatile MxMQ instance = null;

    private ConcurrentHashMap<String,MxMQRunnable<T>> partionRunMap = null;

    private ExecutorService executors =  null;

    /**
     * 添加分区
     * @param partion 分区
     * @param mxHandler 处理器
     * @return
     */
    public boolean addPartion(String partion,MQHandler<T> mxHandler){
        if(partionRunMap.get(partion) == null){
            MxMQRunnable<T> curMxMQRunnable = new MxMQRunnable<T>(mxHandler);
            partionRunMap.put(partion,curMxMQRunnable);
            executors.execute(curMxMQRunnable);
            System.out.println(partion+"被添加");
            return true;
        }
        return false;
    }

    /**
     * 当分区里面没有任务超过20秒后就会自动移除分区
     * @param partion 分区
     * @param mxHandler 处理器
     * @return
     */
    public boolean addPartionAutoRemove(String partion,MQHandler<T> mxHandler){
        if(partionRunMap.get(partion) == null){
            MxMQRunnable<T> curMxMQRunnable = new MxMQRunnable<T>(mxHandler);
            curMxMQRunnable.setState(STATE_REMOVE);
            curMxMQRunnable.setQueueEmpty(new QueueEmpty() {
                @Override
                public void empty(MxMQRunnable mxMQRunnable) {
                    removePartion(partion);
                }
            });
            partionRunMap.put(partion,curMxMQRunnable);
            executors.execute(curMxMQRunnable);
            System.out.println(partion+"被添加");
            return true;
        }
        return false;
    }

    public boolean removePartion(String partion){
        if(partionRunMap.get(partion) != null){
            MxMQRunnable<T> remove = partionRunMap.remove(partion);
            remove.close();
            System.out.println(partion+"被移除");
            return true;
        }
        return false;
    }

    public boolean sendMessage(String partion,T t) throws InterruptedException {
        MxMQRunnable<T> tMxMQRunnable = partionRunMap.get(partion);
        if(tMxMQRunnable != null){
            tMxMQRunnable.sendMessage(t);
            return true;
        }
        return false;
    }

    public boolean removeMessage(String partion,T t){
        MxMQRunnable<T> tMxMQRunnable = partionRunMap.get(partion);
        if(tMxMQRunnable != null){
            return tMxMQRunnable.removeMessage(t);
        }
        return false;
    }

    interface QueueEmpty{
        void empty(MxMQRunnable mxMQRunnable);
    }

}

MQHandler:

package com.mx.mxmq;

public interface MQHandler<T> {
    void hand(T t);
}

Message:

package com.mx.mxmq;

public class Message {
    String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Message(String message){
        this.message = message;
    }
}

好了,收,大概就是这样子,主要应用场景为:需要轻量级的顺序队列消费 应用场景

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/34642.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

分布式运用——rsync远程同步

分布式运用——rsync远程同步 一、rsync的背景和原理1.rsync的功能2.rsync的应用场景3.使用rsync的基本命令4.scp与rsync的区别 二、配置rsync源服务器1.关闭防火墙2.建立/etc/rsyncd.conf 配置文件3.保证所有用户对源目录/var/www/html 都有读取权限4.启动 rsync 服务程序5.关…

flutter3.7版本下使用flutter boost解决使用platview崩溃或异常问题

背景 工程使用了混合开发&#xff0c;使用flutter boost插件&#xff0c;flutter 的activity1 frament1 跳转activity2 frament2&#xff0c;frament1 包含platformView&#xff0c;按照上面老哥解决崩溃问题的基础上&#xff0c;出现activity2 frament2返回activity1 framen…

“未来之光:揭秘创新科技下的挂灯魅力“

写在前面&#xff1a; 高度信息化当下时代&#xff0c;对电脑及数字设备的需求与日俱增无处不在&#xff0c;随之而来的视觉疲劳和眼睛问题也攀升到了前所未有的高度。传统台灯对于长时间使用电脑的人群来说是完全无法解决这些问题的。一款ScreenBar Halo 屏幕挂灯&#xff0c;…

数学建模——插值(下)

本文是面向数学建模准备的&#xff0c;是介绍性文章&#xff0c;没有过多关于原理的说明&#xff01;&#xff01;&#xff01; 目录 一、2维插值原理及公式 1、二维插值问题 2、最邻近插值 3、分片线性插值 4、双线性插值 5、二维样条插值 二、二维插值及其Matlab工具箱…

微信小程序

私人博客 许小墨のBlog —— 菜鸡博客直通车 系列文章完整版&#xff0c;配图更多&#xff0c;CSDN博文图片需要手动上传&#xff0c;因此文章配图较少&#xff0c;看不懂的可以去菜鸡博客参考一下配图&#xff01; 系列文章目录 前端系列文章——传送门 后端系列文章——传送…

opencv使用applyColorMap()函数,可以将灰度图或彩色图转换成自定义的彩色图,或opencv提供的20多种色彩值

文章目录 1、applyColorMap()函数的使用&#xff1a;&#xff08;1&#xff09;函数原型&#xff1a;void applyColorMap(InputArray src, OutputArray dst, int colormap)void applyColorMap(InputArray src, OutputArray dst, InputArray userColor) &#xff08;2&#xff0…

利用Python构建宁德时代、比亚迪、隆基绿能股票时间序列预测模型

存货 import tushare as ts # 导包 import numpy as np import matplotlib.pyplot as plt from scipy.signal import find_peaks from scipy.stats import norm import datetime import pandas as pd import seaborn as sns # pip install seaborn import matplotlib.patches …

WideNet:让网络更宽而不是更深

这是新加坡国立大学在2022 aaai发布的一篇论文。WideNet是一种参数有效的框架&#xff0c;它的方向是更宽而不是更深。通过混合专家(MoE)代替前馈网络(FFN)&#xff0c;使模型沿宽度缩放。使用单独LN用于转换各种语义表示&#xff0c;而不是共享权重。 混合专家(MoEs) 条件计…

数通王国历险记之TCP协议下的三大协议的验证实验

系列文章目录 数通王国历险记&#xff08;1&#xff09; 前言 一&#xff0c;我们要先知道PDU是什么&#xff1f; 二、TCP协议下的三大协议的验证实验 1.FTP的验证实验 1&#xff0c;拓扑图 2.将lsw4配置一下 3&#xff0c;FTP服务器端开启FTP服务&#xff1a; 4&#x…

LIN诊断实现MCU本地OTA升级

一、目标 通过PC端上位机实现MCU本地的OTA升级,本篇文章对实现的目的、需要用到的第三方工具、LIN诊断帧、升级协议、MCU端升级过程以及PC端升级过程做详细说明。 二、目的 最近在做MCU项目时需要将样机寄给客户进行验证,在客户的验证过程中要求参数可调试,如果需要修改软…

vue下基于elementui自定义表单-后端数据设计篇

vue下基于elementui自定义表单-后端篇 自定义表单目前数据表单设计是基于数据量不大的信息单据场景&#xff0c;因为不考虑数据量带来的影响。 数据表有: 1.表单模版表&#xff0c;2.表单实例表&#xff0c;3.表单实例项明细表&#xff0c;4表单审批设计绑定表 以FormJson存…

RuoYi(分离版) 使用代码生成器添加子模块(idea版)

右键总文件夹&#xff0c;选择新模块添加新模块 新建的业务模块 新建的业务模块中添加若依通用模块工具 <dependencies><dependency><groupId>com.ruoyi</groupId><artifactId>ruoyi-common</artifactId></dependency></depen…

【Thunder送书 | 第三期 】「Python系列丛书」

文章目录 前言《Python高效编程——基于Rust语言》《Python从入门到精通》《Python Web深度学习》《Python分布式机器学习》文末福利 | 赠书活动 前言 Thunder送书第三期开始啦&#xff01;前面两期都是以【文末送书】的形式开展&#xff0c;本期将赠送Python系列丛书&#xff…

学习系统编程No.25【核心转储实战】

引言&#xff1a; 北京时间&#xff1a;2023/6/16/8:39&#xff0c;实训课中&#xff0c;大一下学期最后有课的一天&#xff0c;还有两天就要期末考啦&#xff01;目前什么都还没有复习&#xff0c;不到星期天晚上&#xff0c;咱不慌&#xff0c;小小挂科&#xff0c;岂能拦得…

ElementUI plus框架Table表格cell-style属性的使用

官方文档说明&#xff1a; 例&#xff1a;设置单元格文字居中 Object方式&#xff1a; function方式&#xff1a;

NOSQL——redis的安装,配置与简单操作

一、缓存的概述 缓存是为了调节速度不一致的两个或多个不同的物质的速度&#xff0c;在中间对速度较慢的一方起到加速作用&#xff0c;比如CPU的一级、二级缓存是保存了CPU最近经常访问的数据&#xff0c;内存是保存CPU经常访问硬盘的数据&#xff0c;而且硬盘也有大小不一的缓…

抖音seo矩阵系统源码开发部署|抖音小程序接入(一)

一、 开发部署步骤&#xff1a; &#xff08;1&#xff09;申请开放平台服务商 &#xff08;2&#xff09;申请开放平台网站应用 &#xff08;3&#xff09;申请开放平台应用权限 &#xff08;4&#xff09;提交各个API接口申请文档 &#xff08;5&#xff09;审核通过技…

详解JS 作用域与作用域链、IIFE模式、js执行过程

文章目录 一、什么是作用域二. 全局作用域、函数作用域、块级作用域全局作用域函数作用域注意 if、for循环、while循环变量 块级作用域 二、什么是作用域链1. 什么是自由变量2.什么是作用域链3. 关于自由变量的取值 三、IIFE模式由来语法基本语法带参 四、JavaScript 执行过程编…

【运维工程师学习】Debian安装

【运维工程师学习】Debian安装 1、界面说明2、选择语言3、等待探测并挂载安装介质完成4、设置主机名称、用户信息5、磁盘分区6、创建分区7、最终分区为8、安装ssh9、查看ssh状态10、查看内存大小11、查询系统磁盘及分区情况12、查看各磁盘及分区剩余13、查看ip地址 选择镜像文件…

【Linux】进程信号之信号的产生

进程信号 一 一、信号入门1、信号的一些特性2、信号的处理方式信号捕捉初识 3、Linux下的信号 二、信号的产生1、通过终端按键产生信号2、调用系统函数向进程发信号a、kill函数b、raise函数c、abort函数 3. 由软件条件产生信号4、硬件异常产生信号 结语 一、信号入门 什么是信号…