目录
allAPI
__init__.py 是一个空文件,作用是让 Python 把这个文件夹识别为一个包
ClassInfo.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from API.database import get_db, Base
from sqlalchemy import Column, String, Integer
# Router on which the class-information endpoints below are registered.
app04 = APIRouter()
# ORM model for the class-information table.
class ClassInfo(Base):
    """Row of the ``ClassInfo`` table; every column is declared NOT NULL."""

    __tablename__ = 'ClassInfo'

    # Primary key: the class identifier, up to 40 characters.
    class_id = Column(String(40), nullable=False, primary_key=True)
    cls_rank = Column(String(40), nullable=False)
    head_teacher = Column(String(40), nullable=False)
    student_count = Column(Integer, nullable=False)
# ORM model for the class-information view.
class ViewClassInfo(Base):
    """Read-side mapping onto ``ViewClassInfo``.

    Same four columns as ``ClassInfo`` but without NOT NULL constraints.
    NOTE(review): presumably backed by a database view - confirm in the schema.
    """

    __tablename__ = 'ViewClassInfo'

    # The ORM needs a primary key even for read-only mappings.
    class_id = Column(String(40), primary_key=True)
    cls_rank = Column(String(40))
    head_teacher = Column(String(40))
    student_count = Column(Integer)
# List all class records.
@app04.get("/classes")
async def read_classes(db: Session = Depends(get_db)):
    """Return every row of ``ViewClassInfo`` wrapped in a success envelope.

    An empty result still counts as success but carries an explanatory
    message; any exception raised by the query is reported in the envelope
    instead of propagating.
    """
    try:
        rows = db.query(ViewClassInfo).all()
    except Exception as exc:
        return {"success": False, "message": "Failed to fetch classes", "error": str(exc)}

    if not rows:
        return {"success": True, "data": [], "message": "No data available"}
    return {"success": True, "data": rows}
from pydantic import BaseModel
# Pydantic model that receives the list of class IDs.
class ClassIds(BaseModel):
    """Request body of the bulk class-delete endpoint."""
    class_ids: list  # bare ``list``: the element type is not constrained here
# Delete class records.
@app04.delete("/classes")
async def delete_classes(data: ClassIds, db: Session = Depends(get_db)):
    """Delete every class listed in ``data.class_ids`` as one all-or-nothing batch.

    Args:
        data: Request body carrying the IDs to delete.
        db: Session supplied by the ``get_db`` dependency.

    Returns:
        A ``{"success": ..., "message": ...}`` envelope. When at least one ID
        has no matching row, nothing is deleted and ``error`` lists the
        missing IDs. Any other failure is rolled back and reported the same way.
    """
    not_found_ids = []  # IDs that have no matching row
    try:
        for class_id in data.class_ids:
            class_info = db.query(ClassInfo).filter(ClassInfo.class_id == class_id).first()
            if class_info:
                db.delete(class_info)
            else:
                not_found_ids.append(class_id)
        if not_found_ids:
            # class_ids is an untyped list, so elements may be non-strings;
            # convert them so that join() cannot raise TypeError.
            missing = ', '.join(str(item) for item in not_found_ids)
            raise HTTPException(status_code=404, detail=f"班级ID不存在: {missing}")
        db.commit()
        return {"success": True, "message": "删除成功"}
    except HTTPException as e:
        # Discard the deletes queued before the missing ID was detected and
        # report the detail text itself; str(e) is not guaranteed to be the
        # bare detail.
        db.rollback()
        return {"success": False, "message": "删除失败", "error": e.detail}
    except Exception as e:
        db.rollback()
        return {"success": False, "message": "删除失败", "error": str(e)}
# Pydantic model that receives the class information.
class ClassInfoModel(BaseModel):
    """Request body for creating one class record; fields mirror ``ClassInfo``."""
    class_id: str
    cls_rank: str
    head_teacher: str
    student_count: int
