注册接口
演示注册接口的三步操作:【注册流程逻辑】
第一步:发送注册短信验证码接口请求
请求方法: put
请求地址:http://shop.lemonban.com:8107/user/sendRegisterSms
请求参数:{“mobile”:“13422337766”}
请求头部:{“Content-Type”:application/json;charset=UTF-8"}
响应结果: 空
需要去查询数据库中tz_sms_log表获取验证码信息
项目的数据库说明文档在百度网盘里,大家自己去获取。
第二步:发送校验注册短信验证码接口请求
请求方法: put
请求地址:http://shop.lemonban.com:8107/user/checkRegisterSms
请求参数:{“mobile”:“13422337766”,“validCode”:“869760”}
请求头部:{“Content-Type”:application/json;charset=UTF-8"}
响应结果:b57b9b5865584c8ca3e23f36868a2511 【注意是文本-text】
第三步:真正的注册接口请求
请求方法: put
请求地址:http://shop.lemonban.com:8107/user/registerOrBindUser
“appType”:3,“checkRegisterSmsFlag”:“b57b9b5865584c8ca3e23f36868a2511”,“mobile”:“13422337766”,“userN
请求参数:
{
请求头部:{“Content-Type”:application/json;charset=UTF-8”}
{“access_token”:“79db686e-1a83-4401-a571-
23a68ac8d45b”,“token_type”:“bearer”,“refresh_token”:“47590c95-5b0f-48c7-8d5cd093fba4f021”,“expires_in”:1295999,“pic”:null,“userId”:“eea8b4fb74514837b2f683bbadcd22aa”,“nickName”
执行注册接口的用例,现在发现有几个问题需要解决:
1、需要去获取验证码了;验证没有在接口返回值里返回,我们就需要去数据库里获取了!
- 数据库获取验证码的问题: 因为我们有封装好的数据库查询的方法可以直接使用。
- 调用一下我们之前讲的数据库的封装好的方法, 返回的结果是查询数据的字典。
- 第一条用例执行完成之后,就会再数据库里生成一个记录
- 第二条用例执行之前查询数据库 得到数据库结果
2、手机号码和用户名注册过一次后,不能重复注册,就会有问题。
- 也就是标红的这些数据是不能写死的。要么是数据库里拿的 要么是不能每次写一样的的。 要么是从上个接
口返回的提取的
"""
思路:整体思路跟我们之前的提取响应结果差不多。
1、先把前置sql语句写在excel表格里: json格式 ,key 【变量名】: value 【sql语句】
2、读取出来,是字符串,做反序列化操作,转化为字典;
3、读取这个前置sql,for循环k v 得到这个变量和sql语句,sql去数据库里查,调用我们之前封装的sql的类方法,得到的结果是个字典。
4、查完后结果存储在环境变量作为属性。属性名就是k 变量名;
5、请求参数里的mobilecode就可以从环境变量里获取这个结果数据了。-- 调用之前的替换函数。【所以,参数里变化的数据需要占位符占位,方便查找替换】==已经完成了!
"""
import json
from tools.handle_mysql import HandleMysql
from datas.db_data import my_db
from tools.envi_data import EnviData
sql_data = '''{"mobile_code":
"select mobile_code from tz_sms_log where user_phone='13422337768' order by rec_date desc limit 1;"}'''
# 第一步: 字符串做反序列化操作--转化为字典
sql_data = json.loads(sql_data)
# 第二步: 分别得到key 和value ==for遍历 items()
for k,v in sql_data.items(): # k 是变量变量名- mobile_code,v是sql语句
sql_result = HandleMysql(**my_db).query_data(v) # 结果是字典 {'mobile_code': '845305'}
# 第三步: 查完后结果存储在环境变量作为属性。属性名就是k 变量名;mobile_code
for i,j in sql_result.items(): # i 是变量名 j 是v属性值
setattr(EnviData,i,j) # 把结果存在环境变量里
print(EnviData.__dict__)
前置SQL思路
先解决第一个问题,数据里获取数据。
思考问题: 这个操作应该是在第二个接口请求发送之前做的还是发送之后做的?
在excel表格里加上一列: 前置SQL。【参考Jmeter这种工具前置处理器,就是在接口执行之前操作】
思路:整体思路跟我们之前的提取响应结果差不多。
- 1、先把前置sql语句写在excel表格里: json格式 ,key 【变量名】: value 【sql语句】
- 2、读取出来,是字符串,做反序列化操作,转化为字典;
- 3、读取这个前置sql,for循环k v 得到这个变量和sql语句,sql去数据库里查,调用我们之前封装的sql的类方法,得到的结果是个字典。
- 4、查完后结果存储在环境变量作为属性。属性名就是k 变量名;
- 5、请求参数里的mobilecode就可以从环境变量里获取这个结果数据了。-- 调用之前的替换函数。【所以,参数里变化的数据需要占位符占位,方便查找替换】==已经完成了!
封装好了前置sql的方法后,思考在哪里用?
还是发送接口请求之前用,所以依然在requests_api方法库调用这个方法。
在替换提前之前调用,因为要提取完成后,去替换参数。
然后分析第三条用例。验证码"checkRegisterSmsFlag"的value是要第二步的结果里的值
但是第三步操作返回结果是text,不是json,怎么提取?
公用之前的提取的函数: 如果是json就用jsonpath提取,如果是text就或文本 【类似我们的响应断言的思路】
所以,要二次修改 extract提取的方法。加一个判断分支:
- 1、针对键值对的值做判断,是$开头的就是jsonpath
- 2、v如果是text 就是直接获取响应文本。
- 3、结果都是存在环境变量里的。
前置SQL数据封装
第一步:发送请求前先获取数据
handle_replace.py
"""
方法优化:
1、日志加上
2、测试用例方法里调用夹具 获取返回值。
- 更新requests-api,需要做token处理:
- 设置一个默认参数:token = None
- 如果接口需要鉴权,测试用例里调用夹具,得到token,requests_api传递token参数;--requests 更新头部
- 如果接口不需要鉴权: token不传 None。 不会做更新头部的操作。
"""
import json
import requests
from tools.handle_path import pic_path
from loguru import logger
from tools.handle_extract import extract_response
from tools.handle_replace import replace_mark
from tools.handle_presql import pre_sql
def requests_api(casedata,token=None):
method = casedata["请求方法"]
url = casedata["接口地址"]
headers = casedata["请求头"]
params = casedata["请求参数"]
presql = casedata["前置SQL"]
# 在执行前置SQL之前,替换占位符数据
presql = replace_mark(presql)
# 在数据替换之前调用前置SQL方法:调用完成后,把结果放到环境变量里
pre_sql(presql)
# 在发送请求之前完成头部和参数的替换--调用替换的函数==结果是字符串
headers = replace_mark(headers)
params = replace_mark(params)
# 反序列操作: 结合判空处理,
if headers is not None:
headers = json.loads(headers)
if token is not None: # 这是做接口如果需要鉴权,传进来token 更新头部信息。
headers["Authorization"] = token # 字典新增 / 修改
if params is not None:
params = json.loads(params)
logger.info("---------------------------请求消息-----------------------------------")
logger.info(f"请求方法是{method}")
logger.info(f"请求地址是{url}")
logger.info(f"请求头部是{headers}")
logger.info(f"请求参数是{params}")
#接口请求可能是get post put等各种请求方法 分支判断
if method.lower() == "get":
resp = requests.request(method=method, url=url, params=params,headers=headers)
elif method.lower() == "post":
if headers is None:
logger.info("头部为空,检查excel表格里头部信息!")
return
# post请求:content-type的类型有关系。需要对每一种类型做处理 分支判断
if headers["Content-Type"] == "application/json":
resp = requests.request(method=method, url=url, json=params, headers=headers)
if headers["Content-Type"] == "application/x-www-form-urlencoded":
resp = requests.request(method=method, url=url, data=params, headers=headers)
if headers["Content-Type"] == "multipart/form-data":
# 发送请求的时候不能带上 'Content-Type': 'multipart/form-data' 删除之后才发送接口请求。
headers.pop("Content-Type") # 字典删除元素
filename = params["filename"] # 文件名字 值
file_obj = {"file": (filename, open(pic_path/filename, "rb"))} # 文件参数
logger.info(f"文件接口的参数是:{file_obj}")
logger.info(f"文件接口的头部是:{headers}")
resp = requests.request(method=method, url=url,headers=headers,files=file_obj)
elif method.lower() == "put":
resp = requests.request(method=method, url=url, json=params, headers=headers)
logger.info("------------------------------响应消息-----------------------------")
logger.info(f"接口响应状态码是:{resp.status_code}")
logger.info(f"接口响应体是:{resp.text}")
# 提取响应结果的数据-- 调用提取数据的函数
extract_response(resp,casedata["提取响应字段"])
return resp
第二步从获取前置sql数据,为空就不调用数据库连接,不为空就调用数据库连接
handle_presql.py
"""
1、def封装
2、参数化
3、返回值: 因为数据都存在环境变量 所以不需要返回值
4、加上日志: 但凡你想确认数据结果的地方 都可以加上日志
5、因为有些接口不需要做前置SQL,所以判空处理:
"""
import json
from tools.handle_mysql import HandleMysql
from datas.db_data import my_db
from tools.envi_data import EnviData
from loguru import logger
def pre_sql(sql_data):
if sql_data is None:
return
# 第一步: 字符串做反序列化操作--转化为字典
logger.info("---------------------前置SQL执行开始-------------------------")
sql_data = json.loads(sql_data)
logger.info(f"前置sql提取表达式为:{sql_data}")
# 第二步: 分别得到key 和value ==for遍历 items()
for k,v in sql_data.items(): # k 是变量变量名- mobile_code,v是sql语句
sql_result = HandleMysql(**my_db).query_data(v) # 结果是字典 {'mobile_code': '845305'}
# 第三步: 查完后结果存储在环境变量作为属性。属性名就是k 变量名;mobile_code
for i,j in sql_result.items(): # i 是变量名 j 是v属性值
setattr(EnviData,i,j) # 把结果存在环境变量里
logger.info(f"提取并设置环境变量之后的类属性是:{EnviData.__dict__}")
if __name__ == '__main__':
sql_data = '''{"mobile_code":
"select mobile_code from tz_sms_log where user_phone='13422337768' order by rec_date desc limit 1;"}'''
pre_sql(sql_data)
第三步连接数据库封装
handle_mysql.py
import pymysql
from pymysql.cursors import DictCursor
from loguru import logger
class HandleMysql:
def __init__(self,user,password,database,port,host):
"""
定义了两个实例属性: conn cursor ,可以用于后续实例方法共享。
"""
self.conn = pymysql.connect(
user=user,
password=password,
database=database,
port=port,
host=host,
charset="utf8mb4",
cursorclass=DictCursor)
self.cursor = self.conn.cursor()
def query_data(self,query_sql,match_num=1,size=None):
"""
:param query_sql: 查询sql语句
:param match_num: 用户获取条数 match_num=1,fetchone;match_num=2,fetchmany,match_num=-1,fetchall
:param size:当match_num=2,size是查询的条数,传参。
:return: 返回查询结果数据
"""
try:
result = self.cursor.execute(query_sql) # 结果条数 >0 才有获取详细数据必要
logger.info(f"数据库的查询结果条数为:{result}")
if result > 0:
if match_num==1:
data = self.cursor.fetchone()
logger.info(f"查询结果数据为:{data}")
return data
elif match_num == 2:
data = self.cursor.fetchmany(size = size)
logger.info(f"查询结果数据为:{data}")
return data
elif match_num == -1:
data = self.cursor.fetchall()
logger.info(f"查询结果数据为:{data}")
return data
logger.warning("请传入1,2,-1的match_num")
logger.info("数据库没有查询结果!")
except:
logger.error("数据库操作异常!")
finally:
self.cursor.close()
self.conn.close()
if __name__ == '__main__':
my_db = {
"user": "lemon_auto",
"password": "lemon!@123",
"database": "yami_shops",
"port": 3306,
"host": "mall.lemonban.com"}
sql = "select mobile_code from tz_sms_log where user_phone='13645321122' order by rec_date desc limit 1;"
result = HandleMysql(**my_db).query_data(sql)
print(result)
数据生成思路
解决第二个问题:手机号码和用户名注册过一次后,不能重复注册,就会有问题。
思路:参考JMeter 随机函数 random
- 1、生成一个随机的手机号码 | 用户名,符合手机号和用户名的格式规则,类似于Jmeter里随机函数助手:random ,Python里也有类似的库:Faker
- 2、把生成的数据去数据库里确认是否真的不重复,调用数据库的方法 : 查询这个号码不存在用户表里
–select * from tz_user where user_mobile = "13444444444"
第一步:faker(骗子),这是一个生成随机数据的工具; Faker是一个第三方库,
安装: pip install Faker
官方地址:https://faker.readthedocs.io/en/stable/
faker.python下有更多的生成随机的方法:这些方法有很多,没有必要都记住,做好笔记,以及用到了再查笔记即可。
from faker import Faker
# 实例化 -Faker本身就是一个类
# 直接各种地区的标准,可以先指定一个区域 中国就是zh_CN,生成的数据就是中国的标准 电话 银行卡之类的
fk = Faker(locale='zh_CN')
print(fk.phone_number()) #手机号码
print(fk.name()) # 人名字,如果名字有长度的要求,4-6位,还要做额外的判断处理。
print(fk.name_female()) # 女性名字
print(fk.ssn()) # 身份证
print(fk.email()) # 邮箱
print(fk.company()) # 公司名字
print(fk.city()) # 城市
print(fk.address()) # 地址
print(fk.sentence()) # 一句话
print(fk.text()) # 一段文章
print(fk.pystr(min_chars=2,max_chars=10)) # 随机字符串 最小长度和最大长度
第二步: 把生成的数据去数据库里确认有无。调用数据库的方法 : 查询这个号码不存在用户表里:
select * from tz_user where user_mobile = "13444444444"
查询结果为None,就是不存在的。那么这个号码可以用
查询结果不为None,那么就是存在数据库了,重复了,不能用,就继续重新生成一个号码,再重复上述操作。
直到查询的结果为None位置。【所以,这个过程需要用什么技术完成?】
思考问题: 现在生成数据的方法已经封装好了,在哪里用?
我注册的第一条用例的数据就需要用,也就是这个号码的位置需要用-函数生成的数据 替换。
测试用例里先占位符替换一下 - 手机号码和用户名的位置需要调用函数生成-- 用函数名替换。
#gen_unregister_phone()# == 替换函数,gen_unregister_phone() 就是一个函数名 --直接调用函数 执行函数 得到结果
函数的返回值。
生成完成后,我们需要设置到环境变量里,后面要用的时候去环境变量里获取,用这个同样的号码。
思考: 后面还能每次调用这个函数生成么?–不能,因为需要是同一个号码 重新生成号码会是新的号码
既然要替换占位符,我们之前的函数就可以用了;但是之前的函数只用于替换变量,不能替换函
数。所以要扩展这个函数的功能:
思路:需要做分支判断:
- 如果是函数【有括号()】: 得到占位符里的函数,调用函数并执行函数,生成数据,存到环境变量里 ==新加的分支方法
- 如果是变量【没有()】: 那么就直接从环境变量里获取属性 替换就可以了;–老方法 不用改
"""
用faker生成随机手机号码和用户名:
第一步: 调用faker类生成手机号码
第二步:把生成的数据去数据库里确认是否真的不重复,调用数据库的方法
- 查询这个号码不存在用户表里-- select * from tz_user where user_mobile = "13444444444"
- 结果是None,数据库不存在的;
- 结果不是None,那么数据里存在的,继续生成,重复过程。
"""
from faker import Faker
from tools.handle_mysql import HandleMysql
from datas.db_data import my_db
# Faker是一个类 实例化,传参: locale可以指定地区,生成数据符合地区的格式【手机号,银行卡 地址】
fk = Faker(locale="zh_CN")
while True:
# 第一步:调用faker类生成手机号码
phone_number = fk.phone_number()
# 第二步:把生成的数据去数据库里确认是否真的不重复
sql = f'select * from tz_user where user_mobile = "{phone_number}"'
sql_result = HandleMysql(**my_db).query_data(sql)
if sql_result is not None: # 如果数据里有这个号码 继续生成 循环
continue
else: # 如果数据里没有这个号码 得到号码 跳出循环
print(phone_number)
break
数据生成封装
第一步生成随机数据并去数据库查询是否已含有对应的数据
handle_generate.py
"""
用户名有长度要求: 4-16位长度的用户名
因为这些生成数据的方法可能需要后续进行扩展: 生成其他的数据。
所以可以把这些方法都当到一个类里。 统一管理。
思考:这个函数应该在哪里执行呢?
思路:
1、执行第一个接口的时候,需要替换掉参数里的占位符位置-- 调用函数并执行函数的结果
"""
from faker import Faker
from tools.handle_mysql import HandleMysql
from datas.db_data import my_db
class GenData:
def gen_unregister_phone(self):
fk = Faker(locale="zh_CN")
while True:
# 第一步:调用faker类生成手机号码
phone_number = fk.phone_number()
# 第二步:把生成的数据去数据库里确认是否真的不重复
sql = f'select * from tz_user where user_mobile = "{phone_number}"'
sql_result = HandleMysql(**my_db).query_data(sql)
if sql_result is not None: # 如果数据里有这个号码 继续生成 循环
continue
else: # 如果数据里没有这个号码 得到号码 跳出循环
return phone_number
def gen_unregister_name(self):
fk = Faker(locale="zh_CN")
while True:
# 第一步:调用faker类生成用户名
username = fk.user_name()
# 第二步:把生成的数据去数据库里确认是否真的不重复
sql = f'select * from tz_user where user_name = "{username}"'
sql_result = HandleMysql(**my_db).query_data(sql)
if sql_result is not None or (len(username) < 4 or len(username) > 16): # 如果数据里有这个号码 继续生成 循环
continue
else: # 如果数据里没有这个号码 得到号码 跳出循环
return username
if __name__ == '__main__':
print(GenData().gen_unregister_phone())
print('GenData().gen_unregister_name()')
result = eval('GenData().gen_unregister_name()')
print(result)
第二步在excel数据中调用对应的函数方法。
第三步有就不替换,如果没有对应的函数(需要对数据进行二次替换)
handle_replace.py
"""
1、def封装
2、参数化
3、返回值: 最终要拿到替换后的字符串 --- 头部 参数 要用于发送接口测试的
4、加上日志: 但凡你想确认数据结果的地方 都可以加上日志
5、因为有些接口不需要做数据提取,所以判空处理:
6、异常捕获: 因为有可能环境变量里没有这个属性名 和属性值
"""
import re
from loguru import logger
from tools.envi_data import EnviData
from tools.handle_generate import GenData
def replace_mark(str_data):
while True:
if str_data is None:
return
result = re.search("#(.*?)#",str_data)
if result is None: # 如果没有占位符 就是None 跳出循环
break
mark = result.group() # 结果是 #prodId# --要被替换的子字符串| #gen_unregister_phone()#
logger.info(f"要被替换的子字符串:{mark}")
if "()" in mark:
fun_name = result.group(1) # 第一个分组的值 结果是 gen_unregister_phone()
logger.info(f"要提取环境变量的函数名:{fun_name}")
# 通过eval拖引号之后,不可以直接GenData().gen_unregister_name(),要导包
gen_data = eval(f'GenData().{fun_name}') # 接口函数的返回值结果-生成的数据
logger.info(f"生成的随机的数据是:{gen_data}")
# 1、存数据到环境变量里 -- 类属性的名字 函数名去掉()
var_name = fun_name.strip("()") # 结果是 gen_unregister_phone
setattr(EnviData,var_name,gen_data) # 属性名:gen_unregister_phone 属性值: gen_data
logger.info(f"环境变量的属性值:{EnviData.__dict__}")
# 2、完成第一条的参数的替换 用刚刚生成的数据替换
str_data = str_data.replace(mark,str(gen_data))
logger.info(f"替换完成后的字符串是:{str_data}")
else:
var_name = result.group(1) # 第一个分组的值 结果是 prodId
logger.info(f"要提取环境变量的属性名:{var_name}")
try:
var_value = getattr(EnviData,var_name) # 结果 : 7717--int类型
except AttributeError as e:
logger.error(f"环境变量里不存在这个属性:{var_name}")
raise e
logger.info(f"要提取环境变量的属性值:{var_value}")
str_data = str_data.replace(mark,str(var_value))
logger.info(f"替换完成后的字符串是:{str_data}")
return str_data
if __name__ == '__main__':
# str_data = '{"basketId": 0, "count": 1, "prodId": #prodId#, "shopId": 1, "skuId": #skuId#}'
str_data = '{"mobile": "#gen_unregister_phone()#"}'
replace_mark(str_data)