增加统一自定义异常类
This commit is contained in:
parent
94d1694b06
commit
0596a58892
|
@ -16,72 +16,77 @@ from common.utils.decorators import request_retry_on_exception
|
|||
|
||||
|
||||
class Pyt(LoadModulesFromFolder):
|
||||
session = requests.Session()
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.request = None
|
||||
self.response = None
|
||||
|
||||
@request_retry_on_exception()
|
||||
def http_client(self, host, url, method, **kwargs):
|
||||
"""
|
||||
发送 http 请求
|
||||
@param host: 域名
|
||||
@param url: 接口 __url
|
||||
@param method: http 请求方法
|
||||
@param kwargs: 接受 requests 原生的关键字参数
|
||||
@return: 响应对象
|
||||
"""
|
||||
# 关闭 https 警告信息
|
||||
urllib3.disable_warnings()
|
||||
|
||||
if not url:
|
||||
raise ValueError("URL cannot be None")
|
||||
__url = f'{host}{url}' if not re.match(r"https?", url) else url
|
||||
|
||||
# 增加兼容
|
||||
# 处理 headers 参数为字符串类型的情况
|
||||
if 'headers' in kwargs and isinstance(kwargs['headers'], str):
|
||||
kwargs['headers'] = json.loads(kwargs['headers'])
|
||||
|
||||
# 处理 json 参数为字符串类型的情况
|
||||
if 'json' in kwargs and isinstance(kwargs['json'], str):
|
||||
kwargs['json'] = json.loads(kwargs['json'])
|
||||
|
||||
# 处理 files 参数的情况
|
||||
fs = []
|
||||
if 'files' in kwargs:
|
||||
file_paths = kwargs['files']
|
||||
if isinstance(file_paths, str):
|
||||
file_paths = json.loads(file_paths)
|
||||
files = []
|
||||
file_utils = FileUtils()
|
||||
for i, file_path in enumerate(file_paths):
|
||||
file_type = mimetypes.guess_type(file_path)[0]
|
||||
file_path_completion = file_utils.get_file_path(file_path)
|
||||
f = open(file_path_completion, 'rb')
|
||||
fs.append(f)
|
||||
files.append(
|
||||
('file', (f'{file_path}', f, file_type))
|
||||
)
|
||||
kwargs['files'] = files
|
||||
|
||||
# 发送请求
|
||||
self.request = requests.Request(method, __url, **kwargs)
|
||||
self.response = self.session.send(self.request.prepare(), timeout=30, verify=True)
|
||||
[i.close() for i in fs if len(fs) > 0]
|
||||
return self.response
|
||||
session = requests.Session()
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.request = None
|
||||
self.response = None
|
||||
self.response_json = None
|
||||
|
||||
@request_retry_on_exception()
|
||||
def http_client(self, host, url, method, **kwargs):
|
||||
"""
|
||||
发送 http 请求
|
||||
@param host: 域名
|
||||
@param url: 接口 __url
|
||||
@param method: http 请求方法
|
||||
@param kwargs: 接受 requests 原生的关键字参数
|
||||
@return: 响应对象
|
||||
"""
|
||||
# 关闭 https 警告信息
|
||||
urllib3.disable_warnings()
|
||||
|
||||
if not url:
|
||||
raise ValueError("URL cannot be None")
|
||||
__url = f'{host}{url}' if not re.match(r"https?", url) else url
|
||||
|
||||
# 增加兼容
|
||||
# 处理 headers 参数为字符串类型的情况
|
||||
if 'headers' in kwargs and isinstance(kwargs['headers'], str):
|
||||
kwargs['headers'] = json.loads(kwargs['headers'])
|
||||
|
||||
# 处理 json 参数为字符串类型的情况
|
||||
if 'json' in kwargs and isinstance(kwargs['json'], str):
|
||||
kwargs['json'] = json.loads(kwargs['json'])
|
||||
|
||||
# 处理 files 参数的情况
|
||||
fs = []
|
||||
if 'files' in kwargs:
|
||||
file_paths = kwargs['files']
|
||||
if isinstance(file_paths, str):
|
||||
file_paths = json.loads(file_paths)
|
||||
files = []
|
||||
file_utils = FileUtils()
|
||||
for i, file_path in enumerate(file_paths):
|
||||
file_type = mimetypes.guess_type(file_path)[0]
|
||||
file_path_completion = file_utils.get_file_path(file_path)
|
||||
f = open(file_path_completion, 'rb')
|
||||
fs.append(f)
|
||||
files.append(
|
||||
('file', (f'{file_path}', f, file_type))
|
||||
)
|
||||
kwargs['files'] = files
|
||||
|
||||
# 发送请求
|
||||
self.request = requests.Request(method, __url, **kwargs)
|
||||
self.response = self.session.send(self.request.prepare(), timeout=30, verify=True)
|
||||
[i.close() for i in fs if len(fs) > 0]
|
||||
try:
|
||||
self.response_json = self.response.json()
|
||||
except Exception as e:
|
||||
self.response_json = None
|
||||
return self.response
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
hst = 'https://kkk.ll.com'
|
||||
url = '/bsp/test/'
|
||||
method = 'post'
|
||||
kwargs = {
|
||||
'headers': {},
|
||||
'data': {},
|
||||
'files': ['test.txt']
|
||||
}
|
||||
pyt = Pyt()
|
||||
pyt.http_client(hst, url, method, **kwargs)
|
||||
hst = 'https://kkk.ll.com'
|
||||
url = '/bsp/test/'
|
||||
method = 'post'
|
||||
kwargs = {
|
||||
'headers': {},
|
||||
'data': {},
|
||||
'files': ['test.txt']
|
||||
}
|
||||
pyt = Pyt()
|
||||
pyt.http_client(hst, url, method, **kwargs)
|
||||
|
|
|
@ -14,6 +14,7 @@ from common.crypto.encrypt_data import EncryptData
|
|||
from common.database.mysql_client import MysqlClient
|
||||
from common.log_utils.mylogger import MyLogger
|
||||
from common.utils.decorators import singleton
|
||||
from common.utils.exceptions import *
|
||||
from common.validation.extractor import Extractor
|
||||
from common.validation.load_and_execute_script import LoadScript
|
||||
from common.validation.validator import Validator
|
||||
|
@ -38,16 +39,12 @@ class Action(Extractor, LoadScript, Validator, MysqlClient):
|
|||
compiled = compile(ast_obj, '<string>', 'exec')
|
||||
exec(compiled, {"pm": self})
|
||||
except SyntaxError as e:
|
||||
error_message = f'动态代码语法异常: {e}'
|
||||
self._handle_error(error_message)
|
||||
except Exception as e:
|
||||
error_message = f"动态代码执行异常: {e}"
|
||||
self._handle_error(error_message)
|
||||
ExecuteDynamiCodeError(code, e)
|
||||
except ExecuteDynamiCodeError as e:
|
||||
ExecuteDynamiCodeError(code, e)
|
||||
|
||||
return self.variables
|
||||
|
||||
def _handle_error(self, error_message):
|
||||
print(f'发现异常: {error_message}')
|
||||
|
||||
@property
|
||||
def variables(self, key=None):
|
||||
return self.__variables if not key else self.__variables.get(key)
|
||||
|
@ -68,10 +65,9 @@ class Action(Extractor, LoadScript, Validator, MysqlClient):
|
|||
return headers, request_data
|
||||
|
||||
def send_request(self, host, url, method, teardown_script, **kwargs):
|
||||
response = self.http_client(host, url, method, **kwargs)
|
||||
self.update_environments("responseTime", response.elapsed.total_seconds() * 1000) # 存响应时间
|
||||
self.execute_dynamic_code(response, teardown_script)
|
||||
return response
|
||||
self.http_client(host, url, method, **kwargs)
|
||||
self.update_environments("responseTime", self.response.elapsed.total_seconds() * 1000) # 存响应时间
|
||||
self.execute_dynamic_code(self.response, teardown_script)
|
||||
|
||||
@staticmethod
|
||||
def base_info(item):
|
||||
|
@ -142,9 +138,8 @@ class Action(Extractor, LoadScript, Validator, MysqlClient):
|
|||
if sleep_time:
|
||||
try:
|
||||
time.sleep(sleep_time)
|
||||
except Exception as e:
|
||||
self.logger.error("暂时时间必须是数字")
|
||||
raise e
|
||||
except MyBaseException as e:
|
||||
raise MyBaseException(f"暂停时间必须是数字!")
|
||||
|
||||
def exc_sql(self, item):
|
||||
sql, sql_params_dict = self.sql_info(item)
|
||||
|
@ -152,11 +147,14 @@ class Action(Extractor, LoadScript, Validator, MysqlClient):
|
|||
if sql:
|
||||
try:
|
||||
execute_sql_results = self.execute_sql(sql)
|
||||
except DatabaseExceptionError as e:
|
||||
raise DatabaseExceptionError(sql, str(e))
|
||||
else:
|
||||
if execute_sql_results and sql_params_dict:
|
||||
self.substitute_data(execute_sql_results, jp_dict=sql_params_dict)
|
||||
except Exception as e:
|
||||
self.logger.error(f'执行 sql 失败:{sql},异常信息:{e}')
|
||||
raise e
|
||||
try:
|
||||
self.substitute_data(execute_sql_results, jp_dict=sql_params_dict)
|
||||
except ParameterExtractionError as e:
|
||||
ParameterExtractionError(sql_params_dict, str(e))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -12,46 +12,46 @@ from functools import wraps
|
|||
|
||||
|
||||
def singleton(cls):
|
||||
"""
|
||||
Args:
|
||||
cls:被装饰类
|
||||
Returns:
|
||||
"""
|
||||
instance = {}
|
||||
|
||||
@wraps(cls)
|
||||
def get_instance(*args, **kwargs):
|
||||
if cls not in instance:
|
||||
instance[cls] = cls(*args, **kwargs)
|
||||
return instance[cls]
|
||||
|
||||
return get_instance
|
||||
"""
|
||||
Args:
|
||||
cls:被装饰类
|
||||
Returns:
|
||||
"""
|
||||
instance = {}
|
||||
|
||||
@wraps(cls)
|
||||
def get_instance(*args, **kwargs):
|
||||
if cls not in instance:
|
||||
instance[cls] = cls(*args, **kwargs)
|
||||
return instance[cls]
|
||||
|
||||
return get_instance
|
||||
|
||||
|
||||
def request_retry_on_exception(retries=2, delay=1.5):
|
||||
def request_decorator(func):
|
||||
e = None
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
nonlocal e
|
||||
for i in range(retries):
|
||||
try:
|
||||
print(f"第{i}次发送请求的参数: {kwargs}")
|
||||
response = func(*args, **kwargs)
|
||||
print(f"请求地址 --> {response.request.url}")
|
||||
print(f"请求头 --> {response.request.headers}")
|
||||
print(f"请求 body --> {response.request.body}")
|
||||
print(f"接口状态--> {response.status_code}")
|
||||
print(f"接口耗时--> {response.elapsed}")
|
||||
print(f"接口响应--> {response.text}")
|
||||
except Exception as error:
|
||||
e = error
|
||||
time.sleep(delay)
|
||||
else:
|
||||
return response
|
||||
raise Exception(f"请求重试**{retries}**次失败,请检查!!{e}")
|
||||
|
||||
return wrapper
|
||||
|
||||
return request_decorator
|
||||
def request_decorator(func):
|
||||
e = None
|
||||
|
||||
@wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
nonlocal e
|
||||
for i in range(retries):
|
||||
try:
|
||||
print(f"第{i + 1}次发送请求的参数: {kwargs}")
|
||||
response = func(*args, **kwargs)
|
||||
print(f"请求地址 --> {response.request.url}")
|
||||
print(f"请求头 --> {response.request.headers}")
|
||||
print(f"请求 body --> {response.request.body}")
|
||||
print(f"接口状态--> {response.status_code}")
|
||||
print(f"接口耗时--> {response.elapsed}")
|
||||
print(f"接口响应--> {response.text}")
|
||||
except Exception as error:
|
||||
e = error
|
||||
time.sleep(delay)
|
||||
else:
|
||||
return response
|
||||
raise Exception(f"请求重试**{retries}**次失败,请检查!!{e}")
|
||||
|
||||
return wrapper
|
||||
|
||||
return request_decorator
|
||||
|
|
|
@ -0,0 +1,71 @@
|
|||
#!/usr/bin/env python
|
||||
# encoding: utf-8
|
||||
"""
|
||||
@author: kira
|
||||
@contact: 262667641@qq.com
|
||||
@file: exceptions.py
|
||||
@time: 2023/8/1 9:12
|
||||
@desc:
|
||||
"""
|
||||
|
||||
|
||||
class MyBaseException(Exception):
|
||||
def __init__(self, msg):
|
||||
self.msg = msg
|
||||
|
||||
def __str__(self):
|
||||
return self.msg
|
||||
|
||||
|
||||
class RequestSendingError(MyBaseException):
|
||||
"""请求异常"""
|
||||
ERROR_CODE = 1001
|
||||
|
||||
def __init__(self, url, reason):
|
||||
msg = f"请求异常:URL={url}, 原因={reason}"
|
||||
super().__init__(msg)
|
||||
|
||||
|
||||
class DatabaseExceptionError(MyBaseException):
|
||||
"""数据库异常"""
|
||||
ERROR_CODE = 1002
|
||||
|
||||
def __init__(self, operation, reason):
|
||||
msg = f"数据库异常:操作={operation}, 原因={reason}"
|
||||
super().__init__(msg)
|
||||
|
||||
|
||||
class ParameterExtractionError(MyBaseException):
|
||||
"""参数提取异常"""
|
||||
ERROR_CODE = 1003
|
||||
|
||||
def __init__(self, parameter_path, reason):
|
||||
msg = f"参数提取异常:参数路径={parameter_path}, 原因={reason}"
|
||||
super().__init__(msg)
|
||||
|
||||
|
||||
class ParameterReplacementError(MyBaseException):
|
||||
"""参数替换异常"""
|
||||
ERROR_CODE = 1004
|
||||
|
||||
def __init__(self, parameter_name, reason):
|
||||
msg = f"参数替换异常:参数名称={parameter_name}, 原因={reason}"
|
||||
super().__init__(msg)
|
||||
|
||||
|
||||
class AssertionFailedError(MyBaseException):
|
||||
"""断言异常"""
|
||||
ERROR_CODE = 1005
|
||||
|
||||
def __init__(self, assertion_name, actual_value, expected_value):
|
||||
msg = f"断言失败:断言名称={assertion_name}, 实际值={actual_value}, 期望值={expected_value}"
|
||||
super().__init__(msg)
|
||||
|
||||
|
||||
class ExecuteDynamiCodeError(MyBaseException):
|
||||
"""执行动态代码异常"""
|
||||
ERROR_CODE = 1006
|
||||
|
||||
def __init__(self, code, reason):
|
||||
msg = f"执行动态代码异常:动态代码={code}, 原因={reason}"
|
||||
super().__init__(msg)
|
|
@ -12,9 +12,9 @@ import unittest
|
|||
from ddt import ddt, data
|
||||
|
||||
from common import bif_functions
|
||||
from common.file_handling.do_excel import DoExcel
|
||||
from common.utils.action import Action
|
||||
from config import Config
|
||||
from common.file_handling.do_excel import DoExcel
|
||||
from extensions import dynamic_scaling_methods
|
||||
|
||||
test_file = Config.test_case # 获取 excel 文件路径
|
||||
|
@ -25,82 +25,81 @@ test_case, databases, initialize_data, host = excel.get_excel_init_and_cases()
|
|||
|
||||
@ddt
|
||||
class TestProjectApi(unittest.TestCase):
|
||||
maxDiff = None
|
||||
action = Action(initialize_data, bif_functions, databases)
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls) -> None:
|
||||
pass
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.action.set_bif_fun(dynamic_scaling_methods)
|
||||
|
||||
@data(*test_case)
|
||||
def test_api(self, item):
|
||||
sheet, iid, condition, st, name, desc, h_crypto, r_crypto, method, expected = self.action.base_info(item)
|
||||
if self.action.is_run(condition):
|
||||
self.skipTest("这个测试用例听说泡面比较好吃,所以放弃执行了!!")
|
||||
regex, keys, deps, jp_dict, ex_request_data = self.action.extractor_info(item)
|
||||
setup_script, teardown_script = self.action.script(item)
|
||||
self.action.pause_execution(st)
|
||||
|
||||
# 首执行 sql
|
||||
self.action.exc_sql(item)
|
||||
if method.upper() == 'SQL':
|
||||
self.skipTest("这条测试用例被 SQL 吃了,所以放弃执行了!!")
|
||||
|
||||
# 执行动态代码
|
||||
item = self.action.execute_dynamic_code(item, setup_script)
|
||||
|
||||
# prepost_script = f"prepost_script_{sheet}_{iid}.py"
|
||||
# item = self.action.load_and_execute_script(Config.SCRIPTS_DIR, prepost_script, "setup", item)
|
||||
|
||||
# 修正参数
|
||||
item = self.action.replace_dependent_parameter(item)
|
||||
url, query_str, request_data, headers, request_data_type = self.action.request_info(item)
|
||||
|
||||
# 分析请求参数信息
|
||||
headers, request_data = self.action.analysis_request(request_data, h_crypto, headers, r_crypto, ex_request_data)
|
||||
result_tp = None
|
||||
result = "PASS"
|
||||
|
||||
# 执行请求操作
|
||||
kwargs = {request_data_type: request_data, 'headers': headers, "params": query_str}
|
||||
response = self.action.send_request(host, url, method, teardown_script, **kwargs)
|
||||
|
||||
try:
|
||||
# 提取响应
|
||||
self.action.substitute_data(response.json(), regex=regex, keys=keys, deps=deps, jp_dict=jp_dict)
|
||||
except Exception as err:
|
||||
self.action.logger.error(f"提取响应失败:{sheet}_{iid}_{name}_{desc}"
|
||||
f"\nregex={regex};"
|
||||
f" \nkeys={keys};"
|
||||
f"\ndeps={deps};"
|
||||
f"\njp_dict={jp_dict}"
|
||||
f"\n{err}")
|
||||
|
||||
# 修正断言
|
||||
expected = self.action.replace_dependent_parameter(expected)
|
||||
try:
|
||||
# print(f"期望结果--> {expected}")
|
||||
# 执行断言 返回结果元组
|
||||
self.action.run_validate(expected, response.json())
|
||||
except Exception as e:
|
||||
result = "FAIL"
|
||||
self.action.logger.error(f'异常用例: **{sheet}_{iid}_{name}_{desc}**\n{e}')
|
||||
raise e
|
||||
finally:
|
||||
print(f"断言结果-->", self.action.assertions)
|
||||
response = response.text if response is not None else str(response)
|
||||
# 响应结果及测试结果回写 excel
|
||||
excel.write_back(sheet_name=sheet, i=iid, response=response, test_result=result,
|
||||
assert_log=str(self.action.assertions))
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls) -> None:
|
||||
excel.close_excel()
|
||||
cls.action.logger.info(f"所有用例执行完毕")
|
||||
maxDiff = None
|
||||
action = Action(initialize_data, bif_functions, databases)
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls) -> None:
|
||||
pass
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.action.set_bif_fun(dynamic_scaling_methods)
|
||||
|
||||
@data(*test_case)
|
||||
def test_api(self, item):
|
||||
sheet, iid, condition, st, name, desc, h_crypto, r_crypto, method, expected = self.action.base_info(item)
|
||||
if self.action.is_run(condition):
|
||||
self.skipTest("这个测试用例听说泡面比较好吃,所以放弃执行了!!")
|
||||
regex, keys, deps, jp_dict, ex_request_data = self.action.extractor_info(item)
|
||||
setup_script, teardown_script = self.action.script(item)
|
||||
self.action.pause_execution(st)
|
||||
|
||||
# 首执行 sql
|
||||
self.action.exc_sql(item)
|
||||
if method.upper() == 'SQL':
|
||||
self.skipTest("这条测试用例被 SQL 吃了,所以放弃执行了!!")
|
||||
|
||||
# 执行动态代码
|
||||
item = self.action.execute_dynamic_code(item, setup_script)
|
||||
|
||||
# prepost_script = f"prepost_script_{sheet}_{iid}.py"
|
||||
# item = self.action.load_and_execute_script(Config.SCRIPTS_DIR, prepost_script, "setup", item)
|
||||
|
||||
# 修正参数
|
||||
item = self.action.replace_dependent_parameter(item)
|
||||
url, query_str, request_data, headers, request_data_type = self.action.request_info(item)
|
||||
|
||||
# 分析请求参数信息
|
||||
headers, request_data = self.action.analysis_request(request_data, h_crypto, headers, r_crypto, ex_request_data)
|
||||
result = "PASS"
|
||||
|
||||
# 执行请求操作
|
||||
kwargs = {request_data_type: request_data, 'headers': headers, "params": query_str}
|
||||
self.action.send_request(host, url, method, teardown_script, **kwargs)
|
||||
|
||||
try:
|
||||
# 提取响应
|
||||
self.action.substitute_data(self.action.response_json, regex=regex, keys=keys, deps=deps, jp_dict=jp_dict)
|
||||
except Exception as err:
|
||||
self.action.logger.error(f"提取响应失败:{sheet}_{iid}_{name}_{desc}"
|
||||
f"\nregex={regex};"
|
||||
f" \nkeys={keys};"
|
||||
f"\ndeps={deps};"
|
||||
f"\njp_dict={jp_dict}"
|
||||
f"\n{err}")
|
||||
|
||||
# 修正断言
|
||||
expected = self.action.replace_dependent_parameter(expected)
|
||||
try:
|
||||
# print(f"期望结果--> {expected}")
|
||||
# 执行断言 返回结果元组
|
||||
self.action.run_validate(expected, self.action.response_json)
|
||||
except Exception as e:
|
||||
result = "FAIL"
|
||||
self.action.logger.error(f'异常用例: **{sheet}_{iid}_{name}_{desc}**\n{e}')
|
||||
raise e
|
||||
finally:
|
||||
print(f"断言结果-->", self.action.assertions)
|
||||
response = self.action.response.text if self.action.response is not None else str(self.action.response)
|
||||
# 响应结果及测试结果回写 excel
|
||||
excel.write_back(sheet_name=sheet, i=iid, response=response, test_result=result,
|
||||
assert_log=str(self.action.assertions))
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls) -> None:
|
||||
excel.close_excel()
|
||||
cls.action.logger.info(f"所有用例执行完毕")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
unittest.main()
|
||||
|
|
Loading…
Reference in New Issue