java通过FTP跨服务器动态监听读取指定目录下文件数据

背景:

1、文件数据在A服务器(windows)(不定期在指定目录下生成),项目应用部署在B服务器(Linux);
2、项目应用在B服务器,监听A服务器指定目录,有新生成文件,进行读取文件信息,持久化数据;
3、提供两块内容,第一安装windows FTP服务;第二项目源码,希望可以帮助到你。

共计4种方案,试错采用了第三种方案,第四种方案没有试。

1、使用jcsh.jar提供方法读取文件信息,但需要A服务器开通SSH远程连接,一般linux服务器都是默认开通的,可直接读取连接读取,windows系统需安装SSH,因现场环境A服务器是windows2003,故放弃这种方法。
2、曲线救国,通过脚本(脚本监听比较困难,故放弃)把A服务器信息定时存入B服务器(Linux),再通过jcsh.jar读取文件信息。
3、通过A服务器安装FTP服务,B服务器安装FTP客户端,使用java动态监听该目录下生成文件读取信息。
4、把A服务器指定目录进行共享(等同于共享的这个目录就是B服务的目录了),再进行读取,因第三种方案成功,故没有尝试第四种方案。

windows安装FTP服务

1、开启ftp服务:控制面板–程序和功能–启用或关闭windows功能–标红框全部打开–点击确定
在这里插入图片描述
2、新建站点:
控制面板–大图标–管理工具
在这里插入图片描述
IIS管理器
在这里插入图片描述
网站–添加FTP站点
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
以上就是windows安装FTP服务的过程,我这演示了匿名创建站点,谁都可以访问,还可以新建用户,需要用户登录才能访问。

源码

引入该依赖

<dependency>
   <groupId>commons-net</groupId>
    <artifactId>commons-net</artifactId>
    <version>3.6</version>
</dependency>

FileChangeData

@Data
public class FileChangeData {

    /**
     * 文件信息
     * */
    private FTPFile ftpFile;

    /**
     * 文件改变类型
     * */
    private FileChangeType eventType;

    /**
     * 文件名称
     * */
    private   String fileName;

    /**
     * 文件大小
     * */
    private Long fileSize;

    /**
     * FTPClient
     * */
    private FTPClient ftpClient;

    /**
     * 获取文件输入流
     * @return InputStream
     * */
    public InputStream getInputStream(String filePathName) {
        //如果是删除事件则不能够获取流
        if (Objects.equals(eventType, FileChangeType.FILE_DELETED)) {
            return null;
        }

        try {
            return ftpClient.retrieveFileStream(filePathName);
        } catch (IOException e) {
            return null;
        }
    }
}

FileChangeEvent

public interface FileChangeEvent {

    /**
     * 文件发生改变时触发此方法
     * @param fileChangeData 文件发生了改变
     * */
    @Function
    void change(FileChangeData fileChangeData) throws IOException;
}

FTPService

public interface FTPService {

    /**
     * ftp登陆
     * @return boolean 是否登陆成功
     * */
    FTPClient login();

    /**
     * ftp登出
     * @return boolean 是否登出成功
     * */
    boolean loginOut();

    /**
     * 获取文件列表
     * @return FTPFile[] 文件列表
     * */
    FTPFile[] listFile();

    /**
     * 监听文件夹的改变
     * @param fileChangeEvent 文件改变事件
     * */
    void addListenerFileChange(FileChangeEvent fileChangeEvent);
}

ListenerChangeRunnable

public interface ListenerChangeRunnable extends Runnable {

    /**
     * 停止监听文件
     * @return boolean 是否停止成功
     * */
    boolean stopListener();
}

FTPServiceImpl

@Service
public class FTPServiceImpl implements FTPService {

    @Autowired
    private FTPConfig ftpConfig;

    private String SPLIT = ":";

    private ThreadLocal<FTPClient> currentFTPClient;

    private ThreadLocal<ListenerChangeRunnable> currentListener;

    public FTPServiceImpl() {
        this.currentFTPClient = new ThreadLocal<>();
        this.currentListener = new ThreadLocal<>();
    }

    @Override
    public FTPClient login() {
        FTPClient ftpClient = new FTPClient();
        try {
            ftpClient.connect(ftpConfig.getFtpIp(), ftpConfig.getFtpPort());
            ftpClient.login(ftpConfig.getUsername(), ftpConfig.getPassword());
//            ftpClient.setControlEncoding("gb2312");
            this.currentFTPClient.set(ftpClient);
            return ftpClient;
        } catch (Exception e) {
            return null;
        }
    }

    @Override
    public boolean loginOut() {
        try {
            currentFTPClient.get().logout();
            currentFTPClient.get().disconnect();
            return Boolean.TRUE;
        } catch (Exception e) {
            return Boolean.FALSE;
        }
    }