# Add a class record.
@app04.post("/classes")
async def add_class(data: ClassInfoModel, db: Session = Depends(get_db)):
    """Insert one ``ClassInfo`` row unless its ``class_id`` is already taken.

    Args:
        data: The class fields to store.
        db: Session supplied by the ``get_db`` dependency.

    Returns:
        A ``{"success": ..., "message": ...}`` envelope; on failure ``error``
        carries the reason. No exception propagates to the framework.
    """
    try:
        # Reject duplicates up front.
        existing_class = db.query(ClassInfo).filter(ClassInfo.class_id == data.class_id).first()
        if existing_class:
            raise HTTPException(status_code=400, detail="班级ID已存在")
        new_class = ClassInfo(
            class_id=data.class_id,
            cls_rank=data.cls_rank,
            head_teacher=data.head_teacher,
            student_count=data.student_count,
        )
        db.add(new_class)
        db.commit()
        return {"success": True, "message": "添加成功"}
    except HTTPException as e:
        # Hand the detail text to the front end. str(e) is not guaranteed to
        # be the bare detail, so read the attribute directly.
        return {"success": False, "message": "添加失败", "error": e.detail}
    except Exception as e:
        db.rollback()
        return {"success": False, "message": "添加失败", "error": str(e)}
enrollment_api.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from pydantic import BaseModel
from API.database import get_db, Base
from sqlalchemy import Column, String, Date
# Router on which the student-enrollment endpoints below are registered.
app03 = APIRouter()
# ORM model for the student-enrollment table.
class StudentEnrollment(Base):
    """Row of the ``StudentEnrollment`` table; every column is declared NOT NULL."""

    __tablename__ = 'StudentEnrollment'

    # Primary key: the student number, up to 8 characters.
    student_id = Column(String(8), nullable=False, primary_key=True)
    stu_name = Column(String(8), nullable=False)
    gender = Column(String(6), nullable=False)
    # Trailing underscore because ``class`` is a reserved word in Python.
    class_ = Column(String(20), nullable=False)
    # Stored as text (String(80)), not as a date type.
    birth_date = Column(String(80), nullable=False)
    school = Column(String(200), nullable=False)
# ORM model for the student-enrollment view.
class ViewStudentEnrollment(Base):
    """Read-side mapping onto ``ViewStudentEnrollment``.

    Same columns as ``StudentEnrollment`` but without NOT NULL constraints.
    NOTE(review): presumably backed by a database view - confirm in the schema.
    """

    __tablename__ = 'ViewStudentEnrollment'

    # The ORM needs a primary key even for read-only mappings.
    student_id = Column(String(8), primary_key=True)
    stu_name = Column(String(8))
    gender = Column(String(6))
    class_ = Column(String(20))
    birth_date = Column(String(80))
    school = Column(String(200))
# Pydantic model that receives the student information.
class StudentEnrollmentModel(BaseModel):
    """Request body for creating one enrollment record; fields mirror ``StudentEnrollment``."""
    student_id: str
    stu_name: str
    gender: str
    class_: str
    birth_date: str
    school: str
# List all student enrollment records.
@app03.get("/enrollments")
async def read_enrollments(db: Session = Depends(get_db)):
    """Return every row of ``ViewStudentEnrollment`` wrapped in a success envelope.

    An empty result still counts as success but carries an explanatory
    message; any exception raised by the query is reported in the envelope
    instead of propagating.
    """
    try:
        rows = db.query(ViewStudentEnrollment).all()
    except Exception as exc:
        return {"success": False, "message": "Failed to fetch enrollments", "error": str(exc)}

    if not rows:
        return {"success": True, "data": [], "message": "No data available"}
    return {"success": True, "data": rows}
# Pydantic model that receives the list of student numbers.
class StudentIds(BaseModel):
    """Request body of the bulk enrollment-delete endpoint."""
    student_ids: list  # bare ``list``: the element type is not constrained here
