update 前置函数,优化缓存方式

This commit is contained in:
tlh717 2023-03-15 10:04:55 +08:00
parent c5ba069536
commit 27590cfdad
7 changed files with 32 additions and 47 deletions

View File

@ -97,7 +97,7 @@ exit # 退出虚拟环境
* expectation: 用例输出
* response: 接口返回数据体
创建好了yaml文件,就可以创建测试用例文件了
创建好了yaml测试数据,就可以创建测试用例文件了
### 创建测试用例文件
![img.png](files/testcase.png)
@ -115,9 +115,9 @@ exit # 退出虚拟环境
* 测试函数必须test_*开头,重点说各个入参的字段
* requests: 封装好的实例化请求方法根据请求方式直接调用requests.post或requests.get如图中的res所示
* requests: 封装好的实例化请求方法根据请求方式直接调用requests.request如图中的res所示
* url: 由根目录config.yml文件中读取的路由及该测试用例对应的yaml文件中读取的path拼接无需填写只需正常传入
* env: 请求地址+请求方法组成的元祖数据使用时确保两边配置文件填写正确即可正常传入请求地址config.yml配置文件中的host以及对应测试用例中的path拼接请求方法对应测试用例中的method
* headers: 以静态方法存放在utils.requests_control文件的RestClient类中可按需更改
@ -133,9 +133,16 @@ exit # 退出虚拟环境
* sql: 数据库实例化方法可以通过sql.query('xxxx'), sql.execute('xxxxx')方式调用使用时直接使用yaml文件中的sql语句sql.query(inputs['sql'])(当然也可以直接写)
* cache: 共享的缓存数据这个使用起来主要注意例如在test_1中我需要使用某个数据给test_2使用那么可以在test_1中使用cache['key'] = 1 这样的方式把数据存入全局的共享数据中test_2使用时就用字典取值的方式cache['key']即可
,那么就有人问了,那多个数据依赖怎么办呢?目前的办法是将会产出依赖数据的接口打上@pytest.mark.run(order=1)的标签并且存入共享数据时的key必须不重复
这样可以理解为用例执行开始前有一个空字典,在测试过程中不断向其中添加依赖数据,而在后面执行的用例只需要从里面取就可以了
* cache: 缓存依赖数据,使用方式如下
* 添加缓存数据:
![img.png](files/testcache.png)
使用cache.add_cache()方法把数据添加到test_data目录下的cache.yml文件中
* 使用缓存数据:
![img.png](files/testcache2.png)
使用cache.get_cache()方法使用缓存数据,注意添加缓存数据的用例执行必须在使用缓存数据前,
可以使用@pytest.mark.run(order=1)来控制
### 运行测试用例

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time
import pytest
from utils.path import root
from itertools import zip_longest
@ -15,7 +14,7 @@ from utils.read_yaml_control import HandleYaml
def pytest_generate_tests(metafunc):
"""参数化测试函数"""
case, url, inputs, expectation = [], [], [], []
case, env, inputs, expectation = [], [], [], []
markers = metafunc.definition.own_markers
for marker in markers:
if marker.name == 'datafile':
@ -24,18 +23,18 @@ def pytest_generate_tests(metafunc):
for data in test_data['tests']:
if data['inputs'].get('json', None) is not None:
for k, v in list(data['inputs'].get('json', {}).items()):
if str(v)[0:2] == '{{' and str(v)[-2:] == '}}':
val = str(v)[2:-2]
if '{{' in str(v) and '}}' in str(v):
val = str(v)[v.find('{{') + 2:v.find('}}')]
data['inputs']['json'][k] = Execute(val)() # 替换yaml文件json中{{xxx}}xxx的值
if data['inputs'].get('file', {}):
data['inputs']['file'] = eval(f"open_file('{data['inputs'].get('file', {})}')") # 直接返回open_file对象
case.append(data.get('case', None) if data.get('case', None) is not None else {})
url.append(
str(host()) + str(test_data['common_inputs'].get('path', None)) if test_data['common_inputs'].get(
'path', None) is not None else {})
env.append((str(host()) + str(test_data['common_inputs'].get('path', None)) if test_data[
'common_inputs'].get(
'path', None) is not None else {}, str(test_data['common_inputs'].get('method', None)).lower()))
inputs.append(data.get('inputs', None) if data.get('inputs', None) is not None else {})
expectation.append(data.get('expectation', None) if data.get('expectation', None) is not None else {})
metafunc.parametrize("case, url, inputs, expectation", zip_longest(case, url, inputs, expectation),
metafunc.parametrize("case, env, inputs, expectation", zip_longest(case, env, inputs, expectation),
scope='function')

BIN
files/testcache.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 198 KiB

BIN
files/testcache2.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 144 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 67 KiB

After

Width:  |  Height:  |  Size: 139 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 48 KiB

After

Width:  |  Height:  |  Size: 137 KiB

View File

@ -15,55 +15,34 @@ class RestClient:
urllib3.disable_warnings()
self.sesson = requests.session() # 创建会话对象
self.sesson.cookies = cookie
def get(self, url, headers=None, **kwargs):
return self.request(url, "get", headers, verify=False, **kwargs)
def post(self, url, data=None, json=None, headers=None, **kwargs):
return self.request(url, "post", data, json, headers, verify=False, **kwargs)
def options(self, url, **kwargs):
return self.request(url, "options", verify=False, **kwargs)
def head(self, url, **kwargs):
return self.request(url, "head", **kwargs)
def put(self, url, data=None, **kwargs):
return self.request(url, "put", data, verify=False, **kwargs)
def patch(self, url, data=None, json=None, **kwargs):
return self.request(url, "patch", data, json, verify=False, **kwargs)
def delete(self, url, **kwargs):
return self.request(url, "delete", verify=False, **kwargs)
self._res = None
@Log(True)
def request(self, url, request_method, data=None, json=None, headers=None, **kwargs):
def request(self, env: set, data=None, json=None, headers=None, **kwargs):
url, request_method = env
if request_method == 'get':
res = self.sesson.get(url, headers=headers, **kwargs)
self._res = self.sesson.get(url, headers=headers, **kwargs)
elif request_method == 'post':
res = self.sesson.post(url, json, data, headers=headers, **kwargs)
self._res = self.sesson.post(url, json, data, headers=headers, **kwargs)
elif request_method == 'options':
res = self.sesson.options(url, **kwargs)
self._res = self.sesson.options(url, **kwargs)
elif request_method == 'head':
res = self.sesson.head(url, **kwargs)
self._res = self.sesson.head(url, **kwargs)
elif request_method == 'put':
res = self.sesson.put(url, data, **kwargs)
self._res = self.sesson.put(url, data, **kwargs)
elif request_method == 'patch':
data = json.dump(json) if json else ...
res = self.sesson.patch(url, data, **kwargs)
self._res = self.sesson.patch(url, data, **kwargs)
elif request_method == 'delete':
res = self.sesson.delete(url, **kwargs)
return res
self._res = self.sesson.delete(url, **kwargs)
return self._res
@staticmethod
def headers():
"""全局headers"""
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
'ci-token': token
'token': token
}
return headers