    @Override
    public FTPFile[] listFile() {
        FTPClient ftpClient = this.currentFTPClient.get();
        try {
            return ftpClient.listFiles();
        } catch (Exception e) {
            return null;
        }
    }

    @Override
    public void addListenerFileChange(FileChangeEvent fileChangeEvent) {
        FTPClient ftpClient = this.currentFTPClient.get();
        ListenerFileChangeThreadRunnable listenerFileChangeThread = new ListenerFileChangeThreadRunnable(ftpClient, fileChangeEvent);
        this.currentListener.set(listenerFileChangeThread);
        new Thread(listenerFileChangeThread).start();
    }
}

ListenerFileChangeThreadRunnable

@Slf4j
public class ListenerFileChangeThreadRunnable implements ListenerChangeRunnable {

    private final FTPClient ftpClient;

    private volatile boolean stop;

    private final Map<String, Long> fileMemory;

    private final FileChangeEvent fileChangeEvent;

    public ListenerFileChangeThreadRunnable(FTPClient ftpClient, FileChangeEvent fileChangeEvent) {
        this.ftpClient = ftpClient;
        this.fileChangeEvent = fileChangeEvent;
        this.fileMemory = new HashMap<>();
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                FTPFile[] ftpFiles = ftpClient.listFiles();

                //判断文件被删除
                if (fileMemory.size() > 0) {
                    Set<String> fileNames = new HashSet<>();
                    for (FTPFile ftpFile : ftpFiles) {
                        if (ftpFile.isDirectory()) {
                            log.info("文件夹不做删除判断");
                            continue;
                        }
                        fileNames.add(ftpFile.getName());
                    }
                    Set<Map.Entry<String, Long>> entries = fileMemory.entrySet();
                    for (Map.Entry<String, Long> map : entries) {
                        if (!fileNames.contains(map.getKey())) {
                            log.info("文件{}被删除了", map.getKey());
                            FileChangeData fileChangeData = new FileChangeData();
                            fileChangeData.setEventType(FileChangeType.FILE_DELETED);
                            fileChangeData.setFileName(map.getKey());
                            fileChangeData.setFileSize(map.getValue());
                            fileMemory.remove(map.getKey());
                            entries.remove(map.getKey());
                            fileChangeEvent.change(fileChangeData);
                        }
                    }
                }
                //判断文件是否有更改或新增
                for (FTPFile ftpFile: ftpFiles) {
                    //判断是否为文件夹
                    if (ftpFile.isDirectory()) {
//                        log.info("{}为文件不进行监听操作", ftpFile.getName());
                        continue;
                    }
                    FileChangeData fileChangeData = new FileChangeData();
                    fileChangeData.setFileName(ftpFile.getName());
//                    fileChangeData.setFileName("D:\\ftptest\\aaa\\"+ftpFile.getName());
                    fileChangeData.setFileSize(ftpFile.getSize());
                    fileChangeData.setFtpFile(ftpFile);
                    //文件是否存在于缓存文件列表中
                    if (fileMemory.containsKey(ftpFile.getName())) {
//                        log.info("文件{}在内存中已经存在,进行大小判断", ftpFile.getName());
                        if (!Objects.equals(fileMemory.get(ftpFile.getName()), ftpFile.getSize())) {
//                            log.info("文件{}在内存中已经存在且大小不一致,进行更新缓存操作", ftpFile.getName());
                            fileMemory.put(ftpFile.getName(), ftpFile.getSize());
                            fileChangeData.setEventType(FileChangeType.FILE_UPDATE);
                            fileChangeEvent.change(fileChangeData);
                        }
                        continue;
                    }
//                    log.info("文件{}在内存中不存在进行缓存操作", ftpFile.getName());
                    fileMemory.put(ftpFile.getName(), ftpFile.getSize());
                    fileChangeData.setEventType(FileChangeType.FILE_ADD);
                    fileChangeEvent.change(fileChangeData);
                }
            } catch (Exception e) {
                continue;
                //throw new RuntimeException(e);
            }
            try {
                TimeUnit.SECONDS.sleep(20);
            } catch (InterruptedException e) {
                continue;
                //throw new RuntimeException(e);
            }
        }
    }

    @Override
    public boolean stopListener() {
        this.stop = Boolean.TRUE;
        this.fileMemory.clear();
        return this.stop;
    }
}

FileChangeType

public enum FileChangeType {
    FILE_UPDATE(0, "文件更新"),
    FILE_ADD(1, "文件添加"),
    FILE_DELETED(2, "文件删除");

    @Getter
    private Integer type;