# Delete student enrollment records.
@app03.delete("/enrollments")
async def delete_enrollments(data: StudentIds, db: Session = Depends(get_db)):
    """Delete every enrollment listed in ``data.student_ids`` as one all-or-nothing batch.

    Args:
        data: Request body carrying the student numbers to delete.
        db: Session supplied by the ``get_db`` dependency.

    Returns:
        A ``{"success": ..., "message": ...}`` envelope. When at least one
        student number has no matching row, nothing is deleted and ``error``
        lists the missing numbers. Any other failure is rolled back and
        reported the same way.
    """
    not_found_ids = []  # student numbers that have no matching row
    try:
        for student_id in data.student_ids:
            enrollment = db.query(StudentEnrollment).filter(StudentEnrollment.student_id == student_id).first()
            if enrollment:
                db.delete(enrollment)
            else:
                not_found_ids.append(student_id)
        if not_found_ids:
            # student_ids is an untyped list, so elements may be non-strings;
            # convert them so that join() cannot raise TypeError.
            missing = ', '.join(str(item) for item in not_found_ids)
            raise HTTPException(status_code=404, detail=f"学号不存在: {missing}")
        db.commit()
        return {"success": True, "message": "删除成功"}
    except HTTPException as e:
        # Discard the deletes queued before the missing number was detected
        # and report the detail text itself; str(e) is not guaranteed to be
        # the bare detail.
        db.rollback()
        return {"success": False, "message": "删除失败", "error": e.detail}
    except Exception as e:
        db.rollback()
        return {"success": False, "message": "删除失败", "error": str(e)}
# Add a student enrollment record.
@app03.post("/enrollments")
async def add_enrollment(data: StudentEnrollmentModel, db: Session = Depends(get_db)):
    """Insert one ``StudentEnrollment`` row unless its ``student_id`` is already taken.

    Args:
        data: The enrollment fields to store.
        db: Session supplied by the ``get_db`` dependency.

    Returns:
        A ``{"success": ..., "message": ...}`` envelope; on failure ``error``
        carries the reason. No exception propagates to the framework.
    """
    try:
        # Reject duplicates up front.
        existing_enrollment = db.query(StudentEnrollment).filter(StudentEnrollment.student_id == data.student_id).first()
        if existing_enrollment:
            raise HTTPException(status_code=400, detail="学号已存在")
        new_enrollment = StudentEnrollment(
            student_id=data.student_id,
            stu_name=data.stu_name,
            gender=data.gender,
            class_=data.class_,
            birth_date=data.birth_date,
            school=data.school,
        )
        db.add(new_enrollment)
        db.commit()
        return {"success": True, "message": "添加成功"}
    except HTTPException as e:
        # Hand the detail text to the front end. str(e) is not guaranteed to
        # be the bare detail, so read the attribute directly.
        return {"success": False, "message": "添加失败", "error": e.detail}
    except Exception as e:
        db.rollback()
        return {"success": False, "message": "添加失败", "error": str(e)}
LoginANDEnroll.py
from fastapi import FastAPI, HTTPException, status, Request, APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from API.database import SessionLocal, User, get_db # 从 database.py 导入 SessionLocal, User 和 get_db
from sqlalchemy.orm import Session
from datetime import datetime
# Router on which the login and registration endpoints below are registered.
app01 = APIRouter()
# # 创建数据库连接字符串
# SQLALCHEMY_DATABASE_URL = "mysql+mysqlconnector://root:123456@localhost/datas"
# engine = create_engine(SQLALCHEMY_DATABASE_URL)
#
# # 创建会话本地工厂
# SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
#
# # 创建声明式基类
# Base = declarative_base()
# 定义用户模型
# class User(Base):
# __tablename__ = "users"
#
# id = Column(Integer, primary_key=True, index=True)
# username = Column(String(50), unique=True, index=True)
# password = Column(String(50))
# registration_date = Column(DateTime)
# is_admin = Column(Boolean, default=False) # 添加一个字段来判断是否为管理员
# 创建数据库表
# Base.metadata.create_all(bind=engine)
# # 创建一个依赖项函数来管理数据库会话
# def get_db():
# db = SessionLocal()
# try:
# yield db
# finally:
# db.close()
# Request model for user login.
class Login(BaseModel):
    """Credentials posted to the ``/login`` endpoint."""
    username: str
    password: str
