Flask使用线程异步执行耗时任务

1 问题说明

1.1 任务简述

在开发Flask应用中一定会遇到执行耗时任务,但是Flask是轻量级的同步框架,即在单个请求时服务会阻被塞,直到任务完成(注意:当前请求被阻塞不会影响到其他请求)。

解决异步问题有两种思路,一种是借助外部工具实现异步,例如:消息队列(RabbitMQ)、 异步任务队列(Celery+Redis);另一种借助Python中的进程、线程或协程解决异步。我的是小项目选择因此选择了第二种方法。

经过测试,我在Flask中使用协程(gevent)会被阻塞;使用进程(multiprocessing)不会被阻塞,操作数据库出了问题(有可能是我没操作正确的问题);最后选择使用线程(threading)。

注意:使用线程会出现线程安全问题,

1.2 注意的问题

(1)线程安全

"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中
        # 2.1 直接使用Session是线程不安全的,不推荐使用
        self.session = Session(self.engine)

        # 2.2 直接使用scoped_session是线程安全的,推荐使用
        # 获取sessionmaker
        session_factory = sessionmaker(engine)
        # scoped_session是线程安全的
        # 注意,此session不能使用Query对象
        session = scoped_session(session_factory)
        
!!!
"""

(2)使用数据库

# 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高
# 否则无法插入数据库并报错,错误内容如下(低版本不会出现此问题):
"""
This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.
"""

2 工程布局

项目布局如下:
在这里插入图片描述

3 源代码

(1)main.py

from blueprint import init_blueprint
from config_app import app
from config_db import init_mysql_db


# 初始化MySQL
init_mysql_db()

init_blueprint()

if __name__ == '__main__':
    app.run(host='0.0.0.0', debug=True)

(2)config_app.py

from flask import Flask
from flask_cors import CORS


def create_app():
    flask_app = Flask(__name__)
    CORS(flask_app, supports_credentials=True)
    return flask_app


app = create_app()

(3)config_db.py


from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow

# 添加pymysql驱动,连接MySQL数据库
import pymysql

from config_app import app

pymysql.install_as_MySQLdb()

"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中
        # 2.1 直接使用Session是线程不安全的,不推荐使用
        self.session = Session(self.engine)

        # 2.2 直接使用scoped_session是线程安全的,推荐使用
        # 获取sessionmaker
        session_factory = sessionmaker(engine)
        # scoped_session是线程安全的
        # 注意,此session不能使用Query对象
        session = scoped_session(session_factory)
        
!!!
"""

# 创建MySQL单实例
mysql_db = SQLAlchemy()

# 创建Schema
mysql_schema = Marshmallow()


# 创建数据库
"""
create database async default character set utf8mb4 collate utf8mb4_unicode_ci;
"""

class MysqlConf:
    acc = "root"
    pwd = "123456"
    host = "192.168.108.200"
    port = 3306
    db = "async"


mysql_conf = MysqlConf()


# 初始化MySQL数据库
def init_mysql_db():

    # 配置MySQL数据库url
    db_url = "mysql://" + mysql_conf.acc + ":" + mysql_conf.pwd + "@" + mysql_conf.host + ":" + str(mysql_conf.port) + "/" + mysql_conf.db
    app.config["SQLALCHEMY_DATABASE_URI"] = db_url

    # 关闭sqlalchemy自动跟踪数据库
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

    # 显示底层执行的SQL语句
    app.config['SQLALCHEMY_ECHO'] = True

    # 解决‘No application found. Either work inside a view function or push an application context.’
    app.app_context().push()

    # 初始化app
    mysql_db.init_app(app)

    # 初始化schema
    mysql_schema.init_app(app)


# 初始化table
def init_table():
    # 删除表
    mysql_db.drop_all()
    # 创建表
    mysql_db.create_all()

(4)dao.py

import datetime

from config_db import mysql_db as db


# Create table of bm_record
class User(db.Model):
    # Record table
    __tablename__ = "as_user"

    id = db.Column("us_id", db.Integer, nullable=False, primary_key=True, autoincrement=True)
    name = db.Column("us_name", db.String(100))
    age = db.Column("us_age", db.Integer)
    create_time = db.Column("us_create_time", db.DateTime, default=datetime.datetime.now)


# 插入数据
def insert_record_dao(user: User):
    # Add data
    db.session.add(user)
    # Commit data
    db.session.commit()

(5)blueprint.py

# 构建蓝本
import threading
import time
from concurrent.futures import ThreadPoolExecutor

from flask import Blueprint, jsonify

from config_app import app
from config_db import init_table
from dao import insert_record_dao, User

user = Blueprint("user", __name__)


# 注册蓝本
def init_blueprint():
    app.register_blueprint(user, url_prefix='/user')


@user.route("/initdb")
def init_db():
    init_table()
    return jsonify("success")


@user.route("/add")
def add_user():
    user: User = User()
    user.name = "zhangsan"
    user.age = 12

    time.sleep(10)
    insert_record_dao(user)

    return jsonify(user.id)


executor = ThreadPoolExecutor(3)
@user.route("/thread_pool")
def run_task_thread_pool():
    executor.submit(_run_thread_pool)
    return jsonify("success")


# 执行线程
def _run_thread_pool():
    print("Run thread pool")
    time.sleep(2)
    user: User = User()
    user.name = "thread pool"
    user.age = 12

    # 注意:必须使用”with app.app_context()“,否则无法插入数据库,并且不会报错
    with app.app_context():
        insert_record_dao(user)

    print("End thread pool")
    pass


@user.route("/thread")
def run_task_thread():
    task = threading.Thread(target=_run_thread, name='thread-01')
    task.start()

    return jsonify("success")


# 执行线程
def _run_thread():
    print("Run thread")
    time.sleep(2)

    user: User = User()
    user.name = "thread"
    user.age = 12

    # 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高
    # 否则无法插入数据库并报错,错误内容如下(低版本不会出现此问题):
    """
    This typically means that you attempted to use functionality that needed
    the current application. To solve this, set up an application context
    with app.app_context(). See the documentation for more information.
    """
    with app.app_context():
        insert_record_dao(user)

    print("End thread")
    pass

4 请求截图

(1)初始化数据库

在这里插入图片描述

(2)添加用户

在这里插入图片描述

(3)使用线程池

在这里插入图片描述

(4)使用线程

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/211723.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

已解决AttributeError: module ‘gradio‘ has no attribute ‘outputs‘

问题描述 Traceback (most recent call last): File "/media/visionx/monica/project/ResShift/app.py", line 118, in <module> gr.outputs.File(label"Download the output")AttributeError: module gradio has no attribute outputs 解决办…

vscode 调试jlink

文章目录 软件使用说明1、启动GDB Server2、下载gdb3、vscode配置4、调试 软件 vscodejlink - (JLinkGDBServer.exe)gcc-arm-none-eabi-10-2020-q4-major (arm-none-eabi-gdb.exe) 使用说明 vscode通过TCP端口调用JLinkGDBServer通过jlink连接和操作设备&#xff0c;vscode不…

创建腾讯云存储桶---上传图片--使用cos-sdk完成上传

创建腾讯云存储桶—上传图片 注册腾讯云账号https://cloud.tencent.com/login 登录成功&#xff0c;选择右边的控制台 点击云产品&#xff0c;选择对象存储 创建存储桶 填写名称&#xff0c;选择公有读&#xff0c;私有写一直下一步&#xff0c;到创建 选择安全管理&#…

无人机助力电力设备螺母缺销智能检测识别,python基于YOLOv7开发构建电力设备螺母缺销小目标检测识别系统

传统作业场景下电力设备的运维和维护都是人工来完成的&#xff0c;随着现代技术科技手段的不断发展&#xff0c;基于无人机航拍飞行的自动智能化电力设备问题检测成为了一种可行的手段&#xff0c;本文的核心内容就是基于YOLOv7来开发构建电力设备螺母缺销检测识别系统&#xf…

智能优化算法应用:基于黄金正弦算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于黄金正弦算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于黄金正弦算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.黄金正弦算法4.实验参数设定5.算法结果6.参考…

熬夜会秃头——beta冲刺Day3

这个作业属于哪个课程2301-计算机学院-软件工程社区-CSDN社区云这个作业要求在哪里团队作业—beta冲刺事后诸葛亮-CSDN社区这个作业的目标记录beta冲刺Day3团队名称熬夜会秃头团队置顶集合随笔链接熬夜会秃头——Beta冲刺置顶随笔-CSDN社区 目录 一、团队成员会议总结 1、成员…

【UE】UEC++获取屏幕颜色GetPixelFromCursorPosition()

目录 【UE】UE C 获取屏幕颜色GetPixelFromCursorPosition() 一、函数声明与定义 二、函数的调用 三、运行结果 【UE】UE C 获取屏幕颜色GetPixelFromCursorPosition() 一、函数声明与定义 创建一个蓝图方法库方法 GetPixelFromCursorPosition()&#xff0c;并给他指定UF…

使用 STM32 微控制器读取光电传感器数据的实现方法

本文介绍了如何使用 STM32 微控制器读取光电传感器数据的实现方法。通过配置和使用STM32的GPIO和ADC功能&#xff0c;可以实时读取光电传感器的模拟信号并进行数字化处理。本文将介绍硬件连接和配置&#xff0c;以及示例代码&#xff0c;帮助开发者完成光电传感器数据的读取。 …

算法工程师面试八股(搜广推方向)

文章目录 机器学习线性和逻辑回归模型逻辑回归二分类和多分类的损失函数二分类为什么用交叉熵损失而不用MSE损失&#xff1f;偏差与方差Layer Normalization 和 Batch NormalizationSVM数据不均衡特征选择排序模型树模型进行特征工程的原因GBDTLR和GBDTRF和GBDTXGBoost二阶泰勒…

MATLAB R2022b 安装

文章用于学习记录 文章目录 前言下载解压安装包总结 前言 下载解压安装包 MATLAB R2022b —— A9z3 装载(Mount) MATLAB_R2022b_Win64.iso 打开装载好的 DVD 驱动器并找到 setup&#xff0c;单击鼠标右键以管理员身份运行&#xff1a; 点击窗口右上角的 高级选项下拉框&#…

Docker 镜像及其命令

文章目录 镜像Docker 镜像加载原理联合文件系统bootfs和rootfs镜像分层 镜像分层的优势容器层常用命令 镜像 镜像是一种轻量级、可执行的独立软件包&#xff0c;它包含运行某个软件所需的所有内容&#xff0c;我们把应用程序和配置依赖打包好形成一个可交付的运行环境&#xff…

AirServer怎么用?如何AirServer进行手机投屏

什么是 AirServer&#xff1f; AirServer 是适用于 Mac 和 PC 的先进的屏幕镜像接收器。 它允许您接收 AirPlay 和 Google Cast 流&#xff0c;类似于 Apple TV 或 Chromecast 设备。AirServer 可以将一个简单的大屏幕或投影仪变成一个通用的屏幕镜像接收器 &#xff0c;是一款…

深入理解Java中的锁机制

引言 大家好&#xff0c;我是小黑。今天咱们来聊聊Java中的锁机制&#xff0c;这可是并发编程的核心。你知道吗&#xff0c;在并发编程的世界里&#xff0c;正确地使用锁就像是掌握了一把神奇的钥匙&#xff0c;它能帮咱们在多线程的混战中保持秩序&#xff0c;防止数据被乱改…

实用工具网站合集值得收藏![搜嗖工具箱]

最近一段时间有点忙&#xff0c;一直没有更新在此给大家说声抱歉哈&#xff0c;有些小伙伴儿私信说想要用到的工具&#xff0c;茶壶儿也会尽可能满足大家&#xff01;今天我们要分享的工具主要有以下几款&#xff0c;我们来一起看一下吧&#xff1f; 一帧秒创 https://aigc.y…

2015年五一杯数学建模C题生态文明建设评价问题解题全过程文档及程序

2015年五一杯数学建模 C题 生态文明建设评价问题 原题再现 随着我国经济的迅速发展&#xff0c;生态文明越来越重要&#xff0c;生态文明建设被提到了一个前所未有的高度。党的十八大报告明确提出要大力推进生态文明建设&#xff0c;报告指出“建设生态文明&#xff0c;是关系…

93基于matlab的萤火虫算法优化支持向量机(GSA-SVM)分类模型

基于matlab的萤火虫算法优化支持向量机&#xff08;GSA-SVM&#xff09;分类模型&#xff0c;以分类精度为优化目标优化SVM算法的参数c和g&#xff0c;输出分类可视化结果。数据可更换自己的&#xff0c;程序已调通&#xff0c;可直接运行。 93萤火虫算法优化支持向量机 (xiaoh…

网上商城、宠物商城源码(Java)

javaWebjsp网上书城以及宠物商城源码&#xff0c;功能有购物车、收藏以及下单等等功能 带后台管理功能 运行示意图&#xff1a;

Docker中部署并启动RabbitMQ

目的 由于最近频繁更换云服务器&#xff0c;导致环境啥的都需要重新配置&#xff0c;关于RabbitMQ&#xff0c;我在看其他博主的文章时&#xff0c;总是不能第一时间找到想要的配置方法&#xff08;也不是没有&#xff0c;只是花的时间太久&#xff09;&#xff0c;于是打算自己…

前端入门(四)Ajax、Promise异步、Axios通信、vue-router路由、组件库

文章目录 AjaxAjax特点 Promise 异步编程&#xff08;缺&#xff09;Promise基本使用状态 - PromiseState结果 - PromiseResult AxiosVue中使用AxiosAxios请求方式getpostput和patchdelete并发请求 Vue路由 - vue-router单页面Web应用&#xff08;single page web application&…

如何确定短线的买入卖出时机?

短线投资制胜的一个关键能力&#xff0c;就是精准地找到买入卖出时机。那么&#xff0c;怎么样才能获得这种关键能力呢&#xff1f; 在这节课里&#xff0c;我们将给大家梳理一下常见的短线买入卖出时机&#xff0c;并通过案例讲解帮助大家理解。话不多说&#xff0c;赶紧进入主…