    @Getter
    private String desc;

    FileChangeType(Integer type, String desc) {
        this.type = type;
        this.desc = desc;
    }
}

FTPConfig

@Data
@Configuration
public class FTPConfig {

    @Value("${ftp.ip:ftp的ip}")
    private String ftpIp;

    @Value("${ftp.port:ftp端口,默认21}")
    private Integer ftpPort;

    @Value("${ftp.username:ftp创建的用户名}")
    private String username;

    @Value("${ftp.password:ftp创建的用户名密码}")
    private String password;
}

SendEmailApplicationTests

@Component
class SendEmailApplicationTests implements ApplicationRunner {
    @Autowired
    private FTPService ftpService;
    void ftpTest() {
        FTPClient ftpClient = ftpService.login();
        ftpService.addListenerFileChange(ftpFile -> {
            System.out.println(String.format("文件%s被改变了,文件改变类型%s", ftpFile.getFileName(), ftpFile.getEventType().getDesc()));
            InputStream inputStream = ftpClient.retrieveFileStream("/"+ ftpFile.getFileName());
            if(inputStream!=null){
                BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream,"GBK"));
                String s = null;
                List<String> listStr = new ArrayList<>();//读取的数据在listStr
                while ((s = reader.readLine()) != null) {
                    System.out.println("===================>" + s);
                    listStr.add(s);
                }
                //处理业务逻辑
                
                inputStream.close();
                reader.close();
                ftpClient.completePendingCommand();
            }
        });
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        ftpTest();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/125151.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【vue会员管理系统】篇五之系统首页布局和导航跳转

一、效果图 1.首页 2.会员管理&#xff0c;跳转&#xff0c;跳其他页面也是如此&#xff0c;该页的详细设计会在后面的章节完善 二、代码 新增文件 components下新增文件 view下新增文件&#xff1a; 1.componets下新建layout.vue 放入以下代码&#xff1a; <template…

学术论文的实证数据来源

一、引言 在当今的学术研究中&#xff0c;数据是至关重要的。无论是自然科学、社会科学还是人文科学&#xff0c;都需要借助数据来支撑和证明其研究假设和理论。然而&#xff0c;数据的来源却是多种多样的&#xff0c;而且不同的学科领域也有其特定的数据来源。本文旨在探讨论文…

30道高频Vue面试题快问快答

※其他的快问快答&#xff0c;看这里&#xff01; 10道高频Qiankun微前端面试题快问快答 10道高频webpack面试题快问快答 20道高频CSS面试题快问快答 20道高频JavaScript面试题快问快答 30道高频Vue面试题快问快答 面试中的快问快答 快问快答的情景在面试中非常常见。 在面试过…

linux 操作系统

先讲一下叭&#xff0c;自己学这的原因&#xff0c;是因为我在做项目的时候使用到啦Redis&#xff0c;其实在windows系统上我其实也装啦Redis上&#xff0c;但是我觉得后期在做其他的项目的时候可能也会用到这个然后就想着要不先学学redis&#xff0c;然后在后面也不至于什么都…

国标28181-2022检测内容GB28181-2022检测内容

目前国标28181-2022平台全项检测一共181项&#xff0c;总的检测相对2016版本要复杂很多&#xff0c;增加了一些比较重要的功能,下面列举下检测项(qq 123011785):

求臻医学MRD产品喜获北京市新技术新产品(服务)证书

近日&#xff0c;北京市科学技术委员会、中关村科技园区管理委员会、北京市发展和改革委员会等五大部门联合公示了2023年度第一批&#xff08;总第十八批&#xff09;北京市新技术新产品&#xff08;服务&#xff09;名单。凭借领先的技术能力、产品创新能力及质量可靠性等优势…

2023最新版本 FreeRTOS教程 -9-互斥量(基本使用和解决优先级反转)

互斥量是一种特殊的二进制信号量 使用场景1 &#xff08;互斥访问&#xff09; 外设的独立访问 如打印 协议操作 使用场景2 解决优先级反转 外设的独立访问 如打印 协议操作 使用场景2 解决优先级反转 我们以较为复杂的场景2来分析 -1- 创建三个任务 优先级从低到高&…

【教学类-40-03】A4骰子纸模制作3.0(6.5CM嵌套+记录表)

作品展示 背景需求 骰子2.0&#xff08;7字形&#xff09;存在幼儿不会“包边”的问题&#xff0c;求助老师帮忙示范&#xff0c;最后累的还是老师 1.0版本&#xff0c;边缘折线多&#xff0c;幼儿剪起来费力。 2.0版本&#xff0c;边缘折线多&#xff0c;幼儿剪起来费力。&a…

【ChatGLM2-6B】小白入门及Docker下部署