# Login endpoint.
@app01.post("/login")
async def login(login: Login, db: Session = Depends(get_db)):
    """Check the posted credentials against the ``users`` table.

    Raises:
        HTTPException: 400 when the username or the password is empty.

    Returns:
        ``{"success": True, "user_type": "admin" | "user"}`` on a match,
        otherwise ``{"success": False}``.
    """
    if not (login.username and login.password):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="用户名和密码不能为空")

    account = db.query(User).filter(User.username == login.username).first()
    # NOTE(review): the stored password is compared as-is, i.e. passwords are
    # kept unhashed - consider hashing at registration and verifying here.
    if not account or account.password != login.password:
        return {"success": False}  # plain JSON body, no error status

    role = "admin" if account.is_admin else "user"
    return {"success": True, "user_type": role}
# Request model for user registration.
class Register(BaseModel):
    """Credentials posted to the ``/register`` endpoint."""
    username: str
    password: str
# Registration endpoint; the session comes from the get_db dependency.
@app01.post("/register")
async def register(user: Register, db: Session = Depends(get_db)):
    """Create a new user unless the username is already taken.

    Args:
        user: The requested username and password.
        db: Session supplied by the ``get_db`` dependency.

    Raises:
        HTTPException: 400 when the username or the password is empty.

    Returns:
        ``{"success": True, "message": ..., "user_id": ...}`` for a new user,
        or ``{"success": False, "message": ...}`` when the name exists.
    """
    # Same validation as /login: that endpoint rejects empty credentials, so
    # an account created with them could never sign in.
    if not user.username or not user.password:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="用户名和密码不能为空")

    # Check whether the user already exists.
    existing_user = db.query(User).filter(User.username == user.username).first()
    if existing_user:
        return {"success": False, "message": "用户已存在"}

    new_user = User(
        username=user.username,
        password=user.password,  # in a real application the password should be hashed
        registration_date=datetime.now(),
    )
    db.add(new_user)
    try:
        db.commit()
    except Exception:
        # E.g. a concurrent request inserted the same username first; undo
        # the failed transaction before letting the error surface.
        db.rollback()
        raise
    db.refresh(new_user)  # reload so the database-assigned id is available
    return {"success": True, "message": "注册成功", "user_id": new_user.id}
# # 定义生命周期事件处理器,事件处理器不能写在这
# async def shutdown_event(app: FastAPI) -> None:
# await SessionLocal.close_all()
#
#
# # 注册事件处理器
# app01.add_event_handler("shutdown", shutdown_event)
# # 运行应用
# if __name__ == "__main__":
# import uvicorn
#
# uvicorn.run(app, host="localhost", port=8000)
# # uvicorn.run(app, host="localhost", port=63342)
student_scores.py
from fastapi import APIRouter, HTTPException, Depends
from sqlalchemy.orm import Session
from API.database import get_db, Base
from sqlalchemy import Column, Integer, String
# Router on which the student-score endpoints below are registered.
app02 = APIRouter()
# ORM model used for reading student scores.
class StudentScoreFromView(Base):
    """Read-side mapping onto ``StudentScoresWithTotalRank``.

    The String columns are declared without a length.
    NOTE(review): presumably backed by a database view - confirm in the schema.
    """

    __tablename__ = 'StudentScoresWithTotalRank'

    student_id = Column(String, primary_key=True)
    stu_name = Column(String)
    total_score = Column(Integer)
    score_rank = Column(Integer)
    # Spelled ``class_`` rather than ``class`` because ``class`` is a
    # reserved word in Python.
    class_ = Column(String)
# ORM model for the actual StudentScores table.
class StudentScore(Base):
    """Row of the ``StudentScores`` table, used by the write endpoints.

    The String columns are declared without a length.
    """

    __tablename__ = 'StudentScores'

    student_id = Column(String, primary_key=True)
    stu_name = Column(String)
    total_score = Column(Integer)
    score_rank = Column(Integer)
    # ``class_`` avoids the clash with the Python keyword.
    class_ = Column(String)
# List all student scores.
@app02.get("/studentscores")
async def read_student_scores(db: Session = Depends(get_db)):
    """Return every row of ``StudentScoresWithTotalRank`` wrapped in a success envelope.

    An empty result still counts as success but carries an explanatory
    message; any exception raised by the query is reported in the envelope
    instead of propagating.
    """
    try:
        rows = db.query(StudentScoreFromView).all()
    except Exception as exc:
        return {"success": False, "message": "Failed to fetch scores", "error": str(exc)}

    if not rows:
        return {"success": True, "data": [], "message": "No data available"}
    return {"success": True, "data": rows}
from pydantic import BaseModel
# Pydantic model that receives the list of student numbers.
class StudentIds(BaseModel):
    """Request body of the bulk score-delete endpoint."""
    student_ids: list  # bare ``list``: the element type is not constrained here
# 删除学生成绩
from fastapi import HTTPException
@app02.delete("/studentscores")
async def delete_student_scores(data: StudentIds, db: Session = Depends(get_db)):
    """Delete the scores of every student in ``data.student_ids`` as one all-or-nothing batch.

    Args:
        data: Request body carrying the student numbers to delete.
        db: Session supplied by the ``get_db`` dependency.

    Returns:
        A ``{"success": ..., "message": ...}`` envelope. When at least one
        student number has no matching row, nothing is deleted and ``error``
        lists the missing numbers. Any other failure is rolled back and
        reported the same way.
    """
    not_found_ids = []  # student numbers that have no matching row
    try:
        for student_id in data.student_ids:
            student_score = db.query(StudentScore).filter(StudentScore.student_id == student_id).first()
            if student_score:
                db.delete(student_score)
            else:
                not_found_ids.append(student_id)
        if not_found_ids:
            # student_ids is an untyped list, so elements may be non-strings;
            # convert them so that join() cannot raise TypeError.
            missing = ', '.join(str(item) for item in not_found_ids)
            raise HTTPException(status_code=404, detail=f"学号不存在: {missing}")
        db.commit()
        return {"success": True, "message": "删除成功"}
    except HTTPException as e:
        # Discard the deletes queued before the missing number was detected
        # and report the detail text itself; str(e) is not guaranteed to be
        # the bare detail.
        db.rollback()
        return {"success": False, "message": "删除失败", "error": e.detail}
    except Exception as e:
        db.rollback()
        return {"success": False, "message": "删除失败", "error": str(e)}
# Pydantic model that receives the student score information.
class StudentScoreModel(BaseModel):
    """Request body for creating one score record; fields mirror ``StudentScore``."""
    student_id: str
    stu_name: str
    total_score: int
    score_rank: int
    class_: str
# Add a student score record.
@app02.post("/studentscores")
async def add_student_score(data: StudentScoreModel, db: Session = Depends(get_db)):
    """Insert one ``StudentScore`` row unless its ``student_id`` is already taken.

    Args:
        data: The score fields to store.
        db: Session supplied by the ``get_db`` dependency.

    Returns:
        A ``{"success": ..., "message": ...}`` envelope; on failure ``error``
        carries the reason. No exception propagates to the framework.
    """
    try:
        # Reject duplicates up front.
        existing_student = db.query(StudentScore).filter(StudentScore.student_id == data.student_id).first()
        if existing_student:
            raise HTTPException(status_code=400, detail="学号已存在")
        new_student = StudentScore(
            student_id=data.student_id,
            stu_name=data.stu_name,
            total_score=data.total_score,
            score_rank=data.score_rank,
            class_=data.class_,
        )
        db.add(new_student)
        db.commit()
        return {"success": True, "message": "添加成功"}
    except HTTPException as e:
        # Hand the detail text to the front end. str(e) is not guaranteed to
        # be the bare detail, so read the attribute directly.
        return {"success": False, "message": "添加失败", "error": e.detail}
    except Exception as e:
        db.rollback()
        return {"success": False, "message": "添加失败", "error": str(e)}
database.py
# 创建数据库连接字符串
SQLALCHEMY_DATABASE_URL = "mysql+mysqlconnector://root:123456@localhost/datas"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
注意:上面的连接字符串需要改成你自己的 MySQL 用户名、密码、主机和数据库名
import os

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Database connection string. Set the SQLALCHEMY_DATABASE_URL environment
# variable to point at your own MySQL instance without editing this file;
# the literal below is only the fallback used when the variable is unset.
SQLALCHEMY_DATABASE_URL = os.environ.get(
    "SQLALCHEMY_DATABASE_URL",
    "mysql+mysqlconnector://root:123456@localhost/datas",
)
engine = create_engine(SQLALCHEMY_DATABASE_URL)
# Session factory bound to the engine; commits and flushes are explicit.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Declarative base class that all ORM models inherit from.
Base = declarative_base()
# ORM model for application users.
class User(Base):
    """Row of the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, index=True, primary_key=True)
    username = Column(String(50), index=True, unique=True)
    password = Column(String(50))
    registration_date = Column(DateTime)
    # Flag that tells administrators apart from regular users.
    is_admin = Column(Boolean, default=False)
# Create the database tables for the models registered on Base.
# This is a module-level statement, so it runs when the module is imported.
Base.metadata.create_all(bind=engine)
# Dependency function that manages the database session.
def get_db():
    """Yield a session from ``SessionLocal`` and close it once the consumer is done.

    The ``finally`` clause closes the session whether the consumer finished
    normally or raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
main.py
from fastapi import FastAPI
import uvicorn
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from database import SessionLocal
from allAPI.LoginANDEnroll import app01
from allAPI.student_scores import app02
from allAPI.enrollment_api import app03
from allAPI.ClassInfo import app04
# The application object; the four routers are mounted on it further down.
app = FastAPI()
# Add the CORS middleware.
# NOTE(review): wildcard origins together with allow_credentials=True look
# like they let any site send credentialed requests - confirm this is
# intended before deploying outside development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # allow every origin
    allow_credentials=True,
    allow_methods=["*"],  # allow every HTTP method
    allow_headers=["*"],  # allow every request header
)
# All routers share the "/WJH" prefix; each gets its own tag.
app.include_router(app01, prefix="/WJH", tags=["登录与注册"])
app.include_router(app02, prefix="/WJH", tags=["成绩信息"])
app.include_router(app03, prefix="/WJH", tags=["学籍信息"])
app.include_router(app04, prefix="/WJH", tags=["班级信息"])
# Lifecycle handler that runs when the application shuts down.
async def shutdown_event(app: "FastAPI | None" = None) -> None:
    """Close all open database sessions.

    Args:
        app: Unused. Kept optional for backward compatibility, because
            shutdown handlers are invoked without arguments; a required
            parameter would make the call fail with TypeError.
    """
    # close_all() is synchronous and returns None, so it must not be awaited.
    # NOTE(review): newer SQLAlchemy releases deprecate it in favour of
    # sqlalchemy.orm.close_all_sessions() - switch once the version is pinned.
    SessionLocal.close_all()


# Register the handler for the "shutdown" event.
app.add_event_handler("shutdown", shutdown_event)
if __name__ == '__main__':
    # Start the development server on the loopback address with auto-reload.
    uvicorn.run(
        "main:app",
        host="127.0.0.1",
        port=8000,
        reload=True,
    )
# uvicorn.run("main:app", port=8000, reload=True, host="localhost")