之前我们创建的文件都是在一个目录中,但是在我们的实际开发中,肯定不能这样设计,那么我们去创建一个目录,叫models,大致如下。
主要目录是:
· __init__.py 是一个空文件,说明models是一个package
· crud.py 数据库操作相关
· database.py 数据库配置相关
· models.py 数据库模型表
· schemas.py 模型验证
· main.py 主文件
database.py代码如下:
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker
conn = "mysql+pymysql://{username}:{password}@{host}:{port}/{database}?charset=utf8".format(
username="root", password="123456", host="10.30.10.36", port=3306, database="fastapi_learn_road")
engine = create_engine(conn)
# 该类的每个实例都是一个数据库会话,该类本身还不是数据库会话,但是一旦我们创建了SessionLocal的实例,这个实例将是实际的数据库会话
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 创建数据库基类
Base = declarative_base()
models.py代码如下:
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String
from sqlalchemy.orm import relationship
from .database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String(10), unique=True, index=True)
hashed_password = Column(String(100))
is_active = Column(Boolean, default=True)
items = relationship("Item", back_populates="owner")
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(10), index=True)
description = Column(String(10), index=True)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
schemas.py代码如下:定义请求参数模型验证与响应模型验证的Pydantic模型。
from pydantic import BaseModel
from typing import List, Optional
class BaseItem(BaseModel):
title: str
description: Optional[str] = None
class ItemModel(BaseItem):
pass
class ItemOut(BaseItem):
id: int
owner_id: int
class Config:
orm_mode = True
class BaseUser(BaseModel):
email: str
class UserModel(BaseUser):
"""请求参数模型"""
password: str
class UserOut(BaseUser):
"""响应模型"""
id: int
is_active: bool
items: List[ItemOut]
class Comfig:
orm_mode = True
crud.py代码如下:
# 之前都是把所有逻辑写到了接口函数里,其实我们应该抽出来,一起管理
from fastapi import HTTPException
from sqlalchemy.orm import Session
from .models import *
from .schemas import *
def get_user_method(db: Session, uid: int):
user = db.query(User).filter(User.id == uid).first()
if not user:
raise HTTPException(status_code=404, detail="user not exists")
return user
def create_user_method(db: Session, user: UserModel):
db_user = db.query(User).filter(User.email == user.email).first()
if db_user:
raise HTTPException(status_code=200, detail="this user already exists")
fake_hashed_password = user.password + "_hashed"
init_user = User(email=user.email, hashed_password=fake_hashed_password)
db.add(init_user)
db.commit()
db.refresh(init_user)
return init_user
def get_items_method(db: Session, skip: int = 0, limit: int = 10):
return db.query(Item).offset(skip).limit(limit).all()
def get_items_by_uid_method(db: Session, uid: int):
user = db.query(User).filter(User.id == uid).first()
if not user:
raise HTTPException(status_code=200, detail="this user is not valid")
return db.query(Item).filter(Item.owner == user).offset(0).limit(2).all()
def create_item_by_user_method(db: Session, uid: int, item: ItemModel):
init_item = Item(**item.dict(), owner_id=uid)
db.add(init_item)
db.commit()
db.refresh(init_item)
return init_item
main.py代码如下:
from fastapi import FastAPI, Depends, HTTPException
from models.crud import *
from models.database import *
app = FastAPI()
def create_db():
"""每个请求处理完毕后关闭当前连接,不同的请求使用不同的链接"""
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/user", response_model=UserOut)
def create_user(user: UserModel, db: Session = Depends(create_db)):
return create_user_method(db, user)
@app.get("/user", response_model=UserOut)
def get_user(uid: int, db: Session = Depends(create_db)):
return get_user_method(db, uid)
@app.post("/items/{uid}", response_model=ItemOut)
def create_item_by_user(uid: int, item: ItemModel, db: Session = Depends(create_db)):
return create_item_by_user_method(db, uid, item)
@app.get("/items", response_model=List[ItemOut])
def get_items(skip: int = 0, limit: int = 10, db: Session = Depends(create_db)):
return get_items_method(db, skip, limit)
@app.get("/items/{uid}", response_model=List[ItemOut])
def get_items_by_uid(uid: int, db: Session = Depends(create_db)):
return get_items_by_uid_method(db, uid)
if __name__ == '__main__':
import uvicorn
uvicorn.run("main:app", reload=True)
我们目前是这么改造的。后续还会持续改造的。目前我们没有对API接口main文件进行改造,下面的分享我们会对api接口做改造。