simple-jwt快速入门(包含自定制)
目录
- simple-jwt快速入门(包含自定制)
- 安装
- 路由层
- 视图层
- 全局配置
- 前端传入参数
- 配置文件
- 定制登录返回格式
- 定制payload格式
- 自定制签发-认证
安装
pip install djangorestframework-simplejwt
路由层
from rest_framework_simplejwt.views import TokenObtainPairView
urlpatterns = [
path('login/', TokenObtainPairView.as_view(), name='token_obtain_pair'),
]
- 浏览器路径随便填,但是必须要调用
TokenObtainPairView
类 TokenObtainPairView
:登录接口TokenRefreshView
:刷新tokenTokenVerifyView
:校验token
视图层
# views.py
from rest_framework.permissions import IsAuthenticated
from rest_framework_simplejwt.authentication import JWTAuthentication
class UserDetail(ModelViewSet):
authentication_classes = [JWTAuthentication]
permission_classes = [IsAuthenticated]
-
在需要登录才能访问的视图类中加入
JWTAuthentication
和IsAuthenticated
验证 -
此时相当于用DRF自带的验证视图,他会根据django自带的auth表为我们进行校验,并返回access和refresh
全局配置
REST_FRAMEWORK = {
# DRF默认权限类
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.IsAuthenticated',
),
# DRF默认认证类
'DEFAULT_AUTHENTICATION_CLASSES': (
'rest_framework_simplejwt.authentication.JWTAuthentication',
),
}
前端传入参数
由于simple-jwt会默认校验auth表中的数据,因此不需要再额外编写视图类,直接传入参数即可(参数要和数据库字段一致)
- access:校验用的信息,在访问需要登录才能使用的视图方法时需要将它放入浏览器
- Key:必须填Authorization
- Value:必须填Bearer+空格+access参数
配置文件
simple-jwt有默认的配置参数
rest_framework_simplejwt—settings.py
DEFAULTS = {
# Access Token的有效期
"ACCESS_TOKEN_LIFETIME": timedelta(minutes=5),
# Refresh Token的有效期
"REFRESH_TOKEN_LIFETIME": timedelta(days=1),
# 是否自动刷新Refresh Token
"ROTATE_REFRESH_TOKENS": False,
# 刷新Refresh Token时是否将旧Token加入黑名单,如果设置为False,则旧的刷新令牌仍然可以用于获取新的访问令牌
"BLACKLIST_AFTER_ROTATION": False,
# 如为True,则在每次使用访问令牌进行身份验证时,更新用户最后登录时间
"UPDATE_LAST_LOGIN": False,
# 加密算法
"ALGORITHM": "HS256",
# 签名秘钥,这里使用Django的SECRET_KEY
"SIGNING_KEY": settings.SECRET_KEY,
# 用于验证jwt签名的秘钥返回的内容
"VERIFYING_KEY": "",
# JWT中的"Audience"声明,用于指定该JWT的预期接收者
"AUDIENCE": None,
# JWT中的"Issuer"声明,用于指定该JWT的发行者
"ISSUER": None,
# 用于序列化JWT负载的JSON编码器。默认为Django的JSON编码器
"JSON_ENCODER": None,
# 包含公钥的URL,用于验证JWT签名
"JWK_URL": None,
# 允许的时钟偏差量,以秒为单位
"LEEWAY": 0,
# 用于指定JWT在HTTP请求头中使用的身份验证方案。默认为"Bearer"
"AUTH_HEADER_TYPES": ("Bearer",),
# 包含JWT的HTTP请求头的名称。默认为"HTTP_AUTHORIZATION"
"AUTH_HEADER_NAME": "HTTP_AUTHORIZATION",
# 用户模型中用作用户ID的字段。默认为"id"
"USER_ID_FIELD": "id",
# JWT负载中包含用户ID的声明。默认为"user_id"
"USER_ID_CLAIM": "user_id",
# 用于指定用户身份验证规则的函数或方法。默认使用Django的默认身份验证方法进行身份验证
"USER_AUTHENTICATION_RULE": "rest_framework_simplejwt.authentication.default_user_authentication_rule",
# 用于指定可以使用的令牌类。默认为"rest_framework_simplejwt.tokens.AccessToken"
"AUTH_TOKEN_CLASSES": ("rest_framework_simplejwt.tokens.AccessToken",),
# JWT负载中包含令牌类型的声明。默认为"token_type"
"TOKEN_TYPE_CLAIM": "token_type",
# JWT负载中包含JWT ID的声明。默认为"jti"
"JTI_CLAIM": "jti",
# 用于指定可以使用的用户模型类
"TOKEN_USER_CLASS": "rest_framework_simplejwt.models.TokenUser",
# 在使用滑动令牌时,JWT负载中包含刷新令牌过期时间的声明
"SLIDING_TOKEN_REFRESH_EXP_CLAIM": "refresh_exp",
# 滑动令牌的生命周期。默认为5分钟
"SLIDING_TOKEN_LIFETIME": timedelta(minutes=5),
# 滑动令牌可以用于刷新的时间段。默认为1天
"SLIDING_TOKEN_REFRESH_LIFETIME": timedelta(days=1),
# 用于生成access和刷refresh的序列化器
"TOKEN_OBTAIN_SERIALIZER": "rest_framework_simplejwt.serializers.TokenObtainPairSerializer",
# 用于刷新访问令牌的序列化器
"TOKEN_REFRESH_SERIALIZER": "rest_framework_simplejwt.serializers.TokenRefreshSerializer",
# 用于验证令牌的序列化器
"TOKEN_VERIFY_SERIALIZER": "rest_framework_simplejwt.serializers.TokenVerifySerializer",
# 用于列出或撤销已失效JWT的序列化器
"TOKEN_BLACKLIST_SERIALIZER": "rest_framework_simplejwt.serializers.TokenBlacklistSerializer",
# 用于生成滑动令牌的序列化器
"SLIDING_TOKEN_OBTAIN_SERIALIZER": "rest_framework_simplejwt.serializers.TokenObtainSlidingSerializer",
# 用于刷新滑动令牌的序列化器
"SLIDING_TOKEN_REFRESH_SERIALIZER": "rest_framework_simplejwt.serializers.TokenRefreshSlidingSerializer",
# 是否启用撤销令牌
"CHECK_REVOKE_TOKEN": False,
# 在令牌中用于标识撤销令牌的声明
"REVOKE_TOKEN_CLAIM": "hash_password",
}
定制登录返回格式
- 自定义一个序列化类,重写validate方法
class CommonTokenObtainSerializer(TokenObtainPairSerializer):
def validate(self, attrs):
# 获取父类中的原数据
dic = super().validate(attrs)
data = {'code': 100,
'msg': '登录成功',
'username': self.user.username,
'refresh': dic.get('refresh'),
'access': dic.get('access')
}
return data
- 配置文件注册
# settings.py
SIMPLE_JWT = {
'TOKEN_OBTAIN_SERIALIZER': 'user.userserializers.CommonTokenObtainSerializer'
}
定制payload格式
class CommonTokenObtainSerializer(TokenObtainPairSerializer):
@classmethod
def get_token(cls, user):
# 获取父类得到的token
token = super().get_token(user)
print(token.payload)
token['name'] = user.username
return token
def validate(self, attrs):
dic = super().validate(attrs)
data = {'code': 100,
'msg': '登录成功',
'username': self.user.username,
'refresh': dic.get('refresh'),
'access': dic.get('access')
}
return data
- 获取payload属性得到
{'token_type': 'refresh', 'exp': 1713627459, 'iat': 1713541059, 'jti': '8cf1b417155d4ca59965b6625b73831d', 'user_id': 2}
- 因此可以直接
token['name']
为payload添加属性 - 重写后jwt的第三段也会跟着改变
自定制签发-认证
- 自己生成JWT
- 用钩子函数将自己要签发的JWT放入序列化类
- 将签发的JWT放入浏览器再被自定制的认证类接受
- 校验JWT的合法性然后返回
# views.py
import uuid
from datetime import datetime
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet, GenericViewSet
from user import models
from user.MyAuthentication import JWTOurAuth
from user.MySerializers import UserSerializer
from rest_framework_simplejwt.authentication import JWTAuthentication
class UserView(GenericViewSet):
queryset = models.UserLog.objects.all()
serializer_class = UserSerializer
authentication_classes = [JWTAuthentication]
permission_classes = [] # 局部禁用权限校验
@action(methods=['POST'], detail=False)
def login(self, request):
username = (request.data.get('username'))
password = (request.data.get('password'))
serializer = self.get_serializer(data=request.data)
# 用户密码正确则签发,并发送给浏览器META中,顺带返回JWT
if models.UserLog.objects.get(username=username).check_password(password):
if serializer.is_valid():
request.META['MY_TOKEN'] = serializer.validated_data['JWT']
return Response({"msg": '登录成功', 'result': serializer.validated_data})
else:
return Response({"msg": '账号或密码错误'})
class UserDetail(ModelViewSet):
queryset = models.UserLog.objects.all()
serializer_class = UserSerializer
authentication_classes = [JWTOurAuth] # 使用自定义的认证类
@action(methods=['GET'], detail=False)
def findall(self, request):
res = self.get_queryset()
serializer = self.get_serializer(instance=res, many=True)
return Response({'msg': '查询成功', 'result': serializer.data})
# serializer.py
from datetime import datetime
from rest_framework import serializers
from .MyJWT import MyJWT
from .models import UserLog
class UserSerializer(serializers.Serializer):
username = serializers.CharField()
password = serializers.CharField()
def validate(self, attrs):
# 模拟JWT第二段
userinfo = {'name': attrs["username"], 'time_field': str(datetime(2024, 1, 1, 12, 0)), 'time_expires': str(
datetime(2024, 1, 2, 12, 0)), 'level': UserLog.objects.get(username=attrs["username"]).level}
jwt_obj = MyJWT(payload=userinfo) # 将用户数据放入自定制的JWT生成类中生成JWT
JWT = jwt_obj.Issuance()
attrs['JWT'] = JWT # 将生成的JWT返回
return attrs
# MyJWT.py 用于生成JWT格式字符串
from datetime import datetime
import json
import base64
import hashlib
class MyJWT:
def __init__(self, payload=None, token=None):
self.header = {'type': 'jwt', 'alg': 'md5'}
self.payload = payload
self.token = token
def get_base64(self, info):
return base64.b64encode((json.dumps(info)).encode(encoding='utf-8'))
def get_md5(self, header, payload):
md5 = hashlib.md5()
md5.update(header)
md5.update(payload)
md5.update(b'7777') # 盐
return md5.hexdigest()
def get_token(self, header, payload):
base64_header = self.get_base64(header)
base64_payload = self.get_base64(payload)
token = f'{base64_header.decode(encoding="utf8")}.{base64_payload.decode(encoding="utf8")}.{self.get_md5(base64_header, base64_payload)}'
return token
def check_token(self, token):
base64_header, base64_payload, sign = token.split('.')
token = self.get_md5(base64_header.encode('utf8'), base64_payload.encode('utf8'))
if sign == token:
return True
else:
return False
def Issuance(self):
token = self.get_token(self.header, self.payload)
return token
def Validate(self, token):
if self.check_token(token):
return True
else:
return False
@staticmethod
def get_user(Token):
if 'name' not in json.loads(base64.b64decode(Token.split('.')[1]).decode(encoding='utf8')):
return False
return json.loads(base64.b64decode(Token.split('.')[1]).decode(encoding='utf8'))['name']
# 使用方法---放入数据(第二段)即可调用Issuance方法获取生成的JWT,调用Validate方法校验参数Token是否与原数据匹配
# userinfo = {'name': '张三', 'age': 18, 'time_field': str(datetime(2024, 1, 1, 12, 0)), 'time_expires': str(
# datetime(2024, 1, 2, 12, 0)), 'level': 1}
# jwt = MyJWT(payload=userinfo)
# Token = jwt.Issuance()
# user = MyJWT.get_user(Token)
# Token_validate = jwt.Validate(Token)
# MyAuthentication.py
from rest_framework.exceptions import AuthenticationFailed
from rest_framework_simplejwt.authentication import JWTAuthentication
from .MyJWT import MyJWT
class JWTOurAuth(JWTAuthentication):
def authenticate(self, request):
# 从浏览器获取token
if 'HTTP_TOKEN' not in request.META:
raise AuthenticationFailed("请添加token")
jwt = request.META['HTTP_TOKEN']
# 如果验证通过则直接返回用户与jwt字符串
jwt_obj = MyJWT()
JWT_validate = jwt_obj.Validate(jwt)
if JWT_validate:
return jwt_obj.get_user(jwt), jwt
raise AuthenticationFailed('你的JWT有误')
if 'HTTP_TOKEN' not in request.META:
raise AuthenticationFailed("请添加token")
jwt = request.META['HTTP_TOKEN']
# 如果验证通过则直接返回用户与jwt字符串
jwt_obj = MyJWT()
JWT_validate = jwt_obj.Validate(jwt)
if JWT_validate:
return jwt_obj.get_user(jwt), jwt
raise AuthenticationFailed('你的JWT有误')