SpringBoot通过文件监听实现MQ加密数据异步转发

一、前言

  • 假设在两个局域网中,生产者和消费者进行通信

  • 使用同步方式,mq偶尔会因为网络策略等问题导致消息发送失败,那么这条数据就丢失了

  • 这时可以使用异步方式,将数据在生产端存一份,网通时发,网断时存

二、实现思路

  • 将数据进行加密,写入json文件,并存入指定目录

  • 监听目录,当文件生成时,获取文件内容

  • 将文件内容通过rabbitMQ转发,并删除文件

  • 若mq转发过程中出现异常,捕获异常类型进行判断:

    • 若为ResourceAccessException(访问网络资源异常)或AmqpConnectException(连接到AMQP服务异常),则判定为通信异常,将文件存入repeat(临时目录),等待后续发送
    • 若为其他异常,则说明为数据或程序本身问题,存入error目录,作为错误记录,后续不发送
  • 通过定时任务,将repeat目录中的数据定时转发

三、代码实现

  • 配置文件
# 异步数据文件转发,失败文件重发时间设置,cron表达式
transmit.dataforwarding-resend=0 0/1 * * * ?
#文件本地存储地址(文件摆渡/文件生成地址。地址最后不带/)
file.local.sync.path=${user.dir}
  • 数据加密,并写入json文件
public class MsgTransmit{

    @Value("${file.local.sync.path}")
    private String failFilePath;

    @Override
    public void saveLocal(Object msg) {
        // 进行消息加密
        String msgJson = Encryptor.create().signatureEncrypt(msg);
        // 拼接文件目录
        String filePath = String.format("%s%s_%s.json", failFilePath, File.separator, 
String.valueOf(Instant.now().toEpochMilli()), new Date()));
        try (PrintWriter writer = new PrintWriter(new FileWriter(filePath))) {
            // 将加密数据写入文件
            writer.print(msgJson);
        } catch (IOException e) {
            log.error("error: {}", e);
        }
    }
}
  • 线程池配置(配置,无需改动,可替代)
@EnableAsync
@Configuration
public class ExecutorConfig {

    @Bean("executor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(200);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("async-executor-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    @Bean
    public ExecutorService executorService() {
        return TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2));
    }
}
  • 文件监听逻辑(包含文件监听、mq消息转发、异常文件存储、异常文件数据定时转发)
  • 开启定时任务记得启动类加@EnableScheduling
@Slf4j
@Component
@ConditionalOnProperty(name = "transmit.type", havingValue = "2")
public class FileListener extends FileAlterationListenerAdaptor {

    @Resource
    private ExecutorService executorService;
    
    @Resource
    private RabbitTemplate rabbitTemplate;
    
	@Value("${file.local.sync.path}")
    private String failFilePath;

    /**
     * 文件修改
     *
     * @param file
     */
    @Override
    public void onFileChange(File file) {
        log.info("文件监听目录[修改]:" + file.getAbsolutePath());
        handleFile(file);
    }

    /**
     * 文件创建
     *
     * @param file
     */
    @Override
    public void onFileCreate(File file) {
        log.info("文件监听目录[新建]:{}", file.getAbsolutePath());
        handleFile(file);
    }

    /**
     * 目录删除
     *
     * @param file
     */
    @Override
    public void onDirectoryDelete(File file) {
        log.info("文件监听[目录删除]:{}", file.getAbsolutePath());
    }

    /**
     * 处理文件
     *
     * @param file 文件
     */
    private void handleFile(File file) {
        executorService.execute(() -> {
            String path = file.getAbsolutePath();
            try {
                // rabbitMq发送消息,记得自定义mq队列
                rabbitTemplate.convertAndSend("your MQ queue", new String(Files.readAllBytes(Paths.get(path))));
                log.info("文件转发MQ成功:{}", path);
                // 删除发送成功的文件
                boolean del = new File(path).delete();
            } catch (Exception e) {
                if (file.exists()) {
                    // 遇到mq连接等问题,存入重发文件存储目录。其他问题,存入失败文件存储目录
                    String sourcePath;
                    if (e instanceof ResourceAccessException || e instanceof AmqpConnectException) {
                        sourcePath = String.format("%s/%s", this.getTmpRepeatDir(), file.getName());
                    } else {
                        sourcePath = String.format("%s/%s", this.getTmpErrorDir(), file.getName());
                    }
                    file.renameTo(new File(sourcePath));
                }
                log.error("文件解析异常, path:{}, error:{}.", file.getAbsolutePath(), e.getMessage());
            }
        });
    }

    /**
     * 重发临时文件存储目录
     *
     * @return
     */
    private String getTmpRepeatDir() {
        return String.format("%s/%s", failFilePath + "/tempMsg", "repeat");
    }

    /**
     * 失败临时文件存储目录
     *
     * @return
     */
    private String getTmpErrorDir() {
        return String.format("%s/%s", failFilePath + "/tempMsg", "error");
    }

