fastApi笔记12-OAuth2 实现密码哈希与 Bearer JWT 令牌验证

JWT

JWT(Json Web Token)是一种可以跨域的认证方案

jwt由三部分构成:

头部header:头部包含算法和token类型

核载payload:这部分用来保存自定义信息

签名signature:使用header和payload以及提供的秘钥,用header指定的算法进行签名

三个部分都是使用base64进行编码,并用.隔开

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

 

OAuth2 

OAuth2 规范要求使用密码流时,必须以表单格式发送username和password,并且这两个字段必须命名为username和password

OAuth2 实现密码哈希与 Bearer JWT 令牌验证

密码哈希需要使用passlib库,并且指定bcrypt算法

pip install passlib[bcrypt]

 jwt生成解析使用python-jose库

pip install python-jose[cryptography]

密码哈希

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password, hashed_password):
    """校验密码哈希"""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    """生成密码哈希"""
    return pwd_context.hash(password)

获取用户

在生成jwt时,会在header部分添加用户信息,认证时,需要先解析jwt,解析成功后还需要判断解析出来的用户信息在数据库是否存在。

# 数据库用户表
fake_users_db = {
    # 用户记录
    "kael": {
        "login_name": "kael",
        "hashed_password": get_password_hash("111111")
    }}


def get_user(db, username: str):
    # 这个方法实际效果应该是从数据库获取对应的用户,db应该是一个数据库操作句柄,username是登录接口传的登录名
    if username in db:
        user_dict = db[username]
        return user_dict  # 这里实际使用可以生成用户模型的实例返回



def authenticate_user(db, username: str, password: str):
    """验证用户"""
    # 判断用户是否存在
    user = get_user(db, username)
    if not user:
        return False
    # 用户密码哈希是否正确
    if not verify_password(password, user["hashed_password"]):
        return False
    return user

假设fake_users_db 是数据库表,kael是一条login_name为kael的用户记录。

jwt生成和解析

from datetime import timedelta, datetime, timezone
from jose import jwt
from jose.exceptions import JWTError
from pydantic import BaseModel
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

# 签名部分的秘钥
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# jwt header部分的算法
ALGORITHM = "HS256"
# 有效期
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# tokenUrl这个参数用来指定进行Oauth2认证接口地址,这里用的是相对路径
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")


class Token(BaseModel):
    # 返回token信息的模型
    access_token: str
    token_type: str


class TokenData(BaseModel):
    # token,payload部分的模型
    username: str | None = None


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """创建token"""
    # token payload额外信息
    to_encode = data.copy()
    """
    expires_delta:这个字段来表示token的过期时间
    如果传入expires_delta,就使用,否则默认15分钟"""
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    # 生成token
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    """解析token,并且校验用户是否存在"""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 解析token
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # 从sub字段获取username
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

登录接口认证

@app.post("/login")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    print(user)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user["login_name"]}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/")
async def read_users_me(user=Depends(get_current_user)):
    return user

完整代码

from fastapi import FastAPI, Depends, HTTPException, status
from passlib.context import CryptContext
from datetime import timedelta, datetime, timezone
from jose import jwt
from jose.exceptions import JWTError
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

app = FastAPI()

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def verify_password(plain_password, hashed_password):
    """校验密码哈希"""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    """生成密码哈希"""
    return pwd_context.hash(password)


# 数据库用户表
fake_users_db = {
    # 用户记录
    "kael": {
        "login_name": "kael",
        "hashed_password": get_password_hash("111111")
    }}


def get_user(db, username: str):
    # 这个方法实际效果应该是从数据库获取对应的用户,db应该是一个数据库操作句柄,username是登录接口传的登录名
    if username in db:
        user_dict = db[username]
        return user_dict


# 签名部分的秘钥
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# jwt header部分的算法
ALGORITHM = "HS256"
# 有效期
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# tokenUrl这个参数用来指定进行Oauth2认证接口地址,这里用的是相对路径
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")


