一、前言
-
假设在两个局域网中,生产者和消费者进行通信
-
使用同步方式,mq偶尔会因为网络策略等问题导致消息发送失败,那么这条数据就丢失了
-
这时可以使用异步方式,将数据在生产端存一份,网通时发,网断时存
二、实现思路
-
将数据进行加密,写入json文件,并存入指定目录
-
监听目录,当文件生成时,获取文件内容
-
将文件内容通过rabbitMQ转发,并删除文件
-
若mq转发过程中出现异常,捕获异常类型进行判断:
- 若为ResourceAccessException(访问网络资源异常)或AmqpConnectException(连接到AMQP服务异常),则判定为通信异常,将文件存入repeat(临时目录),等待后续发送
- 若为其他异常,则说明为数据或程序本身问题,存入error目录,作为错误记录,后续不发送
-
通过定时任务,将repeat目录中的数据定时转发
三、代码实现
- 配置文件
# 异步数据文件转发,失败文件重发时间设置,cron表达式
transmit.dataforwarding-resend=0 0/1 * * * ?
#文件本地存储地址(文件摆渡/文件生成地址。地址最后不带/)
file.local.sync.path=${user.dir}
- 数据加密,并写入json文件
public class MsgTransmit{
@Value("${file.local.sync.path}")
private String failFilePath;
@Override
public void saveLocal(Object msg) {
// 进行消息加密
String msgJson = Encryptor.create().signatureEncrypt(msg);
// 拼接文件目录
String filePath = String.format("%s%s_%s.json", failFilePath, File.separator,
String.valueOf(Instant.now().toEpochMilli()), new Date()));
try (PrintWriter writer = new PrintWriter(new FileWriter(filePath))) {
// 将加密数据写入文件
writer.print(msgJson);
} catch (IOException e) {
log.error("error: {}", e);
}
}
}
- 线程池配置(配置,无需改动,可替代)
@EnableAsync
@Configuration
public class ExecutorConfig {
@Bean("executor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("async-executor-");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean
public ExecutorService executorService() {
return TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2));
}
}
- 文件监听逻辑(包含文件监听、mq消息转发、异常文件存储、异常文件数据定时转发)
- 开启定时任务记得启动类加@EnableScheduling
@Slf4j
@Component
@ConditionalOnProperty(name = "transmit.type", havingValue = "2")
public class FileListener extends FileAlterationListenerAdaptor {
@Resource
private ExecutorService executorService;
@Resource
private RabbitTemplate rabbitTemplate;
@Value("${file.local.sync.path}")
private String failFilePath;
/**
* 文件修改
*
* @param file
*/
@Override
public void onFileChange(File file) {
log.info("文件监听目录[修改]:" + file.getAbsolutePath());
handleFile(file);
}
/**
* 文件创建
*
* @param file
*/
@Override
public void onFileCreate(File file) {
log.info("文件监听目录[新建]:{}", file.getAbsolutePath());
handleFile(file);
}
/**
* 目录删除
*
* @param file
*/
@Override
public void onDirectoryDelete(File file) {
log.info("文件监听[目录删除]:{}", file.getAbsolutePath());
}
/**
* 处理文件
*
* @param file 文件
*/
private void handleFile(File file) {
executorService.execute(() -> {
String path = file.getAbsolutePath();
try {
// rabbitMq发送消息,记得自定义mq队列
rabbitTemplate.convertAndSend("your MQ queue", new String(Files.readAllBytes(Paths.get(path))));
log.info("文件转发MQ成功:{}", path);
// 删除发送成功的文件
boolean del = new File(path).delete();
} catch (Exception e) {
if (file.exists()) {
// 遇到mq连接等问题,存入重发文件存储目录。其他问题,存入失败文件存储目录
String sourcePath;
if (e instanceof ResourceAccessException || e instanceof AmqpConnectException) {
sourcePath = String.format("%s/%s", this.getTmpRepeatDir(), file.getName());
} else {
sourcePath = String.format("%s/%s", this.getTmpErrorDir(), file.getName());
}
file.renameTo(new File(sourcePath));
}
log.error("文件解析异常, path:{}, error:{}.", file.getAbsolutePath(), e.getMessage());
}
});
}
/**
* 重发临时文件存储目录
*
* @return
*/
private String getTmpRepeatDir() {
return String.format("%s/%s", failFilePath + "/tempMsg", "repeat");
}
/**
* 失败临时文件存储目录
*
* @return
*/
private String getTmpErrorDir() {
return String.format("%s/%s", failFilePath + "/tempMsg", "error");
}
/**
* 文件重发定时任务逻辑
*/
@Scheduled(cron = "${transmit.dataforwarding-resend}")
public void resendTask() {
File connect = new File(this.getTmpRepeatDir());
// 获取目录中的所有文件和子目录
File[] files = connect.listFiles();
for (File file : files) {
if (file.isFile() && file.getName().endsWith(".json")) {
log.info("文件重发, path:{}", file.getAbsolutePath());
handleFile(file);
}
}
}
}
- 开启文件监听
@Slf4j
@Configuration
@ConditionalOnProperty(name = "transmit.type", havingValue = "2")
public class ApplicationRunner implements CommandLineRunner {
@Resource
private FileListener fileListener;
@Value("${file.local.sync.path}")
private String failFilePath;
@Override
public void run(String... args) {
this.createFileListener(failFilePath + "/tempMsg");
long newLastModifiedTime = System.currentTimeMillis();
File rootPath = new File(failFilePath + "/tempMsg");
// 获取目录中的所有文件和子目录
File[] files = rootPath.listFiles();
for (File file : files) {
if (file.isFile() && file.getName().endsWith(".json")) {
file.setLastModified(newLastModifiedTime);
}
}
}
/**
* 创建文件夹监听
*
* @param rootDir 监听目录
*/
private void createFileListener(String rootDir) {
File input = new File(rootDir);
if (!input.exists()) {
log.error("【文件夹监听】 目录不存在,path:【{}】", rootDir);
input.mkdirs();
}
File errorDirectory = new File(String.format("%s/%s", failFilePath + "/tempMsg", "/error"));
if (!errorDirectory.exists()) {
errorDirectory.mkdirs();
}
File connectDirectory = new File(String.format("%s/%s", failFilePath + "/tempMsg", "/repeat"));
if (!connectDirectory.exists()) {
connectDirectory.mkdirs();
}
try {
FileAlterationObserver observer = new FileAlterationObserver(new File(rootDir));
observer.addListener(fileListener);
// 轮询间隔 3 秒
long interval = TimeUnit.SECONDS.toMillis(3);
//创建文件变化监听器(默认为1000毫秒执行一次扫描)
FileAlterationMonitor monitor = new FileAlterationMonitor(interval, observer);
// 开始监控
monitor.start();
log.info("【文件夹监听】 开始监听{}文件变化--------", rootDir);
} catch (Exception e) {
log.error("【文件夹监听】 初始化失败.");
}
}
}
四、测试
- 接口调用,断点打到消费文件前,可以看到文件生成到指定目录
- 放开断点,文件消费,mq转发数据
注:该代码为实现最近某需求:互联网与某局域网通信,而mq偶发歇逼想出。
各位大佬有好想法欢迎指出。