Overview
I ran a few experiments recently; writing up the conclusions here so they are easy to look back on later.
- 1 flask-celery mainly handles data-flow synchronization tasks. Execution is triggered by flask-aps, and IO-based concurrency keeps resource utilization high, which is enough to meet the business requirements (a minimal trigger sketch follows this list).
- 2 The current deployment environments are the compute-network machine and anygpu.
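For context, the flask-aps side is just a scheduled job that kicks the task off; the sketch below illustrates one plausible trigger pattern, over HTTP. It is not the real deployment code: the job id, interval, and payload are invented, and only the port-24104 endpoint matches the service set up later in this note.
import requests
from flask import Flask
from flask_apscheduler import APScheduler

app = Flask(__name__)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()

# periodic job that hands the actual work to the flask-celery service
@scheduler.task('interval', id='sync_job', minutes=5)
def sync_job():
    requests.post('http://127.0.0.1:24104/sum_post/',
                  json={'arg1': 1, 'arg2': 2})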
Contents
1 Environment
As things stand, deploying flask-celery directly on the host machine works fine. Normally I install miniconda3 first; containers that already ship with anaconda3 are left alone.
The apt upgrade is optional:
sudo apt update
sudo apt upgrade -y
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh
~/miniconda3/bin/conda init
source ~/.bashrc
conda --version
conda 24.4.0
which conda
/root/miniconda3/bin/conda
After that, packages generally go into the base environment; if you are already inside a container environment (some rented machines do not allow installing system software), just install the relevant python3 packages directly:
pip3 install ipython -i https://mirrors.aliyun.com/pypi/simple/
pip3 install requests -i https://mirrors.aliyun.com/pypi/simple/
wget -O Basefuncs-1.10-py3-none-any.whl http://YOURS/downup/view008_download_from_folder/pys.Basefuncs-1.10-py3-none-any.whl
pip3 install Basefuncs-1.10-py3-none-any.whl
pip3 install clickhouse_driver -i https://mirrors.aliyun.com/pypi/simple/
pip3 install pandas -i https://mirrors.aliyun.com/pypi/simple/
pip3 install numpy -i https://mirrors.aliyun.com/pypi/simple/
pip3 install redis -i https://mirrors.aliyun.com/pypi/simple/
pip3 install pydantic -i https://mirrors.aliyun.com/pypi/simple/
pip3 install nest_asyncio -i https://mirrors.aliyun.com/pypi/simple/
pip3 install aiohttp -i https://mirrors.aliyun.com/pypi/simple/
pip3 install Flask -i https://mirrors.aliyun.com/pypi/simple/
pip3 install Flask-APScheduler -i https://mirrors.aliyun.com/pypi/simple/
pip3 install celery -i https://mirrors.aliyun.com/pypi/simple/
pip3 install gunicorn -i https://mirrors.aliyun.com/pypi/simple/
pip3 install mongoengine -i https://mirrors.aliyun.com/pypi/simple/
pip3 install apscheduler -i https://mirrors.aliyun.com/pypi/simple/
pip3 install tornado -i https://mirrors.aliyun.com/pypi/simple/
pip3 install Pillow -i https://mirrors.aliyun.com/pypi/simple/
pip3 install markdown -i https://mirrors.aliyun.com/pypi/simple/
pip3 install pymysql -i https://mirrors.aliyun.com/pypi/simple/
pip3 install gevent -i https://mirrors.aliyun.com/pypi/simple/
pip3 install akshare -i https://mirrors.aliyun.com/pypi/simple/
Next, create the project files in the designated location:
- 1 On the compute-network machine, pull the project files directly; they normally live under /opt/project_notes/flask_celery.
- 2 On a rented machine, the project usually goes under /opt/flask_celery, created with vim plus copy-paste.
systemd
At the host level you can usually register this kind of service, but many compute machines only rent out containers without systemd (something about that feels like "selling the performance but not the performer" - -!).
Without systemd, switch to the corresponding directory and start everything directly from the command line:
cd /opt/project_notes/flask_celery
or
cd /opt/flask_celery
vim server_single_v2.py
# If you run a non-standard redis setup, edit the broker address inside the program
celery_broker = 'redis://:YOURS@127.0.0.1:24008/1'
# foreground start commands
gunicorn server_single_v2:app -b 0.0.0.0:24104
celery -A server_single_v2.celery_ worker
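For reference, server_single_v2.py is roughly shaped like the sketch below. This is a reconstruction pieced together from the start commands and the test section, not the actual file: the task name slow_sum is made up, while the app names (app, celery_), the broker address, and the two endpoints come from the text.
import time
from flask import Flask, request
from celery import Celery

app = Flask(__name__)
# non-standard redis setups get edited here
celery_broker = 'redis://:YOURS@127.0.0.1:24008/1'
celery_ = Celery(app.name, broker=celery_broker, backend=celery_broker)

@celery_.task
def slow_sum(arg1, arg2):
    time.sleep(10)  # stand-in for the real 10-second workload
    return arg1 + arg2

@app.route('/sum_post/', methods=['POST'])
def sum_post():
    para = request.get_json()
    task = slow_sum.delay(para['arg1'], para['arg2'])
    return task.id  # the client polls /get_result/ with this id

@app.route('/get_result/<task_id>')
def get_result(task_id):
    res = celery_.AsyncResult(task_id)
    return str(res.get())  # blocks until the task finishes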
The moment I brought it up while writing this, a pile of backlogged tasks came flooding in.
It suddenly dawned on me that celery's asynchrony is probably process- or thread-level rather than coroutine-level (the worker numbering looks suspiciously like my CPU core numbering, which fits celery's default prefork pool, sized to the number of CPUs).
Fine, I'll let myself off the hook for now. I'll run it for a while; later on I can build a coroutine-level service of my own to handle routing and scheduling. My main use case is really asynchronous API requests. A big refactor down the road will be moving all the data-fetching/query services over to async; the several ways of converting sync tornado to async are a good reference, starting with standing up an async tornado server.
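As a first taste of that coroutine-level approach, here is a minimal aiohttp/asyncio sketch of firing off API requests concurrently (aiohttp and nest_asyncio were installed earlier, presumably with this in mind). The URLs and task ids are placeholders, not real endpoints from this service.
import asyncio
import aiohttp

async def fetch(session, url):
    # one coroutine per request; await yields control instead of blocking
    async with session.get(url) as resp:
        return await resp.text()

async def fetch_all(urls):
    # a single session shared across all requests, gathered concurrently
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[fetch(session, u) for u in urls])

urls = ['http://127.0.0.1:24104/get_result/%s' % tid for tid in ['id1', 'id2']]
print(asyncio.run(fetch_all(urls)))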
Edit the startup script: vim ~/start_flask_celery.sh
In theory you have to source the conda profile script before activate works. The service actually runs even without activating, but I suspect that is because the conda path is written into the service file's Environment, so it lands in base by default.
#!/bin/bash
# Activate the base environment (or whichever env you created)
source /root/miniconda3/etc/profile.d/conda.sh
conda activate base
# For anaconda, source /root/anaconda3/etc/profile.d/conda.sh instead
# Run the Python service scripts
# compute-network machine
cd /opt/project_notes/flask_celery
# rented machine
# cd /opt/flask_celery
nohup gunicorn server_single_v2:app -b 0.0.0.0:24104 >/dev/null 2>&1 &
nohup celery -A server_single_v2.celery_ worker >/dev/null 2>&1 &
Without systemd, the script has to be run by hand after each boot, so where possible it is still better to register a systemd service; watch out for the configuration differences between the two environments.
First make the script executable with chmod +x ~/start_flask_celery.sh, then vim /lib/systemd/system/flask_celery.service
[Unit]
Description=flask_celery_service
After=network.target network-online.target syslog.target
Wants=network.target network-online.target
[Service]
# command that starts the service
Type=forking
ExecStart=/root/start_flask_celery.sh
Restart=always
RestartSec=5
# miniconda
Environment="PATH=/root/miniconda3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
# anaconda
#Environment="PATH=/root/anaconda3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
[Install]
WantedBy=multi-user.target
Then reload the daemon, start the service, and enable it at boot:
systemctl daemon-reload
systemctl start flask_celery
systemctl enable flask_celery
systemctl status flask_celery
┌─root@m7:~
└─ $ systemctl status flask_celery
● flask_celery.service - flask_celery_service
Loaded: loaded (/lib/systemd/system/flask_celery.service; enabled; vendor preset: enabled)
Active: active (running) since Sat 2024-06-15 13:03:51 CST; 1s ago
Process: 428 ExecStart=/root/start_flask_celery.sh (code=exited, status=0/SUCCESS)
Tasks: 38 (limit: 4915)
CGroup: /system.slice/flask_celery.service
├─475 /root/miniconda3/bin/python /root/miniconda3/bin/gunicorn server_single_v2:app -b 0.0.0.0:24104
├─476 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
├─556 /root/miniconda3/bin/python /root/miniconda3/bin/gunicorn server_single_v2:app -b 0.0.0.0:24104
├─864 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
├─924 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
├─949 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
└─975 /root/miniconda3/bin/python /root/miniconda3/bin/celery -A server_single_v2.celery_ worker
6月 15 13:03:51 m7 systemd[1]: Starting flask_celery_service...
6月 15 13:03:51 m7 systemd[1]: Started flask_celery_service.
Testing
import time
import requests as req

para_dict = {'arg1': 111,
             'arg2': 222}
resp = req.post('http://127.0.0.1:24104/sum_post/', json=para_dict)
print('each task takes 10 seconds')
tick1 = time.time()
print(req.get('http://127.0.0.1:24104/get_result/%s' % resp.text).text)
tick2 = time.time()
print('actually takes %.2f' % (tick2 - tick1))
each task takes 10 seconds
333
actually takes 10.02
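To check that the workers really run in parallel (the status output above shows several celery worker processes), the same test can submit a few tasks before fetching any result; if the pool is doing its job, total wall time should stay near a single task's 10 seconds rather than growing linearly. The endpoints are the same as above; the payloads are made up.
import time
import requests as req

task_ids = []
for i in range(3):
    resp = req.post('http://127.0.0.1:24104/sum_post/',
                    json={'arg1': i, 'arg2': i * 10})
    task_ids.append(resp.text)

tick1 = time.time()
for tid in task_ids:
    # each get_result blocks, but the three tasks run concurrently in the pool
    print(req.get('http://127.0.0.1:24104/get_result/%s' % tid).text)
print('total: %.2f seconds' % (time.time() - tick1))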