智能BI(后端)-- 系统异步化

文章目录

  • 系统问题分析
  • 什么是异步化?
  • 业务流程分析
    • 标准异步化的业务流程
    • 系统业务流程
  • 线程池
    • 为什么需要线程池?
    • 线程池两种实现方式
    • 线程池的参数
    • 线程池的开发
  • 项目异步化改造

系统问题分析

问题场景:调用的服务能力有限,或者接口的处理(或返回)时长较长时,就应该考虑异步化了

什么是异步化?

不用等一件事做完,就可以做另外一件事,等第一件事完成时,可以收到一个通知

业务流程分析

标准异步化的业务流程

  1. 当用户要进行耗时很长的操作时,点击提交后,不需要在界面空等,而是应该把这个任务保存到数据库中记录下来
  2. 用户要执行新任务时:
    a. 任务提交成功:
    ⅰ. 若程序存在空闲线程,可以立即执行此任务
    ⅱ. 若所有线程均繁忙,任务将入队列等待处理
    b. 任务提交失败:比如所有线程都在忙碌且任务队列满了
    ⅰ.选择拒绝此任务,不再执行
    ⅱ.通过查阅数据库记录,发现提交失败的任务,并在程序空闲时将这些任务取出执行
  3. 程序(线程)从任务队列中取出任务依次执行,每完成一项任务,就更新任务状态。
  4. 用户可以查询任务的执行状态,或者在任务执行成功或失败时接收通知(例如:发邮件、系统消息提示或短信),从而优化体验
  5. 对于复杂且包含多个环节的任务,在每个小任务完成时,要在程序(数据库中))记录任务的执行状态(进度)。

系统业务流程

  1. 用户点击智能分析页提交按钮时,先把图表立刻保存到数据库中(作为一个任务)
  2. 用户可以在图表管理查看所有图表(已生成的,生成中的,生成失败的)的信息和状态
  3. 用户可以修改生成失败的图表信息,点击重新生成,以尝试再次创建图表
    在这里插入图片描述

问题分析?

  1. 任务队列的最大容量应该设置为多少
  2. 程序怎么从任务队列中取出任务去执行?这个任务队列的流程怎么实现?怎么保证程序最多同时执行多少个任务?

线程池实现

线程池

为什么需要线程池?

  1. 线程的管理比较复杂
  2. 任务存取比较复杂
  3. 线程池可以帮你轻松管理线程,协调任务的执行过程

线程池两种实现方式

  1. Spring中,可以用ThreadPoolTaskExrcutor配合@Async注解来实现(不推荐)
  2. 在Java中,可以使用JUC并发编程包中的ThreadPoolExecutor来实现非常灵活地自定义线程池

线程池的参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {

现状:AI生成能力的并发是只允许4个任务同时去执行,AI能力允许20个任务排队
corePoolSize(核心线程数):正常情况下,我们的系统应该能同时工作的线程数
maximumPoolSize(最大线程数):极限情况下,我们的线程池所拥有的线程
keepAliveTime(空闲线程存活时间):非核心线程在没有任务的情况下,过多久要删除,从而释放无用的线程资源
unit(空闲线程存活时间的单位):分钟,秒
workQueue(工作队列):用于存放给线程执行的任务,存在一个队列的长度(一定要设置)
threadFactory(线程工厂):控制每个线程的生成,线程的属性
RejectedExecutionHandler(拒绝策略):任务队列满的时候,我们采取什么措施

线程池的开发

自定义线程池配置

package com.yupi.springbootinit.config;

import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Configuration
public class ThreadPoolExecutorConfig {
    @Bean
    public ThreadPoolExecutor threadPoolExecutor(){
        // 创建一个线程工厂
        ThreadFactory threadFactory = new ThreadFactory(){
            // 初始化线程数为 1
            private int count = 1;
            // 创建一个新的线程
            @Override
            // 每当线程池需要创建新线程时,就会调用newThread方法
            // @NotNull Runnable r 表示方法参数 r 应该永远不为null,
            public Thread newThread(@NotNull Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("线程" + count ++);
                return thread;
            }
        };
        // 创建一个新的线程池,线程池核心大小为2,最大线程数为4,
        // 非核心线程空闲时间为100秒,任务队列为阻塞队列,长度为4,使用自定义的线程工厂创建线
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,4,100, TimeUnit.SECONDS,new ArrayBlockingQueue<>(4),threadFactory);
        return threadPoolExecutor;
    }
}