def authenticate_user(db, username: str, password: str):
    """验证用户"""
    # 判断用户是否存在
    user = get_user(db, username)
    if not user:
        return False
    # 用户密码哈希是否正确
    if not verify_password(password, user["hashed_password"]):
        return False
    return user


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    # token payload额外信息
    to_encode = data.copy()
    """
    expires_delta:这个字段来表示token的过期时间
    如果传入expires_delta,就使用,否则默认15分钟"""
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    # 生成token
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 解析token
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # 从sub字段获取username
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


@app.post("/login")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    print(user)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user["login_name"]}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/")
async def read_users_me(user=Depends(get_current_user)):
    return user

 

 

 未认证请求users/me会提示未认证

 

认证成功再次请求users/me会返回用户数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/431427.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

12c 32k strings新特性 varchar2/nvarchar2 32K

12c 32k strings新特性 varchar2/nvarchar2 32K 1、查看 SQL> show parameter MAX_STRING_SIZENAME TYPE VALUE ------------------------------------ ----------- ------------------------------ max_string_size …

Rust 开发的高性能 Python 包管理工具,可替换 pip、pip-tools 和 virtualenv

最近,我在 Python 潮流周刊 中分享了一个超级火爆的项目,这还不到一个月,它在 Github 上已经拿下了 8K star 的亮眼成绩,可见其受欢迎程度极高!国内还未见有更多消息,我趁着周末把一篇官方博客翻译出来了&a…

12:Logstash|Web日志实时分析

Logstash|Web日志实时分析 logstashlogstash工作结构安装Logstash编写logstash配置文件步骤一:codec类插件插件帮助手册Logstash input插件步骤一:file模块插件filter grok插件Web日志实时分析部署beats与filebeat步骤一:filter grok模块插件logstash 一个数据采集、加工处…

基于巨控GRM561/562/563Y西门子1200PLC发邮件

巨控GRM560,GRM600系列同比之前的GRM530,除短信,微信,电话语音播报增加了邮件发送功能,简单介绍一下PLC发邮件。 1在博途中建立好DB块 2.打开GRMDEV6,新建工程,做好数据采集,这里以DB4.D0&#…

Day17:信息打点-APP资产知识产权应用监控静态提取动态抓包动态调试

目录 案例1:名称获取APP信息(爱企查/小蓝本/七麦/点点) 案例2:URL网站备案查APP 案例3:APP提取信息-静态分析 案例3:APP提取信息-动态抓包 案例4:APP提取信息-动态调试 思维导图 章节知识…

JavaWeb03-HTTP协议,Tomcat,Servlet

目录 一、HTTP协议 1.概述 2.特点 3.请求数据格式 (1)请求行 (2)请求头 (3)请求体 (4)常见请求头 (5)GET和POST请求区别 4.响应数据格式 &#xf…

Python一些可能用的到的函数系列125 FSM工具transitions

说明 首先FSM是一个很有用的工具,在程序设计中,某个对象会对应若干不同的状态,在这个状态下,同样的方法会有不一样的行为。 python有个transitions包可以做这个,过去一直不想用,主要是感觉有点鸡肋。 本质…

基于springboot+vue的个人博客系统

博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战,欢迎高校老师\讲师\同行交流合作 ​主要内容:毕业设计(Javaweb项目|小程序|Pyt…

Vue自定义商品发布组件

文章目录 一、代码展示二、代码解读三、结果展示 一、代码展示 <template><div><a-popover trigger"hover" :getPopupContainer"triggerNode > {return triggerNode.parentNode || document.body;}"><template #content><d…

Redis面试总结

概述 1. Redis是什么&#xff1f;简述它的优缺点&#xff1f; Redis本质上是一个Key-Value类型的内存数据库&#xff0c;很像Memcached&#xff0c;整个数据库加载在内存当中操作&#xff0c;定期通过异步操作把数据库中的数据flush到硬盘上进行保存。 因为是纯内存操作&…

[Angular 基础] - routing 路由(下)

[Angular 基础] - routing 路由(下) 之前部分 Angular 笔记&#xff1a; [Angular 基础] - 自定义指令&#xff0c;深入学习 directive [Angular 基础] - service 服务 [Angular 基础] - routing 路由(上) 使用 route 书接上回&#xff0c;继续折腾 routing 按照最初的 wi…

ffmpeg使用命令实现音视频分离

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、简单介绍二、具体操作三、验证1.源文件2.视频文件3.音频文件 四、补充总结 前言 有时候由于特殊需要可能需要将一个视频&#xff08;带音频&#xff09;的…

洛谷:P3068 [USACO13JAN] Party Invitations S(枚举、前缀和)

这题我们数据范围太大&#xff0c;用二维肯定是不行的&#xff0c;我们可以采用一维线性存储。 如题意&#xff0c;我们可以将每组奶牛编号都存在一维数组里面&#xff0c;只需记录每组的头尾指针就可以了。 如题中样例我们就可以存储成1 3 3 4 1 2 3 4 5 6 7 4 3 2 1 然后第…

docker部署aria2-pro

前言 我平时有一些下载视频和一些资源文件的需求&#xff0c;有时候需要离线下载&#xff0c;也要速度比较快的方式 之前我是用家里的玩客云绝育之后不再写盘当下载机用的&#xff0c;但是限制很多 我发现了aria2 这个下载器非常适合我&#xff0c;而有个大佬又在原来的基础…

基于 Vue3打造前台+中台通用提效解决方案(上)

基于 Vue3打造前台+中台通用提效解决方案 1、项目架构 本项目使用vite + vue3来实现前中台解决方案 2、为什么使用vite ? 因为,之前的项目一直都是使用webpack作为构建工具;vite出来这么久了,也没有用过;所以想在当前项目下进行使用; 2.1、为什么vite比webpack块? …

android开发文档下载,你的技术真的到天花板了吗

Android 基础 1.Activity 1、 什么是 Activity&#xff1f; 2、 请描述一下 Activity 生命周期 …… 2.Service 3.Broadcast Receiver32 4.ContentProvider 5.ListView 6.Intent 7.Fragment 1.Fragment 跟 Activity 之间是如何传值的 2.描述一下 Fragment 的生命周期 3.Fragme…

Qt多弹窗实现包括QDialog、QWidget、QMainWindow

1.相关说明 独立Widget窗口、嵌入式Widget、嵌入式MainWindow窗口、独立MainWindow窗口等弹窗的实现 相关界面包含关系 2.相关界面 3.相关代码 mainwindow.cpp #include "mainwindow.h" #include "ui_mainwindow.h" #include "tformdoc.h" #incl…

容器云平台巡检实战:运维进阶技巧与策略

1 docker容器日常巡检 通过以下方式进行检查&#xff1a; 1.1 docker/podman ps查看容器状态 Docker/podman ps -a 查看容器状态STATUS&#xff1a; Exited(0)&#xff1a;表示容器正常退出 Exited(其他数字)&#xff1a;容器异常退出&#xff0c;需要通过log 查看原因 Up…

080|为什么阿里的价值观值得你关注?

在阿里巴巴20周年年会现场&#xff0c;万众瞩目之下&#xff0c;马云和张勇完成了阿里巴巴董事长职务的交接。 不过你也知道&#xff0c;这次接棒在一年前就已经公布了&#xff0c;在年会上只是一个仪式。在20周年年会过后&#xff0c;我找到了互联网圈的资深媒体人阳淼&#…

julia语言使用PyCall包调用Python代码及Python包

Julia语言虽然好&#xff0c;但是包管理方面和生态环境感觉还有一点小小的缺陷&#xff0c;但是Julia可以调用Python丰富的包&#xff0c;用起来很方便。 安装PyCall 在安装之前先确认下Julia和Python的版本&#xff0c;我使用的稳定版本的 Julia1.6.7&#xff0c;Python版本是…