【ChatGLM2-6B】小白入门及Docker下部署 一、简介1、ChatGLM2是什么2、组成部分3、相关地址 二、基于Docker安装部署1、前提2、CentOS7安装NVIDIA显卡驱动1&#xff09;查看服务器版本及显卡信息2&#xff09;相关依赖安装3&#xff09;显卡驱动安装 2、 CentOS7安装NVIDIA-Doc…

“产业大数据”助推园区实现可持续发展!

​产业园区在现代经济体系中扮演着重要角色&#xff0c;不仅是地方经济的重要支柱&#xff0c;更是企业发展的舞台。产业园区要想实现可持续的长远发展&#xff0c;不仅需要不断的招引优质企业入驻&#xff0c;更要时刻关注园内的企业&#xff0c;培育有潜力的企业&#xff0c;…

华为OD机试 - 最优策略组合下的总的系统消耗资源数(Java 2023 B卷 100分)

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示1、输入2、输出3、说明4、思路 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷…

FPGA时序分析与约束(10)——生成时钟

一、概述 最复杂的设计往往需要多个时钟来完成相应的功能。当设计中存在多个时钟的时候&#xff0c;它们需要相互协作或各司其职。异步时钟是不能共享确定相位关系的时钟信号&#xff0c;当多个时钟域交互时&#xff0c;设计中只有异步时钟很难满足建立和保持要求。我们将在后面…

如何改善食品饮料包装生产企业的OEE?

食品饮料这类商品在我们的日常生活中十分常见&#xff0c;它们存在于各类商店、超市或路边的小店里。而食品饮料的包装是吸引人们购买该产品的一个重要因素。为了在这个市场中脱颖而出并提高盈利能力&#xff0c;企业需要关注设备的综合效率&#xff0c;即OEE&#xff08;Overa…

数据结构-单链表-力扣题

移除链表元素 题目链接&#xff1a;力扣&#xff08;LeetCode&#xff09; 思路&#xff1a;和前面学的单链表的中间删除数据一样&#xff0c;使要被删除节点的前一个节点指向下要被删除节点的下一个节点&#xff0c;然后把要被删除的节点free掉。 具体实现过程&#xff1a;先…

docker搭建mysql环境

1. 基础环境 名称描述CentOS 7.6Linux操作系统版本docker 20.10.5docker版本mysql 8.0.29mysql镜像版本 2. 下载安装 使用docker命令下载mysql镜像 [rootzhouwei ~]# docker pull mysql:8.0.29查看docker仓库是否已经下载了mysql镜像 [rootzhouwei ~]# docker images将mys…

【PHP】医院HIS手术麻醉临床信息管理系统源码 实现术前、术中、术后全流程管理

手术麻醉系统是一套以数字形式与医院信息系统&#xff08;如HIS、EMR、LIS、PACS等&#xff09;和医疗设备等软、硬件集成并获取围手术期相关信息的计算机系统&#xff0c;其核心是对围手术期患者信息自动采集、储存、分析并呈现。该系统通过整合围手术期中病人信息、人员信息、…

最速下降法

目录 前言 一、梯度下降相关数学概念 二、最速下降法实战 2.1、例图1 2.2、Matlab代码实现 2.3、例题2 三、小结 前言 最速下降法&#xff0c;在SLAM中&#xff0c;作为一种很重要求解位姿最优值的方法&#xff0c;缺点很明显&#xff1a;迭代次数太多&#xff0c…

Linux笔记——Ubuntu子系统从系统盘迁移到非系统盘

Linux笔记——Ubuntu子系统从系统盘迁移到非系统盘 一、子系统迁移1. 关闭linux子系统2. 使用move-wsl进行迁移 二、 虚拟机子系统瘦身 安了子系统还没用几天&#xff0c;C盘提示我没空间了。。。剩余0kb的那种。。。Ubuntu安装的时候默认按C盘了&#xff0c;所以还是移走腾点地…

现货白银的代码为什么不是ag

如果大家对求学时期所学的化学知识还记忆犹新&#xff0c;应该记得白银这种物质的化学元素符号是ag&#xff0c;但在参与伦敦银交易的时候&#xff0c;大家也许会发现&#xff0c;在大多数平台的交易软件中&#xff0c;它的代码并没有使用到这个简写符号。 其实在国际现货贵金属…

Git查询某次提交属于哪个分支

在Android studio&#xff08;JetBrains系列也类似&#xff09;左下角&#xff0c;可以看到所有提交信息。 选中某一次提交信息&#xff0c;右键&#xff0c;选择“Copy Revision Number”&#xff0c;如下图&#xff1a; 打开Android studio的Terminal&#xff0c;输入git b…