1 问题说明
1.1 任务简述
在开发Flask应用中一定会遇到执行耗时任务,但是Flask是轻量级的同步框架,即在单个请求时服务会阻被塞,直到任务完成(注意:当前请求被阻塞不会影响到其他请求)。
解决异步问题有两种思路,一种是借助外部工具实现异步,例如:消息队列(RabbitMQ)、 异步任务队列(Celery+Redis);另一种借助Python中的进程、线程或协程解决异步。我的是小项目选择因此选择了第二种方法。
经过测试,我在Flask中使用协程(gevent)会被阻塞;使用进程(multiprocessing)不会被阻塞,操作数据库出了问题(有可能是我没操作正确的问题);最后选择使用线程(threading)。
注意:使用线程会出现线程安全问题,
1.2 注意的问题
(1)线程安全
"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中
# 2.1 直接使用Session是线程不安全的,不推荐使用
self.session = Session(self.engine)
# 2.2 直接使用scoped_session是线程安全的,推荐使用
# 获取sessionmaker
session_factory = sessionmaker(engine)
# scoped_session是线程安全的
# 注意,此session不能使用Query对象
session = scoped_session(session_factory)
!!!
"""
(2)使用数据库
# 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高
# 否则无法插入数据库并报错,错误内容如下(低版本不会出现此问题):
"""
This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.
"""
2 工程布局
项目布局如下:
3 源代码
(1)main.py
from blueprint import init_blueprint
from config_app import app
from config_db import init_mysql_db
# 初始化MySQL
init_mysql_db()
init_blueprint()
if __name__ == '__main__':
app.run(host='0.0.0.0', debug=True)
(2)config_app.py
from flask import Flask
from flask_cors import CORS
def create_app():
flask_app = Flask(__name__)
CORS(flask_app, supports_credentials=True)
return flask_app
app = create_app()
(3)config_db.py
from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow
# 添加pymysql驱动,连接MySQL数据库
import pymysql
from config_app import app
pymysql.install_as_MySQLdb()
"""
!!! 注意
1 由于flask-SQLAlchemy对SQLAlchemy进行了封装,所以是线程安全的,可以在线程中直接使用;
2 原生SQLAlchemy中
# 2.1 直接使用Session是线程不安全的,不推荐使用
self.session = Session(self.engine)
# 2.2 直接使用scoped_session是线程安全的,推荐使用
# 获取sessionmaker
session_factory = sessionmaker(engine)
# scoped_session是线程安全的
# 注意,此session不能使用Query对象
session = scoped_session(session_factory)
!!!
"""
# 创建MySQL单实例
mysql_db = SQLAlchemy()
# 创建Schema
mysql_schema = Marshmallow()
# 创建数据库
"""
create database async default character set utf8mb4 collate utf8mb4_unicode_ci;
"""
class MysqlConf:
acc = "root"
pwd = "123456"
host = "192.168.108.200"
port = 3306
db = "async"
mysql_conf = MysqlConf()
# 初始化MySQL数据库
def init_mysql_db():
# 配置MySQL数据库url
db_url = "mysql://" + mysql_conf.acc + ":" + mysql_conf.pwd + "@" + mysql_conf.host + ":" + str(mysql_conf.port) + "/" + mysql_conf.db
app.config["SQLALCHEMY_DATABASE_URI"] = db_url
# 关闭sqlalchemy自动跟踪数据库
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 显示底层执行的SQL语句
app.config['SQLALCHEMY_ECHO'] = True
# 解决‘No application found. Either work inside a view function or push an application context.’
app.app_context().push()
# 初始化app
mysql_db.init_app(app)
# 初始化schema
mysql_schema.init_app(app)
# 初始化table
def init_table():
# 删除表
mysql_db.drop_all()
# 创建表
mysql_db.create_all()
(4)dao.py
import datetime
from config_db import mysql_db as db
# Create table of bm_record
class User(db.Model):
# Record table
__tablename__ = "as_user"
id = db.Column("us_id", db.Integer, nullable=False, primary_key=True, autoincrement=True)
name = db.Column("us_name", db.String(100))
age = db.Column("us_age", db.Integer)
create_time = db.Column("us_create_time", db.DateTime, default=datetime.datetime.now)
# 插入数据
def insert_record_dao(user: User):
# Add data
db.session.add(user)
# Commit data
db.session.commit()
(5)blueprint.py
# 构建蓝本
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from flask import Blueprint, jsonify
from config_app import app
from config_db import init_table
from dao import insert_record_dao, User
user = Blueprint("user", __name__)
# 注册蓝本
def init_blueprint():
app.register_blueprint(user, url_prefix='/user')
@user.route("/initdb")
def init_db():
init_table()
return jsonify("success")
@user.route("/add")
def add_user():
user: User = User()
user.name = "zhangsan"
user.age = 12
time.sleep(10)
insert_record_dao(user)
return jsonify(user.id)
executor = ThreadPoolExecutor(3)
@user.route("/thread_pool")
def run_task_thread_pool():
executor.submit(_run_thread_pool)
return jsonify("success")
# 执行线程
def _run_thread_pool():
print("Run thread pool")
time.sleep(2)
user: User = User()
user.name = "thread pool"
user.age = 12
# 注意:必须使用”with app.app_context()“,否则无法插入数据库,并且不会报错
with app.app_context():
insert_record_dao(user)
print("End thread pool")
pass
@user.route("/thread")
def run_task_thread():
task = threading.Thread(target=_run_thread, name='thread-01')
task.start()
return jsonify("success")
# 执行线程
def _run_thread():
print("Run thread")
time.sleep(2)
user: User = User()
user.name = "thread"
user.age = 12
# 注意:高版本的Flask-SQLAlchemy(我的版本Flask-SQLAlchemy==3.1.1,SQLAlchemy==2.0.16)必须使用”with app.app_context()“,本质原因是Flask关联的SQLAlchemy版本太高
# 否则无法插入数据库并报错,错误内容如下(低版本不会出现此问题):
"""
This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.
"""
with app.app_context():
insert_record_dao(user)
print("End thread")
pass