    /**
     * 文件重发定时任务逻辑
     */
    @Scheduled(cron = "${transmit.dataforwarding-resend}")
    public void resendTask() {
        File connect = new File(this.getTmpRepeatDir());
        // 获取目录中的所有文件和子目录
        File[] files = connect.listFiles();
        for (File file : files) {
            if (file.isFile() && file.getName().endsWith(".json")) {
                log.info("文件重发, path:{}", file.getAbsolutePath());
                handleFile(file);
            }
        }
    }
}
  • 开启文件监听
@Slf4j
@Configuration
@ConditionalOnProperty(name = "transmit.type", havingValue = "2")
public class ApplicationRunner implements CommandLineRunner {

    @Resource
    private FileListener fileListener;
    
    @Value("${file.local.sync.path}")
    private String failFilePath;

    @Override
    public void run(String... args) {
        this.createFileListener(failFilePath + "/tempMsg");
        long newLastModifiedTime = System.currentTimeMillis();
        File rootPath = new File(failFilePath + "/tempMsg");
        // 获取目录中的所有文件和子目录
        File[] files = rootPath.listFiles();
        for (File file : files) {
            if (file.isFile() && file.getName().endsWith(".json")) {
                file.setLastModified(newLastModifiedTime);
            }
        }
    }

    /**
     * 创建文件夹监听
     *
     * @param rootDir 监听目录
     */
    private void createFileListener(String rootDir) {
        File input = new File(rootDir);
        if (!input.exists()) {
            log.error("【文件夹监听】 目录不存在,path:【{}】", rootDir);
            input.mkdirs();
        }
        File errorDirectory = new File(String.format("%s/%s", failFilePath + "/tempMsg", "/error"));
        if (!errorDirectory.exists()) {
            errorDirectory.mkdirs();
        }
        File connectDirectory = new File(String.format("%s/%s", failFilePath + "/tempMsg", "/repeat"));
        if (!connectDirectory.exists()) {
            connectDirectory.mkdirs();
        }
        try {
            FileAlterationObserver observer = new FileAlterationObserver(new File(rootDir));
            observer.addListener(fileListener);
            // 轮询间隔 3 秒
            long interval = TimeUnit.SECONDS.toMillis(3);
            //创建文件变化监听器(默认为1000毫秒执行一次扫描)
            FileAlterationMonitor monitor = new FileAlterationMonitor(interval, observer);
            // 开始监控
            monitor.start();
            log.info("【文件夹监听】 开始监听{}文件变化--------", rootDir);
        } catch (Exception e) {
            log.error("【文件夹监听】 初始化失败.");
        }
    }
}

四、测试

  • 接口调用,断点打到消费文件前,可以看到文件生成到指定目录

在这里插入图片描述

  • 放开断点,文件消费,mq转发数据

在这里插入图片描述


注:该代码为实现最近某需求:互联网与某局域网通信,而mq偶发歇逼想出。

各位大佬有好想法欢迎指出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/968825.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

windows10本地的JMeter+Influxdb+Grafana压测性能测试,【亲测,避坑】

一、环境,以下软件需要解压、安装到电脑上。 windows10 apache-jmeter-5.6.3 jdk-17.0.13 influxdb2-2.7.11 grafana-enterprise-11.5.1二、配置Influxdb,安装完默认连接http://localhost:8086/。打开连接,配置如下。 1、配置bucket&#x…

excel如何拆分为1000行一个的文件

方法一:使用hpctb 打开文件,点“行分表”就行了。如图: 一步完成,不需要安装环境,也不需要专业知识,也不需要“神”一样的ai。 方法二:使用Python 安装库(如未安装) pip install p…

docker nginx 配置文件详解

在平常的开发工作中,我们经常需要访问静态资源(图片、HTML页面等)、访问文件目录、部署项目时进行负载均衡等。那么我们就会使用到Nginx,nginx.conf 的配置至关重要。那么今天主要结合访问静态资源、负载均衡等总结下 nginx.conf …

分布式 IO:矿山砂石装备高效控制的新引擎

在砂石与矿山行业这片充满挑战与机遇的领域,装备的高效运行和精准控制一直是企业追求的核心目标。随着科技的飞速发展,明达技术推出的MR30分布式 IO 模块作为一种先进的控制技术,正逐渐崭露头角,为砂石、矿山装备的升级改造带来了…

微信小程序配置3 配置sass

1. 在config。json文件里面的setting配置“sass” 2. 改你需要的页面后缀名为scss。 3.查看页面即可看到样式。

VS Code 通知中一直显示“Reactivating terminals...”的问题解决

VS Code 通知中一直显示“Reactivating terminals...”的问题解决 问题截图解决方案修复截图相关链接 问题截图 解决方案 点击顶部放大镜(🔍) -> 输入尖括号(>) -> 然后搜索(Python: Clear Workspace Interpreter Setting) -> 最后点击即可 修复截图…

小白学网络安全难吗?需要具备哪些条件?

作为一名零基础小白,想要转行IT学习一门新技术,且上手难度低、就业前景好、薪资待遇高、入行门槛低,网络安全是最值得的选择,掌握它之后你可以获得一份收入不错的工作。那么零基础学网络安全好学吗?以下是具体内容介绍。 首先&am…

服务器,交换机和路由器的一些笔记

服务器、交换机和路由器是网络中常用的设备,它们的本质区别和联系如下: 本质区别 功能不同 服务器:就像一个大型的资料仓库和工作处理中心,主要用来存储和管理各种数据,比如网站的网页数据、公司的办公文档等&#x…

untiy3D为游戏物体制作简单的动画

1.创建一个物体挂载动画组件Animator 2.创建一个动画控制器 3.动画控制器挂载到Animator组件 4.创建动画窗口>动画 入口默认执行left 执行效果 20250212_151707 脚本控制动画 鼠标点击是切换到动画t using System.Collections; using System.Collections.Generic; usi…

2、k8s 二进制安装(详细)

k8s 二进制安装 IP规划初始化部署 etcd 集群在 etcd01 节点上操作准备cfssl证书生成工具,加权生成etcd证书上传etcd软件包启动 etcd 服务 部署 Master 组件部署 Worker Node 组件node 节点安装 docker部署组件 部署 CNI 网络组件部署 flannel简介部署 部署 Calico简…

3.React 组件化开发

react:版本 18.2.0node: 版本18.19.1脚手架:版本 5.0.1 一、类组件 (一) 一个干净的脚手架 【1】使用已经被废弃的 CRA (create-react-app) create-react-app 已经被废弃,且目前使用会报错,官方已经不推荐使用&…

第二天:工具的使用

每天上午9点左右更新一到两篇文章到专栏《Python爬虫训练营》中,对于爬虫有兴趣的伙伴可以订阅专栏一起学习,完全免费。 键盘为桨,代码作帆。这趟为期30天左右的Python爬虫特训即将启航,每日解锁新海域:从Requests库的…

AI前端开发:赋能开发者,提升解决实际问题的能力

近年来,人工智能技术飞速发展,深刻地改变着各行各业。在软件开发领域,AI写代码工具的出现更是引发了一场革命,尤其是前端开发领域,AI的应用正在显著提升开发者的解决实际问题的能力。本文将探讨AI前端开发如何提升效率…

20vue3实战-----使用echarts搭建各种图表

20vue3实战-----使用echarts搭建各种图表 1.实现目标2.实现步骤2.1封装组件2.2使用组件 1.实现目标 如上,页面上有各种各样类型的图标。这时候需要用到echarts库作为辅助。 2.实现步骤 首先安装echarts库的步骤就不用多说。 2.1封装组件 page-echarts/index.ts: import Bas…

【Qt 常用控件】多元素控件(QListWidget、QTableWidgt、QTreeWidget)

**View和**Widget的区别? **View的实现更底层,**Widget是基于**View封装实现的更易用的类型。 **View使用MVC结构 MVC是软件开发中 经典的 软件结构 组织形式,软件设计模式。 M(model)模型。管理应用程序的核心数据和…

Flappy Bird开发学习记录

概述 为了了解一下Unity的开发过程,或者说感受?先搞简单的练练手。 工具 Unity:2022.3.51f1c1 visual studio 2022 开发过程 项目基本设置 新建2d项目,游戏画面设置为1080*1920(9:16)。 图片素材设…

35~37.ppt

目录 35.张秘书-《会计行业中长期人才发展规划》 题目​ 解析 36.颐和园公园(25张PPT) 题目​ 解析 37.颐和园公园(22张PPT) 题目 解析 35.张秘书-《会计行业中长期人才发展规划》 题目 解析 插入自定义的幻灯片:新建幻灯片→重用…

19.4.6 读写数据库中的二进制数据

版权声明:本文为博主原创文章,转载请在显著位置标明本文出处以及作者网名,未经作者允许不得用于商业目的。 需要北风数据库的请留言自己的信箱。 北风数据库中,类别表的图片字段在【数据表视图】中显示为Bitmap Image&#xff1…

sqli-lab靶场学习(六)——Less18-22(User-Agent、Referer、Cookie注入)

前言 前面的关卡,都是直接在输入框或者浏览器的地址栏上做文章即可。但本文这几关,需要用工具拦截请求修改请求头部才行。 Less18(User-Agent注入) 本关的注入点在User-Agent。我们在用户名和密码框中输入admin/admin后&#xf…

uniapp 使用 鸿蒙开源字体

uniapp vue3 使用 鸿蒙开源字体 我的需求是全局使用鸿蒙字体。 所以: 0. 首先下载鸿蒙字体: 鸿蒙资源 下载后解压,发现里面有几个文件夹: 字体名称说明Sans默认的鸿蒙字体,支持基本的多语言字符(包括字…