测试controller层(注意线上环境不要暴露出去)

package com.yupi.springbootinit.controller;

import cn.hutool.json.JSONUtil;
import io.netty.handler.codec.serialization.ObjectEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Profile;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 队列测试controller
 */
@RestController
@RequestMapping("/queue")
@Slf4j
@Profile({"dev","local"}) // 只在开发环境和本地环境生效
public class QueueController {


    @Resource
    private ThreadPoolExecutor threadPoolExecutor;

    @GetMapping("/add")
    // 接收一个参数name,然后将任务添加到线程池中
    public void add(String name){
    // 使用CompletableFuture运行一个异步任务
        CompletableFuture.runAsync(()->{

            log.info("任务执行中:" + name + "执行人:" + Thread.currentThread().getName());
            try {
                // 让线程休眠10分钟,模拟长时间运行的任务
                Thread.sleep(600000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
             异步任务在threadPoolExecutor中执行
        },threadPoolExecutor);
    }


    @GetMapping("/get")
    public String  get(){
        Map<String, Object> map = new HashMap<>();
        int size = threadPoolExecutor.getQueue().size();
        map.put("队列长度",size);
        long taskCount = threadPoolExecutor.getTaskCount();
        map.put("任务总数",taskCount);
        long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
        map.put("已完成的任务总数",completedTaskCount);
        int activeCount = threadPoolExecutor.getActiveCount();
        map.put("正在工作的线程数",activeCount);
        return JSONUtil.toJsonStr(map);
    }
}

项目异步化改造

/**
 * 智能分析(异步)
 *
 * @param multipartFile
 * @param genChartByAiRequest
 * @param request
 * @return
 */
@PostMapping("/gen/async")
public BaseResponse<BiResponse> genChartByAiAsync(@RequestPart("file") MultipartFile multipartFile,
                                                  GenChartByAiRequest genChartByAiRequest, HttpServletRequest request) {
    String name = genChartByAiRequest.getName();
    String goal = genChartByAiRequest.getGoal();
    String chartType = genChartByAiRequest.getChartType();

    // 校验
    ThrowUtils.throwIf(StringUtils.isBlank(goal), ErrorCode.PARAMS_ERROR, "目标为空");
    ThrowUtils.throwIf(StringUtils.isNotBlank(name) && name.length() > 100, ErrorCode.PARAMS_ERROR, "名称过长");

    // 校验文件
    long size = multipartFile.getSize();
    String originalFilename = multipartFile.getOriginalFilename();

    // 校验文件大小
    final long ONE_MB = 1024 * 1024L;
    ThrowUtils.throwIf(size > ONE_MB, ErrorCode.PARAMS_ERROR, "文件超过 1M");

    // 校验文件大小缀 aaa.png
    String suffix = FileUtil.getSuffix(originalFilename);
    final List<String> validFileSuffixList = Arrays.asList("xlsx", "xls");
    ThrowUtils.throwIf(!validFileSuffixList.contains(suffix), ErrorCode.PARAMS_ERROR, "文件后缀非法");

    User loginUser = userService.getLoginUser(request);

    // 限流判断,每个用户一个限流器
    redisLimiterManager.doRateLimit("genChartByAi_" + loginUser.getId());

    // 指定一个模型id(把id写死,也可以定义成一个常量)
    long biModelId = 1659171950288818178L;
    // 分析需求:
    // 分析网站用户的增长情况
    // 原始数据:
    // 日期,用户数
    // 1号,10
    // 2号,20
    // 3号,30

    // 构造用户输入
    StringBuilder userInput = new StringBuilder();
    userInput.append("分析需求:").append("\n");

    // 拼接分析目标
    String userGoal = goal;
    if (StringUtils.isNotBlank(chartType)) {
        userGoal += ",请使用" + chartType;
    }
    userInput.append(userGoal).append("\n");
    userInput.append("原始数据:").append("\n");
    // 压缩后的数据
    String csvData = ExcelUtils.excelToCsv(multipartFile);
    userInput.append(csvData).append("\n");

    // 先把图表保存到数据库中
    Chart chart = new Chart();
    chart.setName(name);
    chart.setGoal(goal);
    chart.setChartData(csvData);
    chart.setChartType(chartType);
    // 插入数据库时,还没生成结束,把生成结果都去掉
//        chart.setGenChart(genChart);
//        chart.setGenResult(genResult);
    // 设置任务状态为排队中
    chart.setStatus("wait");
    chart.setUserId(loginUser.getId());
    boolean saveResult = chartService.save(chart);
    ThrowUtils.throwIf(!saveResult, ErrorCode.SYSTEM_ERROR, "图表保存失败");

    // 在最终的返回结果前提交一个任务
    // todo 建议处理任务队列满了后,抛异常的情况(因为提交任务报错了,前端会返回异常)
    CompletableFuture.runAsync(() -> {
        // 先修改图表任务状态为 “执行中”。等执行成功后,修改为 “已完成”、保存执行结果;执行失败后,状态修改为 “失败”,记录任务失败信息。(为了防止同一个任务被多次执行)
        Chart updateChart = new Chart();
        updateChart.setId(chart.getId());
        // 把任务状态改为执行中
        updateChart.setStatus("running");
        boolean b = chartService.updateById(updateChart);
        // 如果提交失败(一般情况下,更新失败可能意味着你的数据库出问题了)
        if (!b) {
            handleChartUpdateError(chart.getId(), "更新图表执行中状态失败");
            return;
        }

        // 调用 AI
        String result = aiManager.doChat(biModelId, userInput.toString());
        String[] splits = result.split("【【【【【");
        if (splits.length < 3) {
            handleChartUpdateError(chart.getId(), "AI 生成错误");
            return;
        }
        String genChart = splits[1].trim();
        String genResult = splits[2].trim();
        // 调用AI得到结果之后,再更新一次
        Chart updateChartResult = new Chart();
        updateChartResult.setId(chart.getId());
        updateChartResult.setGenChart(genChart);
        updateChartResult.setGenResult(genResult);
        updateChartResult.setStatus("succeed");
        boolean updateResult = chartService.updateById(updateChartResult);
        if (!updateResult) {
            handleChartUpdateError(chart.getId(), "更新图表成功状态失败");
        }
    },threadPoolExecutor);

    BiResponse biResponse = new BiResponse();
//        biResponse.setGenChart(genChart);
//        biResponse.setGenResult(genResult);
    biResponse.setChartId(chart.getId());
    return ResultUtils.success(biResponse);
}
// 上面的接口很多用到异常,直接定义一个工具类
private void handleChartUpdateError(long chartId, String execMessage) {
    Chart updateChartResult = new Chart();
    updateChartResult.setId(chartId);
    updateChartResult.setStatus("failed");
    updateChartResult.setExecMessage(execMessage);
    boolean updateResult = chartService.updateById(updateChartResult);
    if (!updateResult) {
        log.error("更新图表失败状态失败" + chartId + "," + execMessage);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/630291.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

strcpy函数详解

strcpy函数详解 1.函数简介2.strcpy函数的使用2.1使用方法一2.1使用方法二 3.strcpy在使用过程中的注意事项3.1被复制字符必须以\0结尾3.2目标空间必须能够大于源字符串长度3.3目标空间必须可变 1.函数简介 strcpy函数包含在<string.h>库函数中&#xff0c;是将一个字符…

共享文件夹(以及问题解决方法)

目录 文件夹共享 第一步&#xff0c;将文件夹共享 第二步&#xff0c;设置用户权限 第三步&#xff0c;打开网络发现 第四步&#xff0c;访问 网络中没有设备问题 控制面板&#xff0c;启动 重启 还是不行&#xff1f;计算机管理&#xff0c;启动 FDResPub服务&#x…

波搜索算法(WSA)-2024年SCI新算法-公式原理详解与性能测评 Matlab代码免费获取

​ 声明&#xff1a;文章是从本人公众号中复制而来&#xff0c;因此&#xff0c;想最新最快了解各类智能优化算法及其改进的朋友&#xff0c;可关注我的公众号&#xff1a;强盛机器学习&#xff0c;不定期会有很多免费代码分享~ 目录 原理简介 一、初始化阶段 二、全…

JumpServer堡垒机应用(v3.10.8) 下

目录 JumpServer堡垒机简单式部署与管理(v3.10.8) 上-CSDN博客 一. 资产管理 1.1创建资产 1.2 给资产主机创建用户 1.2.1 普通账户&#xff1a; 1.2.2 特权账户&#xff1a; 1.2.3 创建用户 二. 命令过滤 2.1 创建命令组 2.2 创建命令过滤 ​编辑 三. 创建资产授权 …

大模型算法(一):从Transformer到ViT再到LLaMA

单任务/单领域模型 深度学习最早的研究集中在针对单个领域或者单个任务设计相应的模型。 对于CV计算机视觉领域&#xff0c;最常用的模型是CNN卷积模型。其中针对计算机视觉中的不同具体任务例如分类任务&#xff0c;目标检测任务&#xff0c;图像分割任务&#xff0c;以CNN作…

Linux的常用指令 和 基础知识穿插巩固(巩固知识必看)

目录 前言 ls ls 扩展知识 ls -l ls -a ls -al cd cd 目录名 cd .. cd ~ cd - pwd 扩展知识 路径 / cp [选项] “源文件名” “目标文件名” mv [选项] “源文件名” “目标文件名” rm 作用 用法 ./"可执行程序名" mkdir rmdir touch m…

Springboot+Vue项目-基于Java+MySQL的制造装备物联及生产管理ERP系统(附源码+演示视频+LW)

大家好&#xff01;我是程序猿老A&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f49e;当前专栏&#xff1a;Java毕业设计 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f380; Python毕业设计 &…

Leetcode—3146. 两个字符串的排列差【简单】

2024每日刷题&#xff08;135&#xff09; Leetcode—3146. 两个字符串的排列差 实现代码 class Solution { public:int findPermutationDifference(string s, string t) {int maps[26];int mapt[26];for(int i 0; i < s.size(); i) {int idxs s[i] - a;int idxt t[i] …

案例|200多套设备实时监测,守护江西彰湖水库安全

中型水库作为水利建设的重要组成部分&#xff0c;在防洪、供水、农业灌溉、改善民生和生态效益等方面都具有重要意义。国务院发布《关于切实加强水库除险加固和运行管护工作的通知》&#xff0c;重点提出要提升信息化管理能力&#xff0c;要加快建设水库雨水情测报、大坝安全监…

判断上三角矩阵 分数 15

题目展示&#xff1a; 代码展示&#xff1a; 点这里&#xff0c;输入题目名称即可检索更多题目答案 ​#include<stdio.h>int main() {//T-tint t 0;scanf("%d",&t);while(t--)//循环t次&#xff0c;处理t个矩阵{int n 0;scanf("%d",&n);…

C语言学习【printf函数和scanf函数】

C语言学习【printf函数和scanf函数】 printf()函数和scanf()函数可以让用户与程序交流&#xff0c;是输入/输出函数 printf()函数 请求printf()函数打印数据的指令要与待打印数据的类型相匹配。例如&#xff0c;打印整数时使用%d&#xff0c;打印字符时使用%c。这些符号被称…

字符串_字符函数和字符串函数

C语言中对字符和字符串的处理很是频繁&#xff0c;但是C语言本身是没有字符串类型的&#xff0c;字符串通常放在常量字符串中或者字符数组中。 字符串常量适用于那些对它不做修改的字符串函数。 目录 1.函数介绍 1.1strlen 1.1.1strlen函数的模拟实现 1.2strcpy 1.2.1st…

性能测试学习二

瓶颈的精准判断 TPS曲线 tps图 响应时间图 拐点在哪里呢? 这是一个阶梯式增加的场景,拐点在第二个压力阶梯上就出现了,因为响应时间增加了,tps增加的却不多,在第三个阶段时,tps增加的就更少了,响应时间也在不断增加,所以性能瓶颈在加剧,越往后越明显【tps的增长,…

【35分钟掌握金融风控策略29】贷中模型调额调价策略

目录 贷中客户风险管理和客户运营体系 用信审批策略 用信审批策略决策流与策略类型 贷中预警策略 对存量客户进行风险评级 基于客户的风险评级为客户匹配相应的风险缓释措施和建议 调额策略 基于定额策略的调额策略 基于客户在贷中的风险表现的调额策略 调价策略 存…

鸿蒙开发接口Ability框架:【ApplicationContext】

ApplicationContext ApplicationContext模块提供开发者应用级别的的上下文的能力&#xff0c;包括提供注册及取消注册应用内组件生命周期的监听接口。 说明&#xff1a; 开发前请熟悉鸿蒙开发指导文档&#xff1a; gitee.com/li-shizhen-skin/harmony-os/blob/master/README.m…

留学资讯 | 2024英国学生签证申请需要满足哪些条件?

英国移民局于2020年9月10日发布了《移民规则变更声明: HC 707》&#xff0c;对学生签证制度进行了全面改革。该法案于2020年10月5日正式生效。根据此法案&#xff0c;新的学生签证——The Student and Child Student Routes学生和儿童学生路线&#xff0c;将替代原先的Tier 4学…

基于java的超级玛丽游戏的设计与实现(论文 + 源码)

Java的超级玛丽游戏.zip资源-CSDN文库https://download.csdn.net/download/JW_559/89313347 基于java的超级玛丽游戏的设计与实现 摘要 近年来&#xff0c;Java作为一种新的编程语言&#xff0c;以其简单性、可移植性和平台无关性等优点&#xff0c;得到了广泛地应用。J2SE称…

链接表存储图(C++注释详解): 构建表 深度优先遍历 (DFS)

链接表的结构体单元: #define size 100 typedef struct node {int idx;//下一个节点的索引int wt;//权重, 也可根据实际情景存储边的信息struct node* next; }Node; Node* hd[size]; // 存储图的邻接表 链接表的的构建: int main() {int n, m;cin >> n >> m; //…

SOLIDWORKS 2024零件特征功能增强

如大家所知&#xff0c;达索系统SOLIDWORKS每年都会发布新版本以主动响应客户的需求。现有客户使用的版本并不一样&#xff0c;所以在文档数据交流方面存在一定困难。同时工厂中的其它部门都会与产品研发部门进行协作&#xff0c;所以我们需要更强大的软件功能快速接收和处理模…

AnyMP4 Video Converter for Mac/Win - 视频转换的卓越之选

在当今数字化的时代&#xff0c;视频内容无处不在&#xff0c;而拥有一款强大的视频转换器就显得至关重要。AnyMP4 Video Converter for Mac/win 正是这样一款出类拔萃的工具&#xff0c;为您带来高效、便捷的视频转换体验。 这款视频转换器具备令人惊叹的功能。它支持广泛的视…