新增前置处理

This commit is contained in:
郭林莉 Linli G 2023-02-13 12:33:27 +08:00
parent d196d72e90
commit 45781e2a18
5 changed files with 539 additions and 60 deletions

View File

@ -19,7 +19,13 @@ create_task_01:
is_run:
data:
vins: 73401134000000000
# 请求类型params 是以url拼接的形式请求json则传的是json串
# 依赖数据库的数据,先从数据库取值再存到缓存里面
dependence_case_data:
- case_id: self
dependent_data:
- dependent_type: sqlData
jsonpath: $.username
replace_key: $.data.mobile
assert:
status:
jsonpath: $.status
@ -32,7 +38,14 @@ create_task_01:
type: ==
value: "success"
AssertType:
# 断言接口返回的username
# 断言sql的时候AssertType 的值需要填写成 SQL AssertType: SQL
sql:
- select * from test_goods where shop_id = 515
#前置处理,从数据库取值在请求接口
setup_sql:
- SELECT * FROM test_obp_user.user_biz_info where user_id = '300000405'
#后置处理,从数据库删除
teardown_sql: delete * from xxx - delete * from xxx
#后置处理,从业务逻辑闭环
teardown:

View File

@ -4,7 +4,7 @@
from config.settings import ConfigHandler
import os
from typing import Any
from tools.exceptions import ValueNotFoundError
class Cache:
""" 设置、读取缓存 """
@ -62,6 +62,19 @@ class Cache:
os.remove(cache_path + "/" + i)
_cache_config = {}
class CacheHandler:
@staticmethod
def get_cache(cache_data):
try:
return _cache_config[cache_data]
except KeyError:
raise ValueError(f"{cache_data}的缓存数据未找到,请检查是否将该数据存入缓存中")
@staticmethod
def update_cache(*, cache_name, value):
_cache_config[cache_name] = value
if __name__ == '__main__':
a = Cache('ecu_collection_select_id').get_cache()

166
tools/dependent_case.py Normal file
View File

@ -0,0 +1,166 @@
import ast
import json
from typing import Text, Dict, Union, List
from jsonpath import jsonpath
from tools.mysql_control import SetUpMySQL
from tools.regular_control import regular, cache_regular
from tools.log_control import WARNING
from tools.models import TestCase, DependentCaseData, DependentData
from tools.cache_control import CacheHandler
from config.configs import Config
class DependentCase:
""" 处理依赖相关的业务 """
def __init__(self, dependent_yaml_case: TestCase):
self.__yaml_case = dependent_yaml_case
@classmethod
def get_cache(cls, case_id: Text) -> Dict:
"""
获取缓存用例池中的数据通过 case_id 提取
:param case_id:
:return: case_id_01
"""
_case_data = CacheHandler.get_cache(case_id)
return _case_data
@classmethod
def jsonpath_data(
cls,
obj: Dict,
expr: Text) -> list:
"""
通过jsonpath提取依赖的数据
:param obj: 对象信息
:param expr: jsonpath 方法
:return: 提取到的内容值,返回是个数组
对象: {"data": applyID} --> jsonpath提取方法: $.data.data.[0].applyId
"""
_jsonpath_data = jsonpath(obj, expr)
# 判断是否正常提取到数据,如未提取到,则抛异常
if _jsonpath_data is False:
raise ValueError(
f"jsonpath提取失败\n 提取的数据: {obj} \n jsonpath规则: {expr}"
)
return _jsonpath_data
@classmethod
def set_cache_value(cls, dependent_data: "DependentData") -> Union[Text, None]:
"""
获取依赖中是否需要将数据存入缓存中
"""
try:
return dependent_data.set_cache
except KeyError:
return None
@classmethod
def replace_key(cls, dependent_data: "DependentData"):
""" 获取需要替换的内容 """
try:
_replace_key = dependent_data.replace_key
return _replace_key
except KeyError:
return None
def url_replace(
self,
replace_key: Text,
jsonpath_dates: Dict,
jsonpath_data: list) -> None:
"""
url中的动态参数替换
# 如: 一般有些接口的参数在url中,并且没有参数名称, /api/v1/work/spu/approval/spuApplyDetails/{id}
# 那么可以使用如下方式编写用例, 可以使用 $url_params{}替换,
# 如/api/v1/work/spu/approval/spuApplyDetails/$url_params{id}
:param jsonpath_data: jsonpath 解析出来的数据值
:param replace_key: 用例中需要替换数据的 replace_key
:param jsonpath_dates: jsonpath 存放的数据值
:return:
"""
if "$url_param" in replace_key:
_url = self.__yaml_case.url.replace(replace_key, str(jsonpath_data[0]))
jsonpath_dates['$.url'] = _url
else:
jsonpath_dates[replace_key] = jsonpath_data[0]
def _dependent_type_for_sql(
self,
setup_sql: List,
dependence_case_data: "DependentCaseData",
jsonpath_dates: Dict) -> None:
"""
判断依赖类型为 sql程序中的依赖参数从 数据库中提取数据
@param setup_sql: 前置sql语句
@param dependence_case_data: 依赖的数据
@param jsonpath_dates: 依赖相关的用例数据
@return:
"""
# 判断依赖数据类型,依赖 sql中的数据
if setup_sql is not None:
if Config.mysql_db.switch:
setup_sql = ast.literal_eval(cache_regular(str(setup_sql)))
sql_data = SetUpMySQL().setup_sql_data(sql=setup_sql)
dependent_data = dependence_case_data.dependent_data
for i in dependent_data:
_jsonpath = i.jsonpath
jsonpath_data = self.jsonpath_data(obj=sql_data, expr=_jsonpath)
_set_value = self.set_cache_value(i)
_replace_key = self.replace_key(i)
if _set_value is not None:
CacheHandler.update_cache(cache_name=_set_value, value=jsonpath_data[0])
# Cache(_set_value).set_caches(jsonpath_data[0])
if _replace_key is not None:
jsonpath_dates[_replace_key] = jsonpath_data[0]
self.url_replace(
replace_key=_replace_key,
jsonpath_dates=jsonpath_dates,
jsonpath_data=jsonpath_data,
)
else:
WARNING.logger.warning("检查到数据库开关为关闭状态,请确认配置")
def is_dependent(self) -> Union[Dict, bool]:
"""
判断是否有数据依赖
:return:
"""
# 获取用例中的dependent_type值判断该用例是否需要执行依赖
_dependent_type = self.__yaml_case.dependence_case
# 获取依赖用例数据
_dependence_case_dates = self.__yaml_case.dependence_case_data
_setup_sql = self.__yaml_case.setup_sql
# 判断是否有依赖
if _dependent_type is True:
# 读取依赖相关的用例数据
jsonpath_dates = {}
# 循环所有需要依赖的数据
try:
for dependence_case_data in _dependence_case_dates:
_case_id = dependence_case_data.case_id
# 判断依赖数据为sqlcase_id需要写成self否则程序中无法获取case_id
if _case_id == 'self':
self._dependent_type_for_sql(
setup_sql=_setup_sql,
dependence_case_data=dependence_case_data,
jsonpath_dates=jsonpath_dates)
else:
raise ValueError(
"依赖的dependent_type不正确只支持request、response、sql依赖\n"
)
return jsonpath_dates
except TypeError as exc:
raise ValueError(
"dependence_case_data下的所有内容均不能为空"
"请检查相关数据是否填写,如已填写,请检查缩进问题"
) from exc
else:
return False

238
tools/models.py Normal file
View File

@ -0,0 +1,238 @@
import types
from enum import Enum, unique
from typing import Text, Dict, Callable, Union, Optional, List, Any
from dataclasses import dataclass
from pydantic import BaseModel, Field
class NotificationType(Enum):
""" 自动化通知方式 """
DEFAULT = 0
DING_TALK = 1
WECHAT = 2
EMAIL = 3
FEI_SHU = 4
@dataclass
class TestMetrics:
""" 用例执行数据 """
passed: int
failed: int
broken: int
skipped: int
total: int
pass_rate: float
time: Text
class RequestType(Enum):
"""
request请求发送请求参数的数据类型
"""
JSON = "JSON"
PARAMS = "PARAMS"
DATA = "DATA"
FILE = 'FILE'
EXPORT = "EXPORT"
NONE = "NONE"
def load_module_functions(module) -> Dict[Text, Callable]:
""" 获取 module中方法的名称和所在的内存地址 """
module_functions = {}
for name, item in vars(module).items():
if isinstance(item, types.FunctionType):
module_functions[name] = item
return module_functions
@unique
class DependentType(Enum):
"""
数据依赖相关枚举
"""
RESPONSE = 'response'
REQUEST = 'request'
SQL_DATA = 'sqlData'
CACHE = "cache"
class Assert(BaseModel):
jsonpath: Text
type: Text
value: Any
AssertType: Union[None, Text] = None
class DependentData(BaseModel):
dependent_type: Text
jsonpath: Text
set_cache: Optional[Text]
replace_key: Optional[Text]
class DependentCaseData(BaseModel):
case_id: Text
# dependent_data: List[DependentData]
dependent_data: Union[None, List[DependentData]] = None
class ParamPrepare(BaseModel):
dependent_type: Text
jsonpath: Text
set_cache: Text
class SendRequest(BaseModel):
dependent_type: Text
jsonpath: Optional[Text]
cache_data: Optional[Text]
set_cache: Optional[Text]
replace_key: Optional[Text]
class TearDown(BaseModel):
case_id: Text
param_prepare: Optional[List["ParamPrepare"]]
send_request: Optional[List["SendRequest"]]
class CurrentRequestSetCache(BaseModel):
type: Text
jsonpath: Text
name: Text
class TestCase(BaseModel):
url: Text
method: Text
detail: Text
# assert_data: Union[Dict, Text] = Field(..., alias="assert")
assert_data: Union[Dict, Text]
headers: Union[None, Dict, Text] = {}
requestType: Text
is_run: Union[None, bool, Text] = None
data: Any = None
dependence_case: Union[None, bool] = False
dependence_case_data: Optional[Union[None, List["DependentCaseData"], Text]] = None
sql: List = None
setup_sql: List = None
status_code: Optional[int] = None
teardown_sql: Optional[List] = None
teardown: Union[List["TearDown"], None] = None
current_request_set_cache: Optional[List["CurrentRequestSetCache"]]
sleep: Optional[Union[int, float]]
class ResponseData(BaseModel):
url: Text
is_run: Union[None, bool, Text]
detail: Text
response_data: Text
request_body: Any
method: Text
sql_data: Dict
yaml_data: "TestCase"
headers: Dict
cookie: Dict
assert_data: Dict
res_time: Union[int, float]
status_code: int
teardown: List["TearDown"] = None
teardown_sql: Union[None, List]
body: Any
class DingTalk(BaseModel):
webhook: Union[Text, None]
secret: Union[Text, None]
class MySqlDB(BaseModel):
switch: bool = False
host: Union[Text, None] = None
user: Union[Text, None] = None
password: Union[Text, None] = None
port: Union[int, None] = 3306
class Webhook(BaseModel):
webhook: Union[Text, None]
class Email(BaseModel):
send_user: Union[Text, None]
email_host: Union[Text, None]
stamp_key: Union[Text, None]
# 收件人
send_list: Union[Text, None]
class Config(BaseModel):
project_name: Text
env: Text
tester_name: Text
notification_type: int = 0
excel_report: bool
ding_talk: "DingTalk"
mysql_db: "MySqlDB"
mirror_source: Text
wechat: "Webhook"
email: "Email"
lark: "Webhook"
real_time_update_test_cases: bool = False
host: Text
app_host: Union[Text, None]
@unique
class AllureAttachmentType(Enum):
"""
allure 报告的文件类型枚举
"""
TEXT = "txt"
CSV = "csv"
TSV = "tsv"
URI_LIST = "uri"
HTML = "html"
XML = "xml"
JSON = "json"
YAML = "yaml"
PCAP = "pcap"
PNG = "png"
JPG = "jpg"
SVG = "svg"
GIF = "gif"
BMP = "bmp"
TIFF = "tiff"
MP4 = "mp4"
OGG = "ogg"
WEBM = "webm"
PDF = "pdf"
@unique
class AssertMethod(Enum):
"""断言类型"""
equals = "=="
less_than = "lt"
less_than_or_equals = "le"
greater_than = "gt"
greater_than_or_equals = "ge"
not_equals = "not_eq"
string_equals = "str_eq"
length_equals = "len_eq"
length_greater_than = "len_gt"
length_greater_than_or_equals = 'len_ge'
length_less_than = "len_lt"
length_less_than_or_equals = 'len_le'
contains = "contains"
contained_by = 'contained_by'
startswith = 'startswith'
endswith = 'endswith'

View File

@ -1,41 +1,41 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pymysql
"""
mysql 封装支持
"""
import ast
import datetime
import decimal
from warnings import filterwarnings
from tools.yaml_control import GetYamlData
from tools.log_control import ERROR
from tools.yaml_control import GetCaseData
from config.settings import ConfigHandler
from tools.regular_control import sql_regular
import pymysql
from typing import List, Union, Text, Dict
from utils import config
from utils.logging_tool.log_control import ERROR
from utils.read_files_tools.regular_control import sql_regular
from utils.read_files_tools.regular_control import cache_regular
from utils.other_tools.exceptions import DataAcquisitionFailed, ValueTypeError
# 忽略 Mysql 告警信息
filterwarnings("ignore", category=pymysql.Warning)
switch = GetCaseData(ConfigHandler.config_path).get_yaml_data()['MySqlDB']['switch']
class MysqlDB(object):
if switch:
class MysqlDB:
""" mysql 封装 """
if config.mysql_db.switch:
def __init__(self):
self.config = GetYamlData(ConfigHandler.config_path)
self.read_mysql_config = self.config.get_yaml_data()['MySqlDB']
try:
# 建立数据库连接
self.conn = pymysql.connect(
host=self.read_mysql_config['host'],
user=self.read_mysql_config['user'],
password=self.read_mysql_config['password'],
db=self.read_mysql_config['db']
host=config.mysql_db.host,
user=config.mysql_db.user,
password=config.mysql_db.password,
port=config.mysql_db.port
)
# 使用 cursor 方法获取操作游标得到一个可以执行sql语句并且操作结果为字典返回的游标
self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
except Exception as e:
ERROR.logger.error("数据库连接失败,失败原因{0}".format(e))
except AttributeError as error:
ERROR.logger.error("数据库连接失败,失败原因 %s", error)
def __del__(self):
try:
@ -43,8 +43,8 @@ class MysqlDB(object):
self.cur.close()
# 关闭连接
self.conn.close()
except Exception as e:
ERROR.logger.error("数据库连接失败,失败原因{0}".format(e))
except AttributeError as error:
ERROR.logger.error("数据库连接失败,失败原因 %s", error)
def query(self, sql, state="all"):
"""
@ -59,16 +59,15 @@ class MysqlDB(object):
if state == "all":
# 查询全部
data = self.cur.fetchall()
else:
# 查询单条
data = self.cur.fetchone()
return data
except Exception as e:
ERROR.logger.error("数据库连接失败,失败原因{0}".format(e))
except AttributeError as error_data:
ERROR.logger.error("数据库连接失败,失败原因 %s", error_data)
raise
def execute(self, sql):
def execute(self, sql: Text):
"""
更新 删除 新增
:param sql:
@ -80,43 +79,93 @@ class MysqlDB(object):
# 提交事务
self.conn.commit()
return rows
except Exception as e:
ERROR.logger.error("数据库连接失败,失败原因{0}".format(e))
except AttributeError as error:
ERROR.logger.error("数据库连接失败,失败原因 %s", error)
# 如果事务异常,则回滚数据
self.conn.rollback()
raise
def assert_execution(self, sql: list, resp) -> dict:
@classmethod
def sql_data_handler(cls, query_data, data):
"""
执行 sql, 负责处理 yaml 文件中的断言需要执行多条 sql 的场景最终会将所有数据以对象形式返回
:param resp: 接口响应数据
:param sql: sql
:return:
"""
try:
if isinstance(sql, list):
处理部分类型sql查询出来的数据格式
@param query_data: 查询出来的sql数据
@param data: 数据池
@return:
"""
# 将sql 返回的所有内容全部放入对象中
for key, value in query_data.items():
if isinstance(value, decimal.Decimal):
data[key] = float(value)
elif isinstance(value, datetime.datetime):
data[key] = str(value)
else:
data[key] = value
return data
data = {}
if 'UPDATE' and 'update' and 'DELETE' and 'delete' and 'INSERT' and 'insert' in sql:
raise ValueError("断言的 sql 必须是查询的 sql")
class SetUpMySQL(MysqlDB):
""" 处理前置sql """
def setup_sql_data(self, sql: Union[List, None]) -> Dict:
"""
处理前置请求sql
:param sql:
:return:
"""
sql = ast.literal_eval(cache_regular(str(sql)))
try:
data = {}
if sql is not None:
for i in sql:
# 判断断言类型为查询类型的时候,
if i[0:6].upper() == 'SELECT':
sql_date = self.query(sql=i)[0]
for key, value in sql_date.items():
data[key] = value
else:
for i in sql:
# 判断sql中是否有正则如果有则通过jsonpath提取相关的数据
sql = sql_regular(i, resp)
self.execute(sql=i)
return data
except IndexError as exc:
raise DataAcquisitionFailed("sql 数据查询失败请检查setup_sql语句是否正确") from exc
class AssertExecution(MysqlDB):
""" 处理断言sql数据 """
def assert_execution(self, sql: list, resp) -> dict:
"""
执行 sql, 负责处理 yaml 文件中的断言需要执行多条 sql 的场景最终会将所有数据以对象形式返回
:param resp: 接口响应数据
:param sql: sql
:return:
"""
try:
if isinstance(sql, list):
data = {}
_sql_type = ['UPDATE', 'update', 'DELETE', 'delete', 'INSERT', 'insert']
if any(i in sql for i in _sql_type) is False:
for i in sql:
# 判断sql中是否有正则如果有则通过jsonpath提取相关的数据
sql = sql_regular(i, resp)
if sql is not None:
# for 循环逐条处理断言 sql
query_data = self.query(sql)[0]
# 将sql 返回的所有内容全部放入对象中
for key, value in query_data.items():
data[key] = value
return data
data = self.sql_data_handler(query_data, data)
else:
raise DataAcquisitionFailed(f"该条sql未查询出任何数据, {sql}")
else:
raise ValueError("断言的查询sql需要是list类型")
except Exception as e:
ERROR.logger.error("数据库连接失败,失败原因{0}".format(e))
raise
raise DataAcquisitionFailed("断言的 sql 必须是查询的 sql")
else:
raise ValueTypeError("sql数据类型不正确接受的是list")
return data
except Exception as error_data:
ERROR.logger.error("数据库连接失败,失败原因 %s", error_data)
raise error_data
if __name__ == '__main__':
mysql_db = MysqlDB()
a = mysql_db.assert_execution(sql=[""], resp={"code": 237, "value": 1})
print(a)
a = MysqlDB()
b = a.query(sql="select * from `test_obp_configure`.lottery_prize where activity_id = 3")
print(b)