在做大模型开发的过程中,相信很多小伙伴都是对大模型开发感兴趣,却对 fastapi 这个框架并不熟悉,但是,实际开发的项目确需要用户鉴权,这时候就会很头疼,查阅官方文档发现,官方虽然有例子,但是写的非常简单,只有一个自定义的verify_token函数,函数内容还需要自己实现,如果投入学习成本在 fastapi 的用户认证上就显得得不偿失。而 fastapi 是有现成的用户认证的框架的,比如 fastapi-users。今天,我就带领大家完成一个使用 fastapi-users来实现带有用户认证功能的Langserve接口。
以上是官方的认证鉴权的方法,实现过程需要自己来完成。
实现步骤:
下载fastapi-users
pip install 'fastapi-users[sqlalchemy]'
代码目录结构
创建 langserve 项目的方法我就不再介绍了,感兴趣的同学,可以看我的这篇公众号文章:LangServe全面使用指南
.
├── app
│ ├── db.py
│ ├── __init__.py
│ ├── __pycache__
│ ├── schemas.py
│ ├── server.py
│ └── users.py
├── Dockerfile
├── packages
│ └── README.md
├── poetry.lock
├── pyproject.toml
├── README.md
└── test.db
其中,db.py、users.py、schemas.py都是新建的 python 文件,这个目录结构也是fastapi-users推荐的拆分方式。
代码编写
db.py
from typing import AsyncGenerator
from fastapi import Depends
from fastapi_users.db import SQLAlchemyBaseUserTableUUID, SQLAlchemyUserDatabase
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase
DATABASE_URL = "sqlite+aiosqlite:///./test.db"
class Base(DeclarativeBase):
pass
class User(SQLAlchemyBaseUserTableUUID, Base):
pass
engine = create_async_engine(DATABASE_URL)
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)
async def create_db_and_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
yield session
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
yield SQLAlchemyUserDatabase(session, User)
schemas.py
import uuid
from fastapi_users import schemas
class UserRead(schemas.BaseUser[uuid.UUID]):
pass
class UserCreate(schemas.BaseUserCreate):
pass
class UserUpdate(schemas.BaseUserUpdate):
pass
users.py
import uuid
from typing import Optional
from fastapi import Depends, Request
from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin
from fastapi_users.authentication import (
AuthenticationBackend,
BearerTransport,
JWTStrategy,
)
from fastapi_users.db import SQLAlchemyUserDatabase
from app.db import User, get_user_db
SECRET = "SECRET"
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
reset_password_token_secret = SECRET
verification_token_secret = SECRET
async def on_after_register(self, user: User, request: Optional[Request] = None):
print(f"User {user.id} has registered.")
async def on_after_forgot_password(
self, user: User, token: str, request: Optional[Request] = None
):
print(f"User {user.id} has forgot their password. Reset token: {token}")
async def on_after_request_verify(
self, user: User, token: str, request: Optional[Request] = None
):
print(f"Verification requested for user {user.id}. Verification token: {token}")
async def get_user_manager(user_db: SQLAlchemyUserDatabase = Depends(get_user_db)):
yield UserManager(user_db)
bearer_transport = BearerTransport(tokenUrl="auth/jwt/login")
def get_jwt_strategy() -> JWTStrategy:
return JWTStrategy(secret=SECRET, lifetime_seconds=3600)
auth_backend = AuthenticationBackend(
name="jwt",
transport=bearer_transport,
get_strategy=get_jwt_strategy,
)
fastapi_users = FastAPIUsers[User, uuid.UUID](get_user_manager, [auth_backend])
current_active_user = fastapi_users.current_user(active=True)
注意: 这些都是fastapi-users官方用例部分,最主要的是 server.py中的结合部分。
from fastapi import FastAPI, Depends
from fastapi.responses import RedirectResponse
from langserve import add_routes
from contextlib import asynccontextmanager
from app.db import User, create_db_and_tables
from app.schemas import UserCreate, UserRead, UserUpdate
from app.users import auth_backend, current_active_user, fastapi_users
from langchain.chat_models import ChatOpenAI
from langchain_community.chat_models.moonshot import MoonshotChat
@asynccontextmanager
async def lifespan(app: FastAPI):
# Not needed if you setup a migration system like Alembic
await create_db_and_tables()
yield
app = FastAPI(
lifespan=lifespan,
#dependencies=[Depends(current_active_user)]
)
app.include_router(
fastapi_users.get_auth_router(auth_backend), prefix="/auth/jwt", tags=["auth"]
)
app.include_router(
fastapi_users.get_register_router(UserRead, UserCreate),
prefix="/auth",
tags=["auth"],
)
app.include_router(
fastapi_users.get_reset_password_router(),
prefix="/auth",
tags=["auth"],
)
app.include_router(
fastapi_users.get_verify_router(UserRead),
prefix="/auth",
tags=["auth"],
)
app.include_router(
fastapi_users.get_users_router(UserRead, UserUpdate),
prefix="/users",
tags=["users"],
)
@app.get("/authenticated-route")
async def authenticated_route(user: User = Depends(current_active_user)):
return {"message": f"Hello {user.email}!"}
add_routes(
app,
MoonshotChat(),
path="/openai",
dependencies=[Depends(current_active_user)],
)
@app.get("/")
async def redirect_root_to_docs():
return RedirectResponse("/docs")
# Edit this to add the chain you want to add
# add_routes(app, NotImplemented)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
这里,我用的是kimi的api接口,最主要的改变是在add_routes中增加了dependencies参数,引入了fastapi-users封装好的current_active_user。
add_routes(
app,
MoonshotChat(),
path="/openai",
dependencies=[Depends(current_active_user)], #这句代码是关键
)
最后,看下实际调用 invoke 接口后的效果吧!只需要在调用的接口的 header 中加入Authorization即可,对应的值是bearer加 token。这个 token 是调用 login 接口后返回的。
总结
怎么样?是不是很简单?如果有不理解的地方或者需要本代码的,欢迎与我联系。