场景:
新需求还未开发时,使用mock提早介入测试,等后边开发后,进行调试
- 三方接口返回效率低,使用mock技术走通流程
1.mock方式
(1)如果会写django或flask,可以写简单对应的代码
(2)一些接口工具具备这个功能(postman)
(3)一些现成的框架直接使用:Moco框架
构造一个依赖的服务,并给予他预期的服务返回值,适用范围广,更加适合 集成测试
Moco框架
类似一个Mock的工具框架,一个简单搭建模拟服务器的程序库/工具,下载就是一个jar包
特点:
- 简单的配置request、response等即可满足要求
- 支持http、https、socket协议,可以说是非常的灵活性
- 支持在request中设置Headers,Cookies,StatusCode等
- 对get\post\put\delete等请求方式都支持
- 无需环境配置,有Java环境即可
- 修改配置后,立即生效。只需要维护接口,也就是契约即可
- 支持多种数据格式,如JSON\Text\XML\File等
- 可与其他工具集成,如Junit\Maven等
2.Mock服务搭建
(1)需要安装jdk
(2)下载moco的jar包
moco下载
提取码:8eem
(3)启动服务,jar包名称根据下载的jar包写
http代表这个模拟的是http请求,
-p 9090是定义端口号
-c test.json是编辑的json文件名
java -jar moco-runner-0.11.0-standalone.jar http -p 9090 -c test.json
如果cmd乱码就用:`
``java -jar -Dfile.encoding=utf-8 moco-runner-0.11.0-standalone.jar http -p 9090 -c demo.json```
(4)验证
/demo1:是json文件中定义的uri
访问:http://localhost:9090/demo1
3.Mock请求构建
start.bat
@echo on
java -jar -Dfile.encoding=utf-8 moco-runner-0.11.0-standalone.jar http -p 9090 -c demo.json
@echo off
pause
demo.json
[
{
"description":"11",
"request":{
"uri":"/shop",
"method":"Get"
},
"response":{
"text":"Hello,baby"
}
}
]
浏览器输入:http://127.0.0.1:9090/shop
4.配置不同的请求
(1)约定uri
[
{
"description":"约定uri",
"request":{
"uri":"/shop"
},
"response":{
"text":"Hello,baby"
}
}
]
(2)约定请求参数
[
{
"description":"约定请求参数",
"request":{
"queries":{
"key1":"123",
"key2":"cvf"
}
},
"response":{
"text":"Hello,baby"
}
}
]
(3)约定请求方法
[
{
"description":"约定请求方法",
"request":{
"method":"DELETE"
},
"response":{
"text":"Hello,baby"
}
}
]
(4)约定请求头
[
{
"description":"约定请求头",
"request":{
"headers":{
"Content-Type":"application/xml"
}
},
"response":{
"text":"Hello,baby"
}
}
]
(5)约定请求体参数-form
[
{
"description":"约定请求体参数-form",
"request":{
"forms":{
"key1":"anc"
}
},
"response":{
"text":"Hello,baby"
}
}
]
(6)约定请求体参数-json
[
{
"description":"约定请求体参数-json",
"request":{
"json":{
"key1":"anc",
"key2":"anc"
}
},
"response":{
"text":"Hello,baby"
}
}
]
(7)返回响应头和json
[
{
"description":"返回响应头和json",
"request":{
"json":{
"key1":"anc",
"key2":"anc"
}
},
"response":{
"headers":{
"Content-Type":"application/json"
},
"json":{
"code":"200"
}
}
}
]
代码测试
HOST = 'http://127.0.0.1:9090'
import requests
def test():
url=f'{HOST}/shop_up'
payload={'key':'abc'}
# form格式:data=payload json格式:json=payload
resp=requests.post(url,json=payload)
print(resp.text)
if __name__ == '__main__':
test()
要先执行jar包启动服务,在运行上述代码
5.异步接口实战
(1)同步和异步
同步:不利于性能提升,需要等待响应
异步:发送请求后,响应后续给出,可以先做其他的
eg:体检拿结果,可以A项做完后,直接做B项的同时等A的结果,不用先等A的结果,再去做B。
(2)异步接口实现
- 业务场景
店铺向平台申请退单请求
平台接受请求,核实信息
平台3个工作日内告知结果 - 具体实现
通过提交申请的接口给服务端
服务器立即返回这个申请id
后续使用查询的接口,带上id查询结果
看是否有返回结果 - mock+异步查询技术
拿到申请和查询订单结果接口的文档,获取url、参数、响应等信息
(1)申请退单接口
HOST = 'http://127.0.0.1:9090'
import requests
import time
# ----------------1.创建申诉接口---------------
def create_order(inData):
url = f'{HOST}/api/order/create/'
payload = inData
resp = requests.post(url, json=payload)
return resp.json()
# -------2.查询结果接口--------
"""
查询接口注意事项:
1.使用id去查--id是creat_order创建返回的
2.次数/频率--频率 interval
3.超时机制--timeout
"""
def get_order_result(orderID, interval=5, timeout=30):
"""
:param orderID: 订单id
:param interval: 查询频率
:param timeout: 超时时间
:return: 查询结果
"""
url = f'{HOST}/api/order/get_result/'
payload = {"order_id": orderID}
# 获取开始查询的时间和结束时间
starTime = time.time() # 单位是s
endTime = starTime + timeout
cnt=0#查询的次数--初始值为0
# 判断查询时间是否超时
while time.time()<endTime:
resp = requests.get(url, params=payload)
cnt+=1
# 如果有响应数据,直接结束循环,break
if resp.text:
print(f"第{cnt}次查询,已有结果,退出查询>>>",resp.text)
break
else:
print(f'第{cnt}次查询,暂无结果,请耐心等待>>>')
time.sleep(5)#等5s
print('====该查询函数执行完毕===')
return resp.text
if __name__ == '__main__':
testData = {
"user_id": "zz123",
"goods_id": "20240424",
"num": 1,
"amount": 200.6
}
res = create_order(testData)
print(res)
id = res['order_id']
# 2.调用查询接口
res2 = get_order_result(id)
print(res2)
结果:异步接口已经实现,但执行效率低
原因:
(1)查询接口使用sleep–5s,导致自动化测试效率低
优化:
(1)代码层面原因:当执行sleep()时,代码不会处理其他事情
requests.xx方法和sleep()属于io阻塞
深挖细节
cpu:
- io阻塞模式
- cpu密集性
(2)具体解决方案
改为多线程,等待的时候,做其他接口,等待时间到了之后,再回来继续执行下一步操作
主线程:main–下面的代码
子线程:就是get_order_result函数
HOST = 'http://127.0.0.1:9090'
import requests
import time
import threading #多线程--内置库
# ----------------1.创建申诉接口---------------
def create_order(inData):
url = f'{HOST}/api/order/create/'
payload = inData
resp = requests.post(url, json=payload)
return resp.json()
# -------2.查询结果接口--------
"""
查询接口注意事项:
1.使用id去查--id是creat_order创建返回的
2.次数/频率--频率 interval
3.超时机制--timeout
"""
def get_order_result(orderID, interval=5, timeout=30):
"""
:param orderID: 订单id
:param interval: 查询频率
:param timeout: 超时时间
:return: 查询结果
"""
url = f'{HOST}/api/order/get_result/'
payload = {"order_id": orderID}
# 获取开始查询的时间和结束时间
starTime = time.time() # 单位是s
endTime = starTime + timeout
cnt=0#查询的次数--初始值为0
# 判断查询时间是否超时
while time.time()<endTime:
resp = requests.get(url, params=payload)
cnt+=1
# 如果有响应数据,直接结束循环,break
if resp.text:
print(f"第{cnt}次查询,已有结果,退出查询>>>",resp.text)
break
else:
print(f'第{cnt}次查询,暂无结果,请耐心等待>>>')
time.sleep(5)#等5s
print('====该查询函数执行完毕===')
return resp.text
if __name__ == '__main__':
testData = {
"user_id": "zz123",
"goods_id": "20240424",
"num": 1,
"amount": 200.6
}
res = create_order(testData)
print(res)
id = res['order_id']
#--------多线程操作---------
#创建子线程
"""
threading.Thread(target,args)
target:需要把哪一个函数做为子线程执行的任务,就写该函数名
args:该函数需要传入的参数-以元组形式
"""
t1=threading.Thread(target=get_order_result,args=(id,))
# 主线程如果结束或异常退出,子线程就直接退出
t1.setDaemon(True)#守护线程
t1.start()
#---------这些是模拟其他接口的测试操作
for one in range(20):
time.sleep(1)#模拟其他接口执行的时间
print(f'{one}---正在执行其他接口')
#--------------------------