Springboot Rabbitmq + 线程池技术控制指定数量task执行

定义DataSyncTaskManager,作为线程池任务控制器

package org.demo.scheduletest.service;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
public class DataSyncTaskManager {
    // 线程数
    private static final Integer threadNum = 5;

    private static DataSyncTaskManager taskManager = null;
    private static BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

    private ThreadPoolExecutor taskExecutorPool;

    private DataSyncTaskManager() {
        taskExecutorPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum);
    }

    /**
     * 构建唯一Manager对象单例
     *
     * @return
     */
    public static synchronized DataSyncTaskManager getManager() {
        if (null == taskManager) {
            taskManager = new DataSyncTaskManager();
        }

        return taskManager;
    }

    /**
     * 提交需要运行的任务
     *
     * @param task
     */
    public void submitTask(DataSyncTask task) {
        taskQueue.add(task);
        log.info("[DataSyncTaskManager] submitTask size={}", taskQueue.size());
    }

    public void runTaskDaemon() {
        log.info("[DataSyncTaskManager] runTaskDaemon start.");
        Thread thread = new Thread(() -> {
            while (true) {
                try {
                    Runnable task = taskQueue.take();
                    taskExecutorPool.submit(task);
                    // log.info("[DataSyncTaskManager] runTaskDaemon submit task={}", task);

                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.error("[startTaskRunningDaemon] task run InterruptedException", e);
                } catch (Exception e) {
                    log.error("[startTaskRunningDaemon] task run Exception", e);
                }
            }
        });

        thread.setName(this.getClass().getSimpleName());
        thread.start();
    }
}

定义DataSyncTask,作为具体任务执行方

package org.demo.scheduletest.service;

import com.rabbitmq.client.Channel;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;


@Data
@Slf4j
public class DataSyncTask implements Runnable {
    private String name;
    private Channel channel;
    private long deliveryTag;

    public DataSyncTask(String name, Channel channel, long deliveryTag) {
        this.name = name;
        this.channel = channel;
        this.deliveryTag = deliveryTag;
    }

    /**
     * When an object implementing interface <code>Runnable</code> is used
     * to create a thread, starting the thread causes the object's
     * <code>run</code> method to be called in that separately executing
     * thread.
     * <p>
     * The general contract of the method <code>run</code> is that it may
     * take any action whatsoever.
     *
     * @see Thread#run()
     */
    @Override
    public void run() {
        log.info("[DataSyncTask] run task start, name = {}", name);

        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        try {
            channel.basicAck(deliveryTag, true);
            log.info("[DataSyncTask] run task end, name = {}", name);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}

InitTask,服务启动执行Task管理器

package org.demo.scheduletest.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

/**
 * @author zhe.xiao
 * @version 1.0
 * @since 2025/1/9 上午11:38
 */
@Slf4j
@Component
public class InitTask implements ApplicationRunner {
    /**
     * Callback used to run the bean.
     *
     * @param args incoming application arguments
     * @throws Exception on error
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        DataSyncTaskManager.getManager().runTaskDaemon();
    }
}

配置Rabbitmq

package org.demo.scheduletest.rabbitmq;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;

/**
 * @author zhe.xiao
 * @date 2022-07-06 17:27
 * @description
 **/
@SpringBootConfiguration
public class MyRabbitTemplateConfig {
    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host:/}")
    private String virtualhost;

    /**
     * 连接工厂
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualhost);
        //connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    /**
     *
     * @return RabbitTemplate
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }
}

package org.demo.scheduletest.rabbitmq;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * @author zhe.xiao
 * @date 2022-07-05 14:55
 * @description
 **/
@Configuration
public class MyRabbitExecutor {
    //正常队列
    public static final String QUEUE_1 = "my:queue:1";
    public static final String EXCHANGE_1 = "my:exchange:1";
    public static final String ROUTEING_1 = "data:route:1";

    //死信队列
    public static final String QUEUE_DEAD_LETTER = "my:queue:deadLetter";
    public static final String EXCHANGE_DEAD_LETTER = "my:exchange:deadLetter";
    public static final String ROUTING_DEAD_LETTER = "data:route:deadLetter";

    // 提供 Queue
    @Bean
    Queue myQueue1(){
        HashMap<String, Object> args = new HashMap<>();
        //绑定死信队列信息
        args.put("x-dead-letter-exchange", EXCHANGE_DEAD_LETTER);
        args.put("x-dead-letter-routing-key", ROUTING_DEAD_LETTER);


//        args.put("x-max-length", 5); //队列最大长度,超过了会进入死信队列
//         args.put("x-message-ttl", 5000); //如果5秒没被消费,则进入死信队列

        return new Queue(QUEUE_1, true, false, false, args);
    }

    // 提供 Exchange
    @Bean
    DirectExchange myExchange1(){
        return new DirectExchange(EXCHANGE_1, true, false);
    }

    // 创建一个Binding对象,将Exchange和Queue绑定在一起
    @Bean
    Binding myBinding1(){
        return BindingBuilder.bind(myQueue1()).to(myExchange1()).with(ROUTEING_1);
        // return BindingBuilder.bind(myQueue1()).to(myExchange1());
    }

    // 死信队列配置 QUEUE, EXCHANGE, BINDING
    @Bean
    Queue myQueueDeadLetter(){
        return new Queue(QUEUE_DEAD_LETTER, true, false, false);
    }

    @Bean
    DirectExchange myExchangeDeadLetter(){
        return new DirectExchange(EXCHANGE_DEAD_LETTER, true, false);
    }

    @Bean
    Binding myBindingDeadLetter(){
        return BindingBuilder.bind(myQueueDeadLetter()).to(myExchangeDeadLetter()).with(ROUTING_DEAD_LETTER);
    }
}

Rabbitmq消费者通过task控制器提交执行任务

package org.demo.scheduletest.rabbitmq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.demo.scheduletest.service.DataSyncTask;
import org.demo.scheduletest.service.DataSyncTaskManager;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * 消费QUEUE
 *
 * @author zhe.xiao
 * @date 2022-07-05 14:57
 * @description
 **/
@Slf4j
@Component
public class MyReceiver {
    @RabbitListener(queues = MyRabbitExecutor.QUEUE_1)
    public void handler1(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            log.info("handler1 process: {}", data);
            DataSyncTask dataSyncTask = new DataSyncTask(data, channel, deliveryTag);
            DataSyncTaskManager.getManager().submitTask(dataSyncTask);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/951353.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Win32汇编学习笔记09.SEH和反调试

Win32汇编学习笔记09.SEH和反调试-C/C基础-断点社区-专业的老牌游戏安全技术交流社区 - BpSend.net SEH - structed exception handler 结构化异常处理 跟筛选一样都是用来处理异常的,但不同的是 筛选器是整个进程最终处理异常的函数,但无法做到比较精细的去处理异常(例如处理…

详细数据库MySQL查询语句

查询语句 &#xff08;SELECT [ALL|DISTINCT] <目标列表达式> [,<目标列表达式>] FROM <表名或视图名> [,<表名或视图名>]|(<SELECT 语句>) [AS] <别名> [WHERE <条件表达式>] [GROUP BY <列名1> [HAVING <条件表达式…

解决anaconda prompt找不到的情况

由于打开某个文件夹导致系统卡死了&#xff0c;鼠标使用不了&#xff0c;只能使用快捷键ctrlaltdelete打开&#xff0c;点任务管理器也没什么用&#xff0c;就点了注销选项。 注销&#xff1a;清空缓存空间和注册表信息&#xff0c;向系统发出清除现在登陆的用户的请求。 导致…

计算机网络 (31)运输层协议概念

一、概述 从通信和信息处理的角度看&#xff0c;运输层向它上面的应用层提供通信服务&#xff0c;它属于面向通信部分的最高层&#xff0c;同时也是用户功能中的最低层。运输层的一个核心功能是提供从源端主机到目的端主机的可靠的、与实际使用的网络无关的信息传输。它向高层用…

【C++经典例题】求1+2+3+...+n,要求不能使用乘除法、for、while、if、else、switch、case等关键字及条件判断语句

&#x1f493; 博客主页&#xff1a;倔强的石头的CSDN主页 &#x1f4dd;Gitee主页&#xff1a;倔强的石头的gitee主页 ⏩ 文章专栏&#xff1a; 期待您的关注 题目描述&#xff1a; 原题链接&#xff1a; 求123...n_牛客题霸_牛客网 (nowcoder.com) 解题思路&#xff1a; …

开关不一定是开关灯用 - 命令模式(Command Pattern)

命令模式&#xff08;Command Pattern&#xff09; 命令模式&#xff08;Command Pattern&#xff09;命令设计模式命令设计模式结构图命令设计模式涉及的角色 talk is cheap&#xff0c; show you my code总结 命令模式&#xff08;Command Pattern&#xff09; 命令模式&…

【深度学习量化交易13】继续优化改造基于miniQMT的量化交易软件,增加补充数据功能,优化免费下载数据模块体验!

我是Mr.看海&#xff0c;我在尝试用信号处理的知识积累和思考方式做量化交易&#xff0c;应用深度学习和AI实现股票自动交易&#xff0c;目的是实现财务自由~ 目前我正在开发基于miniQMT的量化交易系统——看海量化交易系统。 MiniQMT是一种轻量级的量化交易解决方案&#xff0…

Vue进阶(贰幺贰)npm run build多环境编译

文章目录 一、前言二、实施三、总结&#xff1a;需要打包区分不同环境四、拓展阅读 一、前言 项目开发阶段&#xff0c;会涉及打包部署到多个环境应用场景&#xff0c;在不同环境中&#xff0c;需要进行项目层面的区分&#xff0c;做不同的操作&#xff0c;可以利用打包的--mo…

回归预测 | MATLAB实GRU多输入单输出回归预测

回归预测 | MATLAB实GRU多输入单输出回归预测 目录 回归预测 | MATLAB实GRU多输入单输出回归预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 回归预测 | MATLAB实GRU多输入单输出回归预测。使用GRU作为RNN的一种变体来处理时间序列数据。GRU相比传统的RNN有较好的记…

ARM交叉编译Boost库

Boost下载&#xff1a;点击跳转 编译过程&#xff1a; 生成project-config.jam ./bootstrap.sh --with-librariesfilesystem,thread --with-toolsetgcc 2. 修改project-config.jam&#xff08;位于第12行附近&#xff09; if ! gcc in [ feature.values <toolset> ] …

【.NET】Kafka消息队列介绍,使用Confluent.Kafka集成Kafka消息队列

一、Kafka介绍 kafka是一种高吞吐量、分布式、可扩展的消息中间件系统&#xff0c;最初由LinkedIn公司开发。随着不断的发展&#xff0c;在最新的版本中它定义为分布式的流处理平台&#xff0c;现在在大数据应用中也是十分广泛。 它可以处理大量的实时数据流&#xff0c;被广…

Jenkins内修改allure报告名称

背景&#xff1a; 最近使用Jenkins搭建自动化测试环境时&#xff0c;使用Jenkins的allure插件生成的报告&#xff0c;一直显示默认ALLURE REPORT&#xff0c;想自定义成与项目关联的名称&#xff0c;如图所示&#xff0c;很明显自定义名称显得高大上些&#xff0c;之前…

Elasticsearch学习(1) : 简介、索引库操作、文档操作、RestAPI、RestClient操作

目录 1.elasticsearch简介1.1.了解es1.2.倒排索引正向索引和倒排索引 1.3.es的一些概念:文档和字段&#xff1b;索引和映射&#xff1b;Mysql与ES1.4.安装es、kibana部署单点es部署kibanaIK分词器安装IK分词器与测试扩展与停用词词典总结 部署es集群 2.索引库操作2.1.mapping映…

【Linux】Linux常见指令(上)

个人主页~ 初识Linux 一、Linux基本命令1、ls指令2、pwd命令3、cd指令4、touch指令5、mkdir指令6、rmdir指令7、rm指令8、man指令9、cp指令10、mv命令 Linux是一个开源的、稳定的、安全的、灵活的操作系统&#xff0c;Linux下的操作都是通过指令来实现的 一、Linux基本命令 先…

【Java项目】基于SpringBoot的【校园交友系统】

【Java项目】基于SpringBoot的【校园交友系统】 技术简介&#xff1a;系统软件架构选择B/S模式、SpringBoot框架、java技术和MySQL数据库等&#xff0c;总体功能模块运用自顶向下的分层思想。 系统简介&#xff1a;系统主要包括管理员和用户。 (a) 管理员的功能主要有首页、个人…

点击底部的 tabBar 属于 wx.switchTab 跳转方式,目标页面的 onLoad 不会触发(除非是第一次加载)

文章目录 1. tabBar 的跳转方式2. tabBar 跳转的特点3. 你的配置分析4. 生命周期触发情况5. 总结 很多人不明白什么是第一次加载&#xff0c;两种情况讨论&#xff0c;第一种情况假设我是开发者&#xff0c;第一次加载就是指点击微信开发者工具上边的编译按钮&#xff0c;每点击…

什么是Kafka?有什么主要用途?

大家好&#xff0c;我是锋哥。今天分享关于【什么是Kafka&#xff1f;有什么主要用途&#xff1f;】面试题。希望对大家有帮助&#xff1b; 什么是Kafka&#xff1f;有什么主要用途&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 Kafka 是一个分布式流…

基于QT和C++的实时日期和时间显示

一、显示在右下角 1、timer.cpp #include "timer.h" #include "ui_timer.h" #include <QStatusBar> #include <QDateTime> #include <QMenuBar> Timer::Timer(QWidget *parent) :QMainWindow(parent),ui(new Ui::Timer) {ui->setup…

单片机-定时器中断

1、相关知识 振荡周期1/12us; //振荡周期又称 S周期或时钟周期&#xff08;晶振周期或外加振荡周期&#xff09;。 状态周期1/6us; 机器周期1us; 指令周期1~4us; ①51单片机有两组定时器/计数器&#xff0c;因为既可以定时&#xff0c;又可以计数&#xff0c;故称之为定时器…

Java 如何传参xml调用接口获取数据

传参和返参的效果图如下&#xff1a; 传参&#xff1a; 返参&#xff1a; 代码实现&#xff1a; 1、最外层类 /*** 外层DATA类*/ XmlRootElement(name "DATA") public class PointsXmlData {private int rltFlag;private int failType;private String failMemo;p…