下载中心
文章目录
- 下载中心
- 一. 概要
- 二. 实现逻辑
- 下载中心
- 一. 概要
- 二. 实现逻辑
- 三. 主要代码逻辑
- 1.生成任务
- 2.消费任务
- 3.查询方法是如何存入内存中的
- 4.DCGenerateComponent 反射调用查询数据方法
- 总结
一. 概要
功能概览:将文件下载修改为异步下载,引入mq队列进行削峰解耦
整体步骤:
- 请求后端接口
- 将需要下载的任务以数据的方式存储在数据库中
- 将任务编号发往对应的mq队列中等待消费
- mq消费生成文件
- mq根据对应任务id,查询出任务具体参数
- 利用具体的任务参数查询出对应的分页数据
- 生成对应的文件,上传oss
- 在下载中心进行文件下载
二. 实现逻辑
下载中心
文章目录
- 下载中心
- 一. 概要
- 二. 实现逻辑
- 下载中心
- 一. 概要
- 二. 实现逻辑
- 三. 主要代码逻辑
- 1.生成任务
- 2.消费任务
- 3.查询方法是如何存入内存中的
- 4.DCGenerateComponent 反射调用查询数据方法
- 总结
一. 概要
功能概览:将文件下载修改为异步下载,引入mq队列进行削峰解耦
整体步骤:
- 请求后端接口
- 将需要下载的任务以数据的方式存储在数据库中
- 将任务编号发往对应的mq队列中等待消费
- mq消费生成文件
- mq根据对应任务id,查询出任务具体参数
- 利用具体的任务参数查询出对应的分页数据
- 生成对应的文件,上传oss
- 在下载中心进行文件下载
二. 实现逻辑
-
流程图
-
泳道图
三. 主要代码逻辑
1.生成任务
-
DownloadCenterComponent
接口中定义了addMsg(发送消息)
方法/** * 新版的下载中心发送消息的接口 * 在消费时,通过下载类型,拿到查询数据的方法,执行并分页查询数据 * * @param downloadKey 下载的key * @param fileNameItems 替换文件名字中占位符的参数 例子在数据库中配置了filename * filename字段并不是一个完整的文件名,需要拼接上文件的后缀名和替换占位符 * 例子:filename = "销售单数据_%s_%s", * 那么fileNameItems就传两个字符串--->fileNameItems[2]=["20220504","BuyerName"] * fileType = "csv" * 最后组成的完整的文件名字 : 销售单数据_20220504_BuyerName.csv * (PS:如果单个文件超过最大行数,那么文件名就会变成 * 销售单数据_20220504_BuyerName_1.csv) * 销售单数据_20220504_BuyerName_2.csv) * 销售单数据_20220504_BuyerName_3.csv * 最后打成一个压缩包 销售单数据_20220504_BuyerName.zip) * @param <T> query参数的类型 */ <T extends PageQuery> void addMsg(String downloadKey, T query, String... fileNameItems); <T extends PageQuery> void addMsg(DownloadCenterFileConfigEnum downloadKey, T query, String... fileNameItems);
DownloadCenterComponentImpl
实现addMsg
@Override @Transactional(rollbackFor = Exception.class) public <T extends PageQuery> void addMsg(String downloadKey, T query, String... fileNameItems) { //判断下载的key是否是合法的,是否包含在我们定义的枚举中 boolean flag = DownloadCenterFileConfigEnum.checkDownloadKey(downloadKey); if (!flag) { throw new OristandException("下载类型不存在"); } Integer userId = getCurrUserId(query); //根据key去查找对应的配置 tb_download_file_config /*DownloadFileConfigEntity中包含filename(文件名称),fileType(csv还是xlsx),queueType(快慢队列,SLOW、FAST)以及行数限制等(默认单次最大行为50000,单sheet 20w行) */ DownloadFileConfigEntity config = iDownloadFileConfigService.getByDownloadKey(downloadKey); String filename = config.getFilename(); String fileType = config.getFileType(); //构建任务体 DownloadListDO downloadListDo = DownloadListDO.builder() .userId(userId) .type(-1) .downloadKey(downloadKey) .queryParam(JSON.toJSONString(query, SerializerFeature.WriteMapNullValue)) .fileName(getWholeFileName(filename, fileType, fileNameItems)) .progress(new BigDecimal(REQUEST_DOWNLOAD_PROGRESS)) .downloadVersion("2.0") .build(); //根据配置选择快慢队列的枚举 DownloadCenterQueueTypeEnum queueType = DownloadCenterQueueTypeEnum.getTypeByCode(config.getQueueType()); //根据快慢队列选择对应的routingKey和Virtual host(虚拟分组) String routingKey = downLoadMQConfig.getRoutingKeyByQueueType(queueType); String vhost = downLoadMQConfig.getVhostByQueueType(queueType); saveAndSendMsg(downloadListDo, routingKey, vhost); } //保存并发送消息到队列 private void saveAndSendMsg(DownloadListDO downloadListDo, String routingKey, String vhost) { //后台文件下载列表保存对应文件数据 tb_download_list downloadCenterService.save(downloadListDo); //将文件列表主键和mq队列等信息发送到队列 rabbitmqSendService.addMsg( RabbitMqConfig.ExchangeEnum.DEFAULT_DIRECT_EXCHANGE.getCode(), routingKey, vhost, JSON.toJSONString(downloadListDo.getId()) ); }
2.消费任务
-
DownloadCenterComponent
接口中定义了downloadConsumer(发送消息)
方法/** * 真正的消费接口 * * @param downloadId tb_download_list表的主键 */ void downloadConsumer(Integer downloadId);
DownloadCenterComponentImpl
实现downloadConsumer
@Override public void downloadConsumer(Integer downloadId) { //根据id查到downloadId tb_download_list表中的数据 DownloadListDO downloadListDO = downloadCenterService.getById(downloadId); String fileName = downloadListDO.getFileName(); // 下载类型 String downloadKey = downloadListDO.getDownloadKey(); // 获取配置此类任务的配置 DownloadFileConfigEntity config = iDownloadFileConfigService.getByDownloadKey(downloadKey); //组装文件的名称,如替换%s这种 fileName = getWholeFileName(fileName.substring(0, fileName.lastIndexOf(StrUtil.DOT)), config.getFileType()); //判断文件是否是xlsx或者cvs类型,不是则抛出异常 throw new OristandException("该文件格式暂不支持生成"); checkConfig(config); // 获取当时任务的下载查询参数 当时存入的时候是json String queryParam = downloadListDO.getQueryParam(); // 查询的方法,这个方法是缓存在内存中的,想bean一样,一直在内存中 DownloadMethodDTO downloadMethod = downloadCenterDataSource.getDownloadMethod(downloadKey); // 根据方法参数的类型 Class<?>[] queryTypes = downloadMethod.getQueryTypes(); // 查询参数查询参数一定继承PageQuery PageQuery queryObj = null; if (queryTypes.length != 0) { Class<?> pageQueryClazz = null; for (Class<?> queryType : queryTypes) { //判断PageQuery是否是queryType他的一个父类或父接口,或是否是同一个类或同一个接口 if (PageQuery.class.isAssignableFrom(queryType)) { pageQueryClazz = queryType; } } //断言不为空 assert pageQueryClazz != null; //将查询参数由json转回对象 queryObj = (PageQuery) JSON.parseObject(queryParam, pageQueryClazz); } assert queryObj != null; // 这个是查询方法的返回类型,需要根据这个类型来分情况处理 Class<?> clazz = downloadMethod.getDownloadObjClazz(); String fileType = config.getFileType(); Integer queryRow = Math.min(config.getQueryRow(), config.getSheetRow()); Integer sheetRow = Math.max(config.getQueryRow(), config.getSheetRow()); // 处理默认情况 if (queryRow < 0) { queryRow = tempFileConfig.getBatchSize(); } // 如果查询的结果是字节流,就单独处理,否则他的返回结果一定可以得到一个list // 判断返回值是否是字节流 int page = 1; int limit = queryRow; //判断是否字节流 if (clazz.isAssignableFrom(ByteArrayOutputStream.class)) { limit = sheetRow; } // 处理各种返回类型,目前有这几种 // PageBean 旧的 // LayTableData 旧的 // List<?> 旧的 // List<Object[]> 目前就只有这几种类型,这种情况特殊,表头是数据库里面配置的。 List<ByteArrayOutputStream> bosList = new ArrayList<>(); boolean isGo; //判断xlsx处理方式还是csv处理方式 DCGenerateComponent generateComponent = downloadCenterGenerateFileComponent.chooseFileType(fileType); do { DownloadCenterDTO dto = new DownloadCenterDTO(); // 设置参数 queryObj.setPage(page); queryObj.setLimit(limit); // 设置downloadKey dto.setDownloadKey(downloadKey); // 设置分页查询参数 dto.setQuery(queryObj); // 设置查询总量 dto.setQueryTotal(sheetRow); dto.setSheetName(fileName.substring(0, fileName.lastIndexOf(StrUtil.DOT))); //根据不同文件类型进入一些模板基础处理,dto里面有查询的参数 DownloadCenterPage outputStreamInfo = generateComponent.getOutputStreamInfo(dto); boolean end = outputStreamInfo.isEnd(); ByteArrayOutputStream bos = outputStreamInfo.getBos(); if (bos != null && bos.toByteArray().length != 0) { // 满足两个条件,bos不等于null,字节数不等于0 bosList.add(bos); } isGo = !end; page += (sheetRow / queryRow); // 设置序号 dto.setSerialNumber((page - 1) * limit + 1); } while (isGo); //保证事务生效 DownloadCenterComponent downloadCenterComponent = (DownloadCenterComponent) AopContext.currentProxy(); try { //将流文件上传到oss中,并修改了任务状态 downloadCenterComponent.uploadFileToOss(bosList, downloadListDO); } catch (IOException e) { log.error("下载中心上传oss失败", e); throw new OristandException("下载中心上传oss失败", e); } }
-
上传oss,并修改状态
downloadCenterComponent.uploadFileToOss(bosList, downloadListDO);
@Override @Transactional(rollbackFor = Exception.class) public void uploadFileToOss(List<ByteArrayOutputStream> osList, DownloadListDO downloadListDO) throws IOException { String fileName = downloadListDO.getFileName(); String downloadKey = downloadListDO.getDownloadKey(); // 获取配置 DownloadFileConfigEntity config = iDownloadFileConfigService.getByDownloadKey(downloadKey); fileName = getWholeFileName(fileName.substring(0, fileName.lastIndexOf(StrUtil.DOT)), config.getFileType()); OssFileModel model; BigDecimal fileSize = BigDecimal.ZERO; if (osList.size() == 1) { ByteArrayOutputStream bos = osList.get(0); InputStream is = new ByteArrayInputStream(bos.toByteArray()); fileSize = new BigDecimal(is.available()).divide(new BigDecimal(ONE_ZERO_TWO_FOUR), 2, RoundingMode.HALF_UP); model = uploadFileToOss(is, fileName); } else { String zipPath = goZip(osList, fileName); Path path = Paths.get(zipPath); try (InputStream is = Files.newInputStream(path)) { fileSize = new BigDecimal(is.available()).divide(new BigDecimal(ONE_ZERO_TWO_FOUR), 2, RoundingMode.HALF_UP); model = uploadFileToOss(is, changeFileNameToZip(fileName)); } catch (IOException e) { log.error("下载中心读取临时压缩文件失败", e); throw new OristandException(e); } finally { Files.deleteIfExists(path); } } // 文件下载完,如果状态是已取消,将文件状态改为取消,进度条回退到50% Integer downloadStatus = DownloadStatusEnum.GENERATED.getCode(); BigDecimal progress = new BigDecimal(COMPLETE_DOWNLOAD_PROGRESS); DownloadListDO temp = downloadCenterService.getById(downloadListDO.getId()); if (temp != null && DownloadStatusEnum.CANCELED.getCode().equals(temp.getStatus())) { downloadStatus = DownloadStatusEnum.CANCELED.getCode(); progress = new BigDecimal(REQUEST_DOWNLOAD_PROGRESS); } // 得到OssModel 更新数据 Integer id = downloadListDO.getId(); downloadCenterService.update( Wrappers.lambdaUpdate(DownloadListDO.class) .eq(DownloadListDO::getId, id) .set(DownloadListDO::getStatus, downloadStatus) .set(DownloadListDO::getFileName, model.getFileName()) .set(DownloadListDO::getProgress, progress) .set(DownloadListDO::getFileSavePath, model.getOssKey()) .set(DownloadListDO::getFileSize, fileSize) ); }
3.查询方法是如何存入内存中的
DownLoadCenterAnno
注解
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
public @interface DownLoadCenterAnno {
/**
* 下载类型的枚举
*/
DownloadCenterFileConfigEnum downloadKey();
/**
* 你输出的对象叫是什么类型的
*/
Class<?> downloadObj() default Object.class;
/**
* 兼容一些序号要写到文件里面的,就是上面类里面的字段
*/
String serialNumberField() default StrUtil.EMPTY;
}
//使用方式 downloadObj 会在bean加载后将这个对象放到内存中记录
@DownLoadCenterAnno(
downloadKey = DownloadCenterFileConfigEnum.LOGISTICS_CLAIMS_TOTAL_PAGES,
downloadObj = ClaimsApplyVO.class)
public PageBean<ClaimsApplyVO> listClaimsApplyList(ClaimsApplyQuery query) {
....
}
枚举 DownloadCenterFileConfigEnum
/**
* 数据库中所有的download_key都配在这边,避免直接写魔法值
* 规范:下划线分隔,包装key唯一
*
* @author super syj
*/
@AllArgsConstructor
@Getter
public enum DownloadCenterFileConfigEnum {
LOGISTICS_CLAIMS_TOTAL_PAGES("logistics_claims_total_pages"),
;
private final String downloadKey;
/**
* 校验下载的key是否合法
*
* @param downloadKey downloadKey
* @return 返回true,true合法,false不合法
*/
public static boolean checkDownloadKey(String downloadKey) {
return Arrays.stream(values())
.map(DownloadCenterFileConfigEnum::getDownloadKey)
.collect(Collectors.toList())
.contains(downloadKey);
}
}
DownloadCenterDataSourceImpl实现了BeanPostProcessor
接口的postProcessAfterInitialization
方法,这个方法会在每个bean对象的初始化方法调用之后被回调。
@Slf4j
@Component
public class DownloadCenterDataSourceImpl implements DownloadCenterDataSource, BeanPostProcessor {
/**
* 线程安全,保证key唯一
*/
private final static Map<String, DownloadMethodDTO> METHOD_MAP = new ConcurrentHashMap<>();
@Override
public Object postProcessAfterInitialization(Object bean, @NotNull String beanName) throws BeansException {
//获取所有非继承方法
Method[] declaredMethods = bean.getClass().getDeclaredMethods();
List<Method> methods = Arrays.stream(declaredMethods).filter(method -> {
DownLoadCenterAnno anno = AnnotationUtils.findAnnotation(method, DownLoadCenterAnno.class);
return Objects.nonNull(anno);
}).collect(Collectors.toList());
for (Method method : methods) {
//查找对应注解中的信息
DownLoadCenterAnno anno = AnnotationUtils.findAnnotation(method, DownLoadCenterAnno.class);
assert anno != null;
DownloadCenterFileConfigEnum configEnum = anno.downloadKey();
if (configEnum == null) {
continue;
}
String key = configEnum.getDownloadKey();
Class<?>[] queryTypes = method.getParameterTypes();
//封装DownloadMethodDTO 对象DTO存放在map中,key为枚举的DownloadKey
DownloadMethodDTO methodDTO = DownloadMethodDTO.builder()
.beanClazz(bean.getClass())
.beanName(beanName)
.method(method)
.downloadObjClazz(anno.downloadObj())
.queryTypes(queryTypes)
.serialNumberField(anno.serialNumberField())
.build();
METHOD_MAP.putIfAbsent(key, methodDTO);
}
return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
}
@Override
public DownloadMethodDTO getDownloadMethod(String downloadKey) {
return METHOD_MAP.get(downloadKey);
}
}
-
初始化后Map内数据,根据downloadKey取出对应的方法
4.DCGenerateComponent 反射调用查询数据方法
-
ExcelDCGenerateComponentImpl
实现getOutputStreamInfo
@Override public DownloadCenterPage getOutputStreamInfo(DownloadCenterDTO dto) { String downloadKey = dto.getDownloadKey(); //获取查询方法 DownloadMethodDTO downloadMethod = downloadCenterDataSource.getDownloadMethod(downloadKey); //查询任务配置 DownloadFileConfigEntity config = iDownloadFileConfigService.getByDownloadKey(downloadKey); String fileHeaders = config.getFileHeaders(); Method method = downloadMethod.getMethod(); Class<?> head = downloadMethod.getDownloadObjClazz(); // 下载方法所在的bean Object bean = SpringContextHolder.getBean(downloadMethod.getBeanClazz()); PageQuery query = dto.getQuery(); String lan = query.getLan(); boolean isGo = true; ByteArrayOutputStream bos = new ByteArrayOutputStream(); ExcelWriter writer = EasyExcel.write(bos) // 默认样式 .registerWriteHandler(ExcelUtil.createDefaultHeadStyle()) // 中英文转换 .registerWriteHandler(new HeadZhEnAdaptiveHandler(lan)) // 列宽自动 .registerWriteHandler(new CustomCellWriteWeightConfig()) // 时间转换 .registerConverter(new LocalDateConverter()) .registerConverter(new LocalDateTimeConverter()) .build(); ExcelWriterSheetBuilder sheetBuilder = EasyExcel.writerSheet().sheetNo(0).sheetName(dto.getSheetName()); if (head.isAssignableFrom(Object[].class)) { sheetBuilder.head(changeFileHeaders(fileHeaders)); } else { sheetBuilder.head(head); } WriteSheet writeSheet = sheetBuilder.build(); // 已经查询的数量 long alreadyQueriedNum = 0; // 需要查询的总量,如果已经查询的数量,大于需要查询的总量,就跳出循环 long queryTotal = dto.getQueryTotal(); // 标记变量,是否全部结束 boolean isEnd = false; Integer page = query.getPage(); Integer limit = query.getLimit(); do { Object[] funObj = new Object[]{query}; Object result; List<?> data = null; try { result = method.invoke(bean, funObj); } catch (IllegalAccessException | InvocationTargetException e) { log.error("excel,下载中心执行查询方法失败,可能是查询方法本身报错,invoke方法出现异常时,不会把反射方法的异常抛出来", e); throw new OristandException(e); } if (result instanceof LayTableData<?>) { data = layTableDataFun.apply((LayTableData<?>) result); } if (result instanceof PageBean<?>) { data = pageBeanFun.apply((PageBean<?>) result); } if (result instanceof List<?>) { data = listFun.apply((List<?>) result); } //设置序号 DownloadCenterUtil.dealWithListData(data, head, dto.getSerialNumber(), downloadMethod.getSerialNumberField()); data = CollUtil.isEmpty(data) ? Collections.emptyList() : data; int queryNum = data.size(); dto.setSerialNumber(dto.getSerialNumber() + queryNum); if (queryNum != 0 || page == 1) { if (head.isAssignableFrom(Object[].class)) { List<List<Object>> finalData = data.stream() .filter(Objects::nonNull) .map((Function<Object, List<Object>>) o -> Arrays.asList((Object[]) o)) .collect(Collectors.toList()); writer.write(finalData, writeSheet); } else { writer.write(data, writeSheet); } } query.setPage(++page); query.setLimit(limit); alreadyQueriedNum += queryNum; if (alreadyQueriedNum >= queryTotal) { isGo = false; } if (CollUtil.isEmpty(data) || !Objects.equals(query.getLimit(), data.size())) { isEnd = true; isGo = false; } } while (isGo); if (alreadyQueriedNum != 0 || (page - 1) == 1) { writer.finish(); } return DownloadCenterPage.builder() .bos(bos) .isEnd(isEnd) .build(); }
总结
核心模块在【manage处理下载】这一部分:
-
根据 downloadId 获取下载的所有信息。
-
根据downloadKey从数据源池子【Map】中获取【DownLoadCenterFunc】查询数据的方法
-
获取参数,设置分页,把参数填入 goQuery 方法,获取PageBean
-
把数据写入文件
-
判断文件中已经写入的行数,如大于200000,重新创建一个excel
-
判断数据是否全部查询完毕,全部查询完毕顺序执行 ,没有查完接着循环查询数据,回到 3
-
获得所有创建的excel或者csv集合
-
打包成压缩包
-
上传oss
-
关闭流
-
把oss返回的文件路径存表,并修改状态为已完成