JWT
JWT(Json Web Token)是一种可以跨域的认证方案
jwt由三部分构成:
头部header:头部包含算法和token类型
核载payload:这部分用来保存自定义信息
签名signature:使用header和payload以及提供的秘钥,用header指定的算法进行签名
三个部分都是使用base64进行编码,并用.隔开
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
OAuth2
OAuth2 规范要求使用密码流时,必须以表单格式发送username和password,并且这两个字段必须命名为username和password
OAuth2 实现密码哈希与 Bearer JWT 令牌验证
密码哈希需要使用passlib库,并且指定bcrypt算法
pip install passlib[bcrypt]
jwt生成解析使用python-jose库
pip install python-jose[cryptography]
密码哈希
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
"""校验密码哈希"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
"""生成密码哈希"""
return pwd_context.hash(password)
获取用户
在生成jwt时,会在header部分添加用户信息,认证时,需要先解析jwt,解析成功后还需要判断解析出来的用户信息在数据库是否存在。
# 数据库用户表
fake_users_db = {
# 用户记录
"kael": {
"login_name": "kael",
"hashed_password": get_password_hash("111111")
}}
def get_user(db, username: str):
# 这个方法实际效果应该是从数据库获取对应的用户,db应该是一个数据库操作句柄,username是登录接口传的登录名
if username in db:
user_dict = db[username]
return user_dict # 这里实际使用可以生成用户模型的实例返回
def authenticate_user(db, username: str, password: str):
"""验证用户"""
# 判断用户是否存在
user = get_user(db, username)
if not user:
return False
# 用户密码哈希是否正确
if not verify_password(password, user["hashed_password"]):
return False
return user
假设fake_users_db 是数据库表,kael是一条login_name为kael的用户记录。
jwt生成和解析
from datetime import timedelta, datetime, timezone
from jose import jwt
from jose.exceptions import JWTError
from pydantic import BaseModel
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# 签名部分的秘钥
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# jwt header部分的算法
ALGORITHM = "HS256"
# 有效期
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# tokenUrl这个参数用来指定进行Oauth2认证接口地址,这里用的是相对路径
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
class Token(BaseModel):
# 返回token信息的模型
access_token: str
token_type: str
class TokenData(BaseModel):
# token,payload部分的模型
username: str | None = None
def create_access_token(data: dict, expires_delta: timedelta | None = None):
"""创建token"""
# token payload额外信息
to_encode = data.copy()
"""
expires_delta:这个字段来表示token的过期时间
如果传入expires_delta,就使用,否则默认15分钟"""
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
# 生成token
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
"""解析token,并且校验用户是否存在"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# 解析token
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# 从sub字段获取username
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
登录接口认证
@app.post("/login")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
print(user)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user["login_name"]}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me/")
async def read_users_me(user=Depends(get_current_user)):
return user
完整代码
from fastapi import FastAPI, Depends, HTTPException, status
from passlib.context import CryptContext
from datetime import timedelta, datetime, timezone
from jose import jwt
from jose.exceptions import JWTError
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
app = FastAPI()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
"""校验密码哈希"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
"""生成密码哈希"""
return pwd_context.hash(password)
# 数据库用户表
fake_users_db = {
# 用户记录
"kael": {
"login_name": "kael",
"hashed_password": get_password_hash("111111")
}}
def get_user(db, username: str):
# 这个方法实际效果应该是从数据库获取对应的用户,db应该是一个数据库操作句柄,username是登录接口传的登录名
if username in db:
user_dict = db[username]
return user_dict
# 签名部分的秘钥
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# jwt header部分的算法
ALGORITHM = "HS256"
# 有效期
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# tokenUrl这个参数用来指定进行Oauth2认证接口地址,这里用的是相对路径
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
def authenticate_user(db, username: str, password: str):
"""验证用户"""
# 判断用户是否存在
user = get_user(db, username)
if not user:
return False
# 用户密码哈希是否正确
if not verify_password(password, user["hashed_password"]):
return False
return user
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
def create_access_token(data: dict, expires_delta: timedelta | None = None):
# token payload额外信息
to_encode = data.copy()
"""
expires_delta:这个字段来表示token的过期时间
如果传入expires_delta,就使用,否则默认15分钟"""
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
# 生成token
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# 解析token
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# 从sub字段获取username
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
@app.post("/login")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
print(user)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user["login_name"]}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me/")
async def read_users_me(user=Depends(get_current_user)):
return user
未认证请求users/me会提示未认证
认证成功再次请求users/me会返回用户数据