【一】序列化跟反序列化
# api接口开发,最核心最常见的一个过程就是序列化,所谓序列化就是把数据转换格式,序列化可以分两个阶段:【序列化值的是转换数据格式:序列化,返序列化】
# 序列化: 把我们识别的数据转换成指定的格式提供给别人
# 序列化: 表模型对象 ---> json格式字符串--->给前端
python后端:把python的对象【字典,列表,对象】---》转成json/xml格式字符串过程称之为序列化
例如:我们在django中获取到的数据默认是模型对象(qs/单个对象),但是模型对象数据无法直接提供给前端或别的平台使用,所以我们需要把数据进行序列化,变成字符串或者json数据,提供给别人。
# 反序列化:把别人提供的数据转换/还原成我们需要的格式。
# 反序列化: json字符串--->保存到数据库
例如:前端js提供过来的json/xml数据,对于python而言就是字符串,我们需要进行反序列化换成模型类对象,这样我们才能把数据保存到数据库中
# js 如何把对象序列化成字符串:【JSON.stringify()】,把字符串饭序列化成对象:【JSON.parse()】
【1】序列化(Serialization)
序列化是将对象转换为字节流或其他格式的过程。这通常涉及将对象的状态转换为可以存储或传输的格式,例如字节流、JSON、XML等。序列化的目的是将对象持久化,以便将其保存到文件中、通过网络发送给其他系统或在内存中进行传输。
在序列化过程中,对象的属性值被提取,并根据指定的格式转换为序列化后的表示形式。在Java中,常用的序列化技术是Java序列化(Java Serialization)或JSON序列化(使用类库如Jackson或Gson),而在其他语言中也有类似的序列化技术。
【2】反序列化(Deserialization):
反序列化是序列化的逆过程,即将序列化后的数据重新转换为对象的过程。在反序列化过程中,从序列化后的数据中提取信息,并根据原始对象的类定义重新构建对象,使其恢复到原始对象的状态。
反序列化的目的是从序列化后的数据中重新创建对象,以便对其进行操作或在程序中使用。与序列化类似,反序列化也是在存储、通信或对象传输等场景中常见的操作。
小结
总的来说,序列化和反序列化是在计算机中用于对象持久化、数据传输和通信的重要概念,它们允许将对象转换为字节流或其他格式,并在需要时重新构建对象。
具体示例
class TaskView(View):
def post(self, request):
# body gen body的使用方法
print(request.body)
ta = request.body
da = json.loads(ta)
import pickle
# 定义一个对象
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
# 创建一个 Person 对象
person = Person("Alice", 30)
# 序列化对象到文件
with open("person.pickle", "wb") as file:
pickle.dump(person, file)
# 从文件中反序列化对象
with open("person.pickle", "rb") as file:
loaded_person = pickle.load(file)
# 打印反序列化后的对象属性
print("Name:", loaded_person.name)
print("Age:", loaded_person.age)
【二】FBV跟CBV的区别
FBV(Function-Based Views)和CBV(Class-Based Views)是两种不同的视图编写方式,它们有一些区别:
- 实现方式:
- FBV(Function-Based Views): 使用函数来编写视图。每个视图是一个独立的函数,接收HTTP请求并返回HTTP响应。
- CBV(Class-Based Views): 使用类来编写视图。每个视图是一个类,它可以包含多个方法来处理不同的HTTP请求方法(如GET、POST等)。
- 代码复用:
- FBV: 函数的复用性较差,通常需要将相同的代码复制粘贴到不同的视图函数中。
- CBV: 类的继承机制使得代码的复用性更好。你可以创建一个通用的基类视图,并在不同的子类视图中重写或添加特定的方法。
- 代码结构:
- FBV: 每个视图通常都是一个独立的函数,因此在文件中可能会有多个视图函数,使得文件结构较为扁平。
- CBV: 视图通常是一个类,因此在文件中可能会有多个视图类,使得文件结构更加模块化和组织化。
- 学习曲线:
- FBV: 对于初学者来说,函数式视图可能更容易理解和上手,因为它们更接近于普通的Python函数。
- CBV: 对于复杂的应用程序和视图逻辑,类视图提供了更多的结构化和组织化,但也需要一定的学习曲线。
总的来说,FBV适用于简单的视图逻辑和快速开发,而CBV适用于需要更多结构化和可扩展性的项目。选择使用哪种视图方式取决于你的项目需求和个人偏好。
【三】
【1】FBV的接口具体示例
限制条件
-
1 django —》创建表 Task 任务表
- -id
-task_id:唯一,不要用id自增–》时间戳/uuid python 中如何生成uuid 唯一(unique=True)
-task_name:任务名 建立索引 (index=True)
-task_time:任务执行时间
-task_desc:任务描述
- -id
-
2 使用mysql
-
3 写 5个 接口
- 查询所有,查询单个,新增一个,删除一个,修改一个
'''http://127.0.0.1:8000/task/add --->增加一个记录--》post请求方式
http://127.0.0.1:8000/task/get/id--->查询这条记录-——》get请求方式
http://127.0.0.1:8000/task/update/id--->更新这条记录》post请求方式
http://127.0.0.1:8000/task/delete/id--->删除---》post请求方式
http://127.0.0.1:8000/task/get/--->查询所有记录-——》get请求方式 '''
1.路由层
from django.contrib import admin
from django.urls import path
from Api01 import views
app_name = 'Api01'
urlpatterns = [
path('admin/', admin.site.urls),
path('task/add/', views.add_task, name='add_task'),
path('task/get/<str:id>', views.get_task, name='get_task'),
path('task/update/<str:id>', views.update_task, name='update_task'),
path('task/delete/<int:id>', views.delete_task, name='delete_task'),
path('task/get/', views.get_all_tasks, name='get_all_tasks'),
path('add/', views.add),
]
2.视图层
from django.shortcuts import render, HttpResponse, redirect
from django.http import JsonResponse
from Api01.models import Task
# 添加任务
def add_task(request):
if request.method == 'POST':
try:
task_id = request.POST.get('task_id')
task_name = request.POST.get('task_name')
task_time = request.POST.get('task_time')
task_desc = request.POST.get('task_desc')
Task.objects.create(task_desc=task_desc,task_time=task_time,task_name=task_name,task_id=task_id)
return JsonResponse({'code': 100, 'msg': '创建成功'})
except Exception as e:
return JsonResponse({'code': 200, 'msg': '创建失败,错误信息:{}'.format(str(e))})
else:
return JsonResponse({'code': 200, 'msg': '请求方法错误'})
# 更改任务
# views.py
def get_task(request, id):
try:
task = Task.objects.get(id=id)
result = {
'task_id': task.task_id,
'task_name': task.task_name,
'task_time': task.task_time.strftime('%Y-%m-%d'),
'task_desc': task.task_desc
}
return JsonResponse({'code': 100, 'msg': '查询成功', 'result': result})
except Task.DoesNotExist:
return JsonResponse({'code': 200, 'msg': '任务记录不存在'})
def add(requst):
return render(requst, 'succeess.html')
# 更新一条任务记录
# views.py
def update_task(request, id):
if request.method == 'POST':
try:
task_name = request.POST.get('task_name')
task_time = request.POST.get('task_time')
task_desc = request.POST.get('task_desc')
Task.objects.filter(pk=id).update(task_desc=task_desc,task_time=task_time,task_name=task_name)
return JsonResponse({'code': 100, 'msg': '更新成功'})
except Task.DoesNotExist:
return JsonResponse({'code': 200, 'msg': '任务记录不存在'})
except Exception as e:
return JsonResponse({'code': 200, 'msg': '更新失败,错误信息:{}'.format(str(e))})
else:
return JsonResponse({'code': 200, 'msg': '请求方法错误'})
# 删除一条任务记录
# views.py
def delete_task(request, id):
if request.method == 'POST':
try:
task = Task.objects.get(id=id)
task.delete()
return JsonResponse({'code': 100, 'msg': '删除成功'})
except Task.DoesNotExist:
return JsonResponse({'code': 200, 'msg': '任务记录不存在'})
except Exception as e:
return JsonResponse({'code': 200, 'msg': '删除失败,错误信息:{}'.format(str(e))})
else:
return JsonResponse({'code': 200, 'msg': '请求方法错误'})
# 查询所有任务记录
def get_all_tasks(request):
tasks = Task.objects.all()
results = []
for task in tasks:
result = {
'task_id': task.task_id,
'task_name': task.task_name,
'task_time': task.task_time.strftime('%Y-%m-%d'),
'task_desc': task.task_desc
}
results.append(result)
return JsonResponse({'code': 100, 'msg': '查询成功', 'results': results})
def delete(request,id):
if request.method == 'POST':
# 创建任务记录
Task.objects.filter(id = int(id)).delete()
return JsonResponse({'code': 100, 'msg': '创建成功'})
3.创建一个test测试接口文件
import random
import string
import threading
from threading import Thread
import time
import uuid
import requests
from fake_useragent import UserAgent
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'three.settings')
import django
django.setup()
headers = {
'User-Agent': UserAgent().random,
}
base_url = 'http://127.0.0.1:8000/task/'
from datetime import date
from Api01.models import Task # 导入你的Task模型
# 创建多个Task对象并保存到数据库
def add_tasks(task_name,task_desc):
data = {
"task_name": task_name,
"task_desc": task_desc,
"task_time": time.strftime('%Y-%m-%d'),
"task_id":uuid.uuid1() # 转换为字符串
}
url = "http://127.0.0.1:8000/task/add/" # 请替换为你的服务器地址# 设置请求头
response = requests.post(url=url, data =data)
print(response.json())
def teadinfg(num_data):
data_list = []
for i in range(1,num_data + 1):
task_name = f'Task {i}'
task_desc = f'Task {i}'
data = Thread(target=add_tasks,args=(task_name,task_desc))
data_list.append(data)
data.start()
for data in data_list:
data.join()
teadinfg(50)
# 查询
def get(id):
url = base_url + f'get/{id}'
response = requests.get(url=url, headers=headers)
# print(response.text)
print(response.json())
# get('6')
# 查询全部
def get_all():
url = base_url + 'get/'
response = requests.get(url=url, headers=headers)
for info in response.json().get('results'):
print(info)
# get_all()
# 增加
def add():
task_name = input('task_name:>>>').strip()
task_desc = input('task_desc:>>>').strip()
data = {
"task_name": task_name,
"task_desc": task_desc,
"task_time": time.strftime('%Y-%m-%d'),
"task_id":uuid.uuid4() # 转换为字符串
}
url = "http://127.0.0.1:8000/task/add/" # 请替换为你的服务器地址# 设置请求头
response = requests.post(url=url, data =data)
print(response.json())
# add()
# 更新指定任务
def update(id):
task_name = input('任务名称:>>>>').strip()
task_time = time.strftime("%Y-%m-%d")
task_desc = input('任务内容:>>>>').strip()
data = {
'task_name':task_name,
'task_time':task_time,
'task_desc':task_desc,
}
url = base_url + f'update/{id}'
response = requests.post(url=url,headers=headers,data=data)
# print(response.text)
print(response.json())
# update('6')
# 删除指定任务
def delete(id):
url = base_url + f'delete/{id}'
response = requests.post(url=url, headers=headers)
print(response.json())
# delete(12)
【2】CBV的接口具体示例
1.先创建一个UserLog表
class UserLog(models.Model):
ip = models.CharField(max_length=100)
time = models.DateTimeField(auto_now_add=True)
method = models.CharField(max_length=10)
path = models.CharField(max_length=255)
user_agent = models.CharField(max_length=255)
def __str__(self):
return f"IP: {self.ip}, Time: {self.time}, Method: {self.method}, Path: {self.path}, User Agent: {self.user_agent}"
class Meta:
db_table = 'UserLog'
2.在创建一个路由分发
from django.contrib import admin
from django.urls import path,include
urlpatterns = [
path('admin/', admin.site.urls),
path('task_api/', include('task_api.urls')),
]
3.在子路由写一个正则表达式
from django.urls import path,include,re_path
from task_api.views import TaskView
urlpatterns = [
re_path(r'(?P<id>\d+)?/?$', TaskView.as_view()),
]
(?P<id>\d+)
: 这是一个命名捕获组,使用(?P<name>pattern)
的语法,其中name
是捕获组的名称,pattern
是要匹配的模式。在这里,捕获组的名称是 id,它用于捕获一个或多个数字(\d+)。?
: 这个问号表示前面的模式是可选的。换句话说,这个模式要么出现一次,要么不出现。/?
: 这表示匹配零个或一个斜杠 /。问号 ? 表示前面的斜杠是可选的。$
: 这个美元符号表示匹配字符串的结尾。
4.在视图层定义CBV
import json
from django.http import JsonResponse
from django.views import View
from .models import Task
import time
import uuid
class TaskView(View):
def post(self, request):
# body gen body的使用方法
# print(request.body)
# ta = request.body
# da = json.loads(ta)
task_name = request.POST.get('task_name')
task_desc = request.POST.get('task_desc')
task_time = time.strftime('%Y-%m-%d')
task_id = uuid.uuid4()
# 保存任务到数据库
Task.objects.create(
task_name=task_name,
task_desc=task_desc,
task_time=task_time,
task_id=task_id
)
return JsonResponse({'message': 'Task 添加成功!'})
def get(self, request, id=None):
if id:
try:
task = Task.objects.get(id=id)
print(task)
data = {
'task_name': task.task_name,
'task_desc': task.task_desc,
'task_time': task.task_time,
'task_id': task.task_id
}
print(data)
return JsonResponse(data)
except Task.DoesNotExist:
return JsonResponse({'error': 'Task 未能发现数据'}, status=404)
else:
tasks = Task.objects.all()
data = [{'task_name': task.task_name, 'task_desc': task.task_desc, 'task_time': task.task_time, 'task_id': task.task_id} for task in tasks]
return JsonResponse({'results': data})
def put(self, request, id):
try:
print(request.body)
data = request.body
calssdatda = json.loads(data)
print(calssdatda)
print(type(calssdatda))
# <class 'dict'>
task = Task.objects.get(id=id)
print(task)
task_name = calssdatda.get('task_name')
print(task_name)
task_desc = calssdatda.get('task_desc')
task_time = time.strftime('%Y-%m-%d')
# 更新任务信息
task.task_name = task_name
task.task_desc = task_desc
task.task_time = task_time
task.save()
return JsonResponse({'message': 'Task 更新成功'})
except Task.DoesNotExist:
return JsonResponse({'error': '未找到数据'}, status=404)
def delete(self,request,id):
try:
Task.objects.filter(pk=id).delete()
return JsonResponse({'message': 'Task 删除成功'})
except Task.DoesNotExist:
return JsonResponse({'error': '未找到数据'}, status=404)
5.写一个中间件进行预处理
from django.utils.deprecation import MiddlewareMixin
from task_api.models import UserLog
class LogMiddleware(MiddlewareMixin):
def process_request(self, request):
# 获取请求相关信息
# 通过访问 request.META 字典,获取了请求的客户端 IP 地址。'REMOTE_ADDR'
# 是一个 HTTP 头的键,它存储了客户端的 IP 地址。这个 IP 地址可以用来识别请求的来源。
ip = request.META.get('REMOTE_ADDR')
print( request.META)
# 获取了请求的方法
method = request.method
# 获取了请求的路径。request.get_full_path() 方法返回了当前请求的完整路径,包括查询参数(如果有)。
path = request.get_full_path()
# 获取了客户端的用户代理(User-Agent)。用户代理是一个字符串,用于标识客户端浏览器或其他用户代理程序的信息,如浏览器类型、版本号等。
user_agent = request.META.get('HTTP_USER_AGENT')
# 将信息记录到数据库中
UserLog.objects.create(ip=ip, method=method, path=path, user_agent=user_agent)
6.settings.py进行
- 添加’
task_api.middleware.LogMiddleware'
,ALLOWED_HOSTS = ["*"]
ALLOWED_HOSTS = ["*"]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
# 'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'task_api.middleware.LogMiddleware',
]
7.别人访问你的ip地址
-
先打开cmd
-
# ipconfig
-
在检查本地防火墙是否打开
-
这些只是准备工作