From 45781e2a1809b2d6ef50e11e6665621731c991d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=83=AD=E6=9E=97=E8=8E=89=20Linli=20G?= Date: Mon, 13 Feb 2023 12:33:27 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=89=8D=E7=BD=AE=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- data/dm/fota/api_create_task.yaml | 17 ++- tools/cache_control.py | 15 +- tools/dependent_case.py | 166 +++++++++++++++++++++ tools/models.py | 238 ++++++++++++++++++++++++++++++ tools/mysql_control.py | 163 +++++++++++++------- 5 files changed, 539 insertions(+), 60 deletions(-) create mode 100644 tools/dependent_case.py create mode 100644 tools/models.py diff --git a/data/dm/fota/api_create_task.yaml b/data/dm/fota/api_create_task.yaml index 6a39b43..f57236e 100644 --- a/data/dm/fota/api_create_task.yaml +++ b/data/dm/fota/api_create_task.yaml @@ -19,7 +19,13 @@ create_task_01: is_run: data: vins: 73401134000000000 -# 请求类型:params 是以url拼接的形式请求,json则传的是json串 +# 依赖数据库的数据,先从数据库取值再存到缓存里面 + dependence_case_data: + - case_id: self + dependent_data: + - dependent_type: sqlData + jsonpath: $.username + replace_key: $.data.mobile assert: status: jsonpath: $.status @@ -32,7 +38,14 @@ create_task_01: type: == value: "success" AssertType: - # 断言接口返回的username + # 断言sql的时候,AssertType 的值需要填写成 SQL AssertType: SQL sql: + - select * from test_goods where shop_id = 515 + #前置处理,从数据库取值在请求接口 + setup_sql: + - SELECT * FROM test_obp_user.user_biz_info where user_id = '300000405' + #后置处理,从数据库删除 + teardown_sql: delete * from xxx - delete * from xxx + #后置处理,从业务逻辑闭环 teardown: diff --git a/tools/cache_control.py b/tools/cache_control.py index cf2f31e..ee7e64b 100644 --- a/tools/cache_control.py +++ b/tools/cache_control.py @@ -4,7 +4,7 @@ from config.settings import ConfigHandler import os from typing import Any - +from tools.exceptions import ValueNotFoundError class Cache: """ 设置、读取缓存 """ @@ -62,6 +62,19 @@ class Cache: os.remove(cache_path + "/" + i) +_cache_config = {} +class CacheHandler: + @staticmethod + def get_cache(cache_data): + try: + return _cache_config[cache_data] + except KeyError: + raise ValueError(f"{cache_data}的缓存数据未找到,请检查是否将该数据存入缓存中") + + @staticmethod + def update_cache(*, cache_name, value): + _cache_config[cache_name] = value + if __name__ == '__main__': a = Cache('ecu_collection_select_id').get_cache() diff --git a/tools/dependent_case.py b/tools/dependent_case.py new file mode 100644 index 0000000..dea59e4 --- /dev/null +++ b/tools/dependent_case.py @@ -0,0 +1,166 @@ +import ast +import json +from typing import Text, Dict, Union, List +from jsonpath import jsonpath +from tools.mysql_control import SetUpMySQL +from tools.regular_control import regular, cache_regular +from tools.log_control import WARNING +from tools.models import TestCase, DependentCaseData, DependentData +from tools.cache_control import CacheHandler +from config.configs import Config + + +class DependentCase: + """ 处理依赖相关的业务 """ + + def __init__(self, dependent_yaml_case: TestCase): + self.__yaml_case = dependent_yaml_case + + @classmethod + def get_cache(cls, case_id: Text) -> Dict: + """ + 获取缓存用例池中的数据,通过 case_id 提取 + :param case_id: + :return: case_id_01 + """ + _case_data = CacheHandler.get_cache(case_id) + return _case_data + + @classmethod + def jsonpath_data( + cls, + obj: Dict, + expr: Text) -> list: + """ + 通过jsonpath提取依赖的数据 + :param obj: 对象信息 + :param expr: jsonpath 方法 + :return: 提取到的内容值,返回是个数组 + + 对象: {"data": applyID} --> jsonpath提取方法: $.data.data.[0].applyId + """ + + _jsonpath_data = jsonpath(obj, expr) + # 判断是否正常提取到数据,如未提取到,则抛异常 + if _jsonpath_data is False: + raise ValueError( + f"jsonpath提取失败!\n 提取的数据: {obj} \n jsonpath规则: {expr}" + ) + return _jsonpath_data + + @classmethod + def set_cache_value(cls, dependent_data: "DependentData") -> Union[Text, None]: + """ + 获取依赖中是否需要将数据存入缓存中 + """ + try: + return dependent_data.set_cache + except KeyError: + return None + + @classmethod + def replace_key(cls, dependent_data: "DependentData"): + """ 获取需要替换的内容 """ + try: + _replace_key = dependent_data.replace_key + return _replace_key + except KeyError: + return None + + def url_replace( + self, + replace_key: Text, + jsonpath_dates: Dict, + jsonpath_data: list) -> None: + """ + url中的动态参数替换 + # 如: 一般有些接口的参数在url中,并且没有参数名称, /api/v1/work/spu/approval/spuApplyDetails/{id} + # 那么可以使用如下方式编写用例, 可以使用 $url_params{}替换, + # 如/api/v1/work/spu/approval/spuApplyDetails/$url_params{id} + :param jsonpath_data: jsonpath 解析出来的数据值 + :param replace_key: 用例中需要替换数据的 replace_key + :param jsonpath_dates: jsonpath 存放的数据值 + :return: + """ + + if "$url_param" in replace_key: + _url = self.__yaml_case.url.replace(replace_key, str(jsonpath_data[0])) + jsonpath_dates['$.url'] = _url + else: + jsonpath_dates[replace_key] = jsonpath_data[0] + + def _dependent_type_for_sql( + self, + setup_sql: List, + dependence_case_data: "DependentCaseData", + jsonpath_dates: Dict) -> None: + """ + 判断依赖类型为 sql,程序中的依赖参数从 数据库中提取数据 + @param setup_sql: 前置sql语句 + @param dependence_case_data: 依赖的数据 + @param jsonpath_dates: 依赖相关的用例数据 + @return: + """ + # 判断依赖数据类型,依赖 sql中的数据 + if setup_sql is not None: + if Config.mysql_db.switch: + setup_sql = ast.literal_eval(cache_regular(str(setup_sql))) + sql_data = SetUpMySQL().setup_sql_data(sql=setup_sql) + dependent_data = dependence_case_data.dependent_data + for i in dependent_data: + _jsonpath = i.jsonpath + jsonpath_data = self.jsonpath_data(obj=sql_data, expr=_jsonpath) + _set_value = self.set_cache_value(i) + _replace_key = self.replace_key(i) + if _set_value is not None: + CacheHandler.update_cache(cache_name=_set_value, value=jsonpath_data[0]) + # Cache(_set_value).set_caches(jsonpath_data[0]) + if _replace_key is not None: + jsonpath_dates[_replace_key] = jsonpath_data[0] + self.url_replace( + replace_key=_replace_key, + jsonpath_dates=jsonpath_dates, + jsonpath_data=jsonpath_data, + ) + else: + WARNING.logger.warning("检查到数据库开关为关闭状态,请确认配置") + + + def is_dependent(self) -> Union[Dict, bool]: + """ + 判断是否有数据依赖 + :return: + """ + + # 获取用例中的dependent_type值,判断该用例是否需要执行依赖 + _dependent_type = self.__yaml_case.dependence_case + # 获取依赖用例数据 + _dependence_case_dates = self.__yaml_case.dependence_case_data + _setup_sql = self.__yaml_case.setup_sql + # 判断是否有依赖 + if _dependent_type is True: + # 读取依赖相关的用例数据 + jsonpath_dates = {} + # 循环所有需要依赖的数据 + try: + for dependence_case_data in _dependence_case_dates: + _case_id = dependence_case_data.case_id + # 判断依赖数据为sql,case_id需要写成self,否则程序中无法获取case_id + if _case_id == 'self': + self._dependent_type_for_sql( + setup_sql=_setup_sql, + dependence_case_data=dependence_case_data, + jsonpath_dates=jsonpath_dates) + else: + raise ValueError( + "依赖的dependent_type不正确,只支持request、response、sql依赖\n" + ) + return jsonpath_dates + except TypeError as exc: + raise ValueError( + "dependence_case_data下的所有内容均不能为空!" + "请检查相关数据是否填写,如已填写,请检查缩进问题" + ) from exc + else: + return False + diff --git a/tools/models.py b/tools/models.py new file mode 100644 index 0000000..d8eb97b --- /dev/null +++ b/tools/models.py @@ -0,0 +1,238 @@ +import types +from enum import Enum, unique +from typing import Text, Dict, Callable, Union, Optional, List, Any +from dataclasses import dataclass +from pydantic import BaseModel, Field + + +class NotificationType(Enum): + """ 自动化通知方式 """ + DEFAULT = 0 + DING_TALK = 1 + WECHAT = 2 + EMAIL = 3 + FEI_SHU = 4 + + +@dataclass +class TestMetrics: + """ 用例执行数据 """ + passed: int + failed: int + broken: int + skipped: int + total: int + pass_rate: float + time: Text + + +class RequestType(Enum): + """ + request请求发送,请求参数的数据类型 + """ + JSON = "JSON" + PARAMS = "PARAMS" + DATA = "DATA" + FILE = 'FILE' + EXPORT = "EXPORT" + NONE = "NONE" + + +def load_module_functions(module) -> Dict[Text, Callable]: + """ 获取 module中方法的名称和所在的内存地址 """ + module_functions = {} + + for name, item in vars(module).items(): + if isinstance(item, types.FunctionType): + module_functions[name] = item + return module_functions + + +@unique +class DependentType(Enum): + """ + 数据依赖相关枚举 + """ + RESPONSE = 'response' + REQUEST = 'request' + SQL_DATA = 'sqlData' + CACHE = "cache" + + +class Assert(BaseModel): + jsonpath: Text + type: Text + value: Any + AssertType: Union[None, Text] = None + + +class DependentData(BaseModel): + dependent_type: Text + jsonpath: Text + set_cache: Optional[Text] + replace_key: Optional[Text] + + +class DependentCaseData(BaseModel): + case_id: Text + # dependent_data: List[DependentData] + dependent_data: Union[None, List[DependentData]] = None + + +class ParamPrepare(BaseModel): + dependent_type: Text + jsonpath: Text + set_cache: Text + + +class SendRequest(BaseModel): + dependent_type: Text + jsonpath: Optional[Text] + cache_data: Optional[Text] + set_cache: Optional[Text] + replace_key: Optional[Text] + + +class TearDown(BaseModel): + case_id: Text + param_prepare: Optional[List["ParamPrepare"]] + send_request: Optional[List["SendRequest"]] + + +class CurrentRequestSetCache(BaseModel): + type: Text + jsonpath: Text + name: Text + + +class TestCase(BaseModel): + url: Text + method: Text + detail: Text + # assert_data: Union[Dict, Text] = Field(..., alias="assert") + assert_data: Union[Dict, Text] + headers: Union[None, Dict, Text] = {} + requestType: Text + is_run: Union[None, bool, Text] = None + data: Any = None + dependence_case: Union[None, bool] = False + dependence_case_data: Optional[Union[None, List["DependentCaseData"], Text]] = None + sql: List = None + setup_sql: List = None + status_code: Optional[int] = None + teardown_sql: Optional[List] = None + teardown: Union[List["TearDown"], None] = None + current_request_set_cache: Optional[List["CurrentRequestSetCache"]] + sleep: Optional[Union[int, float]] + + +class ResponseData(BaseModel): + url: Text + is_run: Union[None, bool, Text] + detail: Text + response_data: Text + request_body: Any + method: Text + sql_data: Dict + yaml_data: "TestCase" + headers: Dict + cookie: Dict + assert_data: Dict + res_time: Union[int, float] + status_code: int + teardown: List["TearDown"] = None + teardown_sql: Union[None, List] + body: Any + + +class DingTalk(BaseModel): + webhook: Union[Text, None] + secret: Union[Text, None] + + +class MySqlDB(BaseModel): + switch: bool = False + host: Union[Text, None] = None + user: Union[Text, None] = None + password: Union[Text, None] = None + port: Union[int, None] = 3306 + + +class Webhook(BaseModel): + webhook: Union[Text, None] + + +class Email(BaseModel): + send_user: Union[Text, None] + email_host: Union[Text, None] + stamp_key: Union[Text, None] + # 收件人 + send_list: Union[Text, None] + + +class Config(BaseModel): + project_name: Text + env: Text + tester_name: Text + notification_type: int = 0 + excel_report: bool + ding_talk: "DingTalk" + mysql_db: "MySqlDB" + mirror_source: Text + wechat: "Webhook" + email: "Email" + lark: "Webhook" + real_time_update_test_cases: bool = False + host: Text + app_host: Union[Text, None] + + +@unique +class AllureAttachmentType(Enum): + """ + allure 报告的文件类型枚举 + """ + TEXT = "txt" + CSV = "csv" + TSV = "tsv" + URI_LIST = "uri" + + HTML = "html" + XML = "xml" + JSON = "json" + YAML = "yaml" + PCAP = "pcap" + + PNG = "png" + JPG = "jpg" + SVG = "svg" + GIF = "gif" + BMP = "bmp" + TIFF = "tiff" + + MP4 = "mp4" + OGG = "ogg" + WEBM = "webm" + + PDF = "pdf" + + +@unique +class AssertMethod(Enum): + """断言类型""" + equals = "==" + less_than = "lt" + less_than_or_equals = "le" + greater_than = "gt" + greater_than_or_equals = "ge" + not_equals = "not_eq" + string_equals = "str_eq" + length_equals = "len_eq" + length_greater_than = "len_gt" + length_greater_than_or_equals = 'len_ge' + length_less_than = "len_lt" + length_less_than_or_equals = 'len_le' + contains = "contains" + contained_by = 'contained_by' + startswith = 'startswith' + endswith = 'endswith' \ No newline at end of file diff --git a/tools/mysql_control.py b/tools/mysql_control.py index 0b2a75a..9c80258 100644 --- a/tools/mysql_control.py +++ b/tools/mysql_control.py @@ -1,41 +1,41 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - - -import pymysql +""" +mysql 封装,支持 增、删、改、查 +""" +import ast +import datetime +import decimal from warnings import filterwarnings -from tools.yaml_control import GetYamlData -from tools.log_control import ERROR -from tools.yaml_control import GetCaseData -from config.settings import ConfigHandler -from tools.regular_control import sql_regular +import pymysql +from typing import List, Union, Text, Dict +from utils import config +from utils.logging_tool.log_control import ERROR +from utils.read_files_tools.regular_control import sql_regular +from utils.read_files_tools.regular_control import cache_regular +from utils.other_tools.exceptions import DataAcquisitionFailed, ValueTypeError # 忽略 Mysql 告警信息 filterwarnings("ignore", category=pymysql.Warning) -switch = GetCaseData(ConfigHandler.config_path).get_yaml_data()['MySqlDB']['switch'] - -class MysqlDB(object): - if switch: +class MysqlDB: + """ mysql 封装 """ + if config.mysql_db.switch: def __init__(self): - self.config = GetYamlData(ConfigHandler.config_path) - self.read_mysql_config = self.config.get_yaml_data()['MySqlDB'] try: # 建立数据库连接 self.conn = pymysql.connect( - host=self.read_mysql_config['host'], - user=self.read_mysql_config['user'], - password=self.read_mysql_config['password'], - db=self.read_mysql_config['db'] + host=config.mysql_db.host, + user=config.mysql_db.user, + password=config.mysql_db.password, + port=config.mysql_db.port ) # 使用 cursor 方法获取操作游标,得到一个可以执行sql语句,并且操作结果为字典返回的游标 self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor) - except Exception as e: - ERROR.logger.error("数据库连接失败,失败原因{0}".format(e)) + except AttributeError as error: + ERROR.logger.error("数据库连接失败,失败原因 %s", error) def __del__(self): try: @@ -43,8 +43,8 @@ class MysqlDB(object): self.cur.close() # 关闭连接 self.conn.close() - except Exception as e: - ERROR.logger.error("数据库连接失败,失败原因{0}".format(e)) + except AttributeError as error: + ERROR.logger.error("数据库连接失败,失败原因 %s", error) def query(self, sql, state="all"): """ @@ -59,16 +59,15 @@ class MysqlDB(object): if state == "all": # 查询全部 data = self.cur.fetchall() - else: # 查询单条 data = self.cur.fetchone() - return data - except Exception as e: - ERROR.logger.error("数据库连接失败,失败原因{0}".format(e)) + except AttributeError as error_data: + ERROR.logger.error("数据库连接失败,失败原因 %s", error_data) + raise - def execute(self, sql): + def execute(self, sql: Text): """ 更新 、 删除、 新增 :param sql: @@ -80,43 +79,93 @@ class MysqlDB(object): # 提交事务 self.conn.commit() return rows - except Exception as e: - ERROR.logger.error("数据库连接失败,失败原因{0}".format(e)) + except AttributeError as error: + ERROR.logger.error("数据库连接失败,失败原因 %s", error) # 如果事务异常,则回滚数据 self.conn.rollback() + raise - def assert_execution(self, sql: list, resp) -> dict: + @classmethod + def sql_data_handler(cls, query_data, data): """ - 执行 sql, 负责处理 yaml 文件中的断言需要执行多条 sql 的场景,最终会将所有数据以对象形式返回 - :param resp: 接口响应数据 - :param sql: sql - :return: - """ - try: - if isinstance(sql, list): + 处理部分类型sql查询出来的数据格式 + @param query_data: 查询出来的sql数据 + @param data: 数据池 + @return: + """ + # 将sql 返回的所有内容全部放入对象中 + for key, value in query_data.items(): + if isinstance(value, decimal.Decimal): + data[key] = float(value) + elif isinstance(value, datetime.datetime): + data[key] = str(value) + else: + data[key] = value + return data - data = {} - if 'UPDATE' and 'update' and 'DELETE' and 'delete' and 'INSERT' and 'insert' in sql: - raise ValueError("断言的 sql 必须是查询的 sql") + +class SetUpMySQL(MysqlDB): + """ 处理前置sql """ + + def setup_sql_data(self, sql: Union[List, None]) -> Dict: + """ + 处理前置请求sql + :param sql: + :return: + """ + sql = ast.literal_eval(cache_regular(str(sql))) + try: + data = {} + if sql is not None: + for i in sql: + # 判断断言类型为查询类型的时候, + if i[0:6].upper() == 'SELECT': + sql_date = self.query(sql=i)[0] + for key, value in sql_date.items(): + data[key] = value else: - for i in sql: - # 判断sql中是否有正则,如果有则通过jsonpath提取相关的数据 - sql = sql_regular(i, resp) + self.execute(sql=i) + return data + except IndexError as exc: + raise DataAcquisitionFailed("sql 数据查询失败,请检查setup_sql语句是否正确") from exc + + +class AssertExecution(MysqlDB): + """ 处理断言sql数据 """ + + def assert_execution(self, sql: list, resp) -> dict: + """ + 执行 sql, 负责处理 yaml 文件中的断言需要执行多条 sql 的场景,最终会将所有数据以对象形式返回 + :param resp: 接口响应数据 + :param sql: sql + :return: + """ + try: + if isinstance(sql, list): + + data = {} + _sql_type = ['UPDATE', 'update', 'DELETE', 'delete', 'INSERT', 'insert'] + if any(i in sql for i in _sql_type) is False: + for i in sql: + # 判断sql中是否有正则,如果有则通过jsonpath提取相关的数据 + sql = sql_regular(i, resp) + if sql is not None: # for 循环逐条处理断言 sql query_data = self.query(sql)[0] - # 将sql 返回的所有内容全部放入对象中 - for key, value in query_data.items(): - data[key] = value - - return data + data = self.sql_data_handler(query_data, data) + else: + raise DataAcquisitionFailed(f"该条sql未查询出任何数据, {sql}") else: - raise ValueError("断言的查询sql需要是list类型") - except Exception as e: - ERROR.logger.error("数据库连接失败,失败原因{0}".format(e)) - raise + raise DataAcquisitionFailed("断言的 sql 必须是查询的 sql") + else: + raise ValueTypeError("sql数据类型不正确,接受的是list") + return data + except Exception as error_data: + ERROR.logger.error("数据库连接失败,失败原因 %s", error_data) + raise error_data if __name__ == '__main__': - mysql_db = MysqlDB() - a = mysql_db.assert_execution(sql=[""], resp={"code": 237, "value": 1}) - print(a) + a = MysqlDB() + b = a.query(sql="select * from `test_obp_configure`.lottery_prize where activity_id = 3") + print(b) \ No newline at end of file