1. 创建并激活虚拟环境
python3 -m venv venv
source venv/*/activate
2. 安装依赖包
pip install fastapi 'uvicorn[standard]' tortoise-orm 'celery[redis]' fastapi-cdn-host
3. 配置数据库连接参数
- config.py
from typing import TypedDict
class TortoiseInitParam(TypedDict):
    """Typed shape of the Tortoise ORM settings stored in ``DB_CONFIG``."""

    # Database connection URL, e.g. "sqlite://db.sqlite3".
    db_url: str
    # Mapping handed to Tortoise as ``modules=``, e.g. {"models": ["models"]}.
    modules: dict
# Connection settings for Tortoise ORM: a local SQLite file and the
# "models" module registered under the "models" key.
DB_CONFIG: TortoiseInitParam = {
    "db_url": "sqlite://db.sqlite3",
    "modules": {"models": ["models"]},
}
4. 定义表结构(直接从 Tortoise ORM v0.20.0 官方文档的「FastAPI Examples」一节拷贝过来的)
- models.py
from tortoise import fields, models
from tortoise.contrib.pydantic import pydantic_model_creator
class Users(models.Model):
    """
    The User model
    """

    # NOTE: the docstrings and the "#:" comment in this class are kept verbatim;
    # presumably Tortoise's pydantic creator surfaces them in the generated
    # schema -- confirm before rewording them.
    id = fields.IntField(pk=True)
    #: This is a username
    username = fields.CharField(max_length=20, unique=True)
    name = fields.CharField(max_length=50, null=True)
    family_name = fields.CharField(max_length=50, null=True)
    category = fields.CharField(max_length=30, default="misc")
    password_hash = fields.CharField(max_length=128, null=True)
    # The timestamp columns below are left disabled in this example.
    # created_at = fields.DatetimeField(auto_now_add=True)
    # modified_at = fields.DatetimeField(auto_now=True)

    def full_name(self) -> str:
        """
        Returns the best name
        """
        # With neither name part set, the username is the only label available.
        if not (self.name or self.family_name):
            return self.username
        given = self.name or ""
        family = self.family_name or ""
        # strip() removes the stray space left when only one part is present.
        return f"{given} {family}".strip()

    class PydanticMeta:
        # full_name is listed as a computed field; password_hash is excluded
        # from the generated pydantic models.
        computed = ["full_name"]
        exclude = ["password_hash"]
# Pydantic model generated from Users under the schema name "User".
User_Pydantic = pydantic_model_creator(
    Users,
    name="User",
)
# Variant generated with exclude_readonly=True under the schema name "UserIn".
UserIn_Pydantic = pydantic_model_creator(
    Users,
    name="UserIn",
    exclude_readonly=True,
)
5. 配置celery
- tasks.py
#!/usr/bin/env python
import shlex
import subprocess
from pathlib import Path
import anyio
from celery import Celery
from celery.signals import worker_process_init, worker_process_shutdown
from config import DB_CONFIG
from models import Users
from tortoise import Tortoise
# The same Redis URL is used as Celery's broker and as its result backend.
REDIS_URL = "redis://localhost:6379"
app = Celery(
    __name__,
    broker=REDIS_URL,
    backend=REDIS_URL,
)
async def init_db() -> None:
    """Initialise Tortoise ORM with the URL and modules from ``DB_CONFIG``."""
    db_url = DB_CONFIG["db_url"]
    model_modules = DB_CONFIG["modules"]
    await Tortoise.init(db_url=db_url, modules=model_modules)
@worker_process_init.connect
def init_worker(**kwargs) -> None:
    """Handle Celery's ``worker_process_init`` signal by initialising the ORM.

    The keyword arguments sent with the signal are accepted and ignored.
    """
    # The handler is synchronous, so the coroutine is driven to completion here.
    # NOTE(review): every anyio.run() call starts its own event loop -- confirm
    # the database driver tolerates connections being reused across loops.
    anyio.run(init_db)
@worker_process_shutdown.connect
def close_worker(**kwargs) -> None:
    """Handle Celery's ``worker_process_shutdown`` signal by closing ORM connections.

    The keyword arguments sent with the signal are accepted and ignored.
    """
    # Tortoise.close_connections is a coroutine function; run it to completion.
    anyio.run(Tortoise.close_connections)
async def _create_user(data) -> int:
    """Create a ``Users`` row from the *data* mapping and return its id."""
    return (await Users.create(**data)).id
@app.task
def save_user_to_db(data) -> int:
    """Celery task: store a new user built from *data* and return its id."""
    # Celery calls this synchronously, so the async insert is run via anyio.
    new_user_id = anyio.run(_create_user, data)
    return new_user_id
def main() -> int:
    """Start a Celery worker for this module and return its exit status.

    Returns:
        The return code of the ``celery`` subprocess.
    """
    # Build argv as a list instead of splitting an interpolated string, so a
    # file stem containing whitespace or quotes cannot be mis-split.
    command = ["celery", "-A", Path(__file__).stem, "worker"]
    return subprocess.run(command).returncode


if __name__ == "__main__":
    # Propagate the worker's exit status to the calling shell.
    raise SystemExit(main())
6. FastAPI接口示例:
- main.py
#!/usr/bin/env python
from pathlib import Path
from typing import List
import celery
import fastapi_cdn_host
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel
from starlette.exceptions import HTTPException
from tortoise.contrib.fastapi import register_tortoise
from config import DB_CONFIG
from models import User_Pydantic, UserIn_Pydantic, Users
from tasks import save_user_to_db
# The ASGI application object; uvicorn loads it by the name "app".
app = FastAPI(
    title="Tortoise ORM FastAPI example",
)
# Apply fastapi_cdn_host's patch to the app.
fastapi_cdn_host.monkey_patch(app)
# Response body carrying a single human-readable message (used by DELETE).
# A "#" comment is used instead of a docstring so the generated schema
# description stays unchanged.
class Status(BaseModel):
    message: str
# GET /users -- return every Users row serialised through User_Pydantic.
@app.get("/users", response_model=List[User_Pydantic])
async def get_users():
    all_users = Users.all()
    return await User_Pydantic.from_queryset(all_users)
# POST /users -- create a user through the Celery task `save_user_to_db`
# and return the stored record.
# Raises HTTPException(400) when no result arrives within `timeout` seconds.
@app.post("/users", response_model=User_Pydantic)
async def create_user(user: UserIn_Pydantic):
    # Imported locally so this handler brings its own stdlib dependency.
    import asyncio

    timeout = 2
    payload = user.model_dump(exclude_unset=True)

    def _submit_and_wait() -> int:
        # Both publishing the task and waiting for its result are blocking
        # calls, so they are kept together in one worker-thread function.
        return save_user_to_db.delay(payload).get(timeout=timeout)

    # Run the blocking Celery round-trip in the default executor so the event
    # loop keeps serving other requests while this one waits.
    loop = asyncio.get_running_loop()
    try:
        user_id = await loop.run_in_executor(None, _submit_and_wait)
    except celery.exceptions.TimeoutError as exc:
        raise HTTPException(
            detail=f"Failed to create user in celery within {timeout} seconds",
            status_code=400,
        ) from exc
    return await User_Pydantic.from_queryset_single(Users.get(id=user_id))
# GET /user/{user_id} -- return the single user with the given id.
@app.get("/user/{user_id}", response_model=User_Pydantic)
async def get_user(user_id: int):
    matching_user = Users.get(id=user_id)
    return await User_Pydantic.from_queryset_single(matching_user)
# PUT /user/{user_id} -- apply the fields present in the request body to the
# given user, then return the record as it is now stored.
@app.put("/user/{user_id}", response_model=User_Pydantic)
async def update_user(user_id: int, user: UserIn_Pydantic):
    # exclude_unset=True: only fields the client actually sent are written.
    changes = user.model_dump(exclude_unset=True)
    await Users.filter(id=user_id).update(**changes)
    refreshed = Users.get(id=user_id)
    return await User_Pydantic.from_queryset_single(refreshed)
# DELETE /user/{user_id} -- delete the given user and confirm with a Status
# message; responds 404 when no row was deleted.
@app.delete("/user/{user_id}", response_model=Status)
async def delete_user(user_id: int):
    removed = await Users.filter(id=user_id).delete()
    if removed:
        return Status(message=f"Deleted user {user_id}")
    raise HTTPException(status_code=404, detail=f"User {user_id} not found")
# Register Tortoise ORM on the FastAPI app using the settings from DB_CONFIG,
# with schema generation and the ORM exception handlers switched on.
register_tortoise(
    app,
    generate_schemas=True,
    add_exception_handlers=True,
    db_url=DB_CONFIG["db_url"],
    modules=DB_CONFIG["modules"],
)
if __name__ == "__main__":
    # Hand uvicorn the app as a "<module>:app" import string built from this
    # file's name, with auto-reload enabled.
    app_import_path = f"{Path(__file__).stem}:app"
    uvicorn.run(app_import_path, reload=True)
7. 启动后台服务
python main.py
8. 另开一个终端,启动Celery Worker(需要本机的 Redis 已在 6379 端口运行)
python tasks.py
9. 打开浏览器验证效果
http://localhost:8000/docs#/default/create_user_users_post
结果如下