debug requests中的hooks狗子函数

This commit is contained in:
aaronchenyongzhi 2023-05-15 00:00:04 +08:00
parent c33ed460bb
commit 8fde38a04d
82 changed files with 729 additions and 398 deletions

View File

@ -8,7 +8,7 @@
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
</component> </component>
<component name="PackageRequirementsSettings"> <component name="PackageRequirementsSettings">
<option name="requirementsPath" value="" /> <option name="requirementsPath" value="D:\python3.9\python.exe" />
</component> </component>
<component name="PyDocumentationSettings"> <component name="PyDocumentationSettings">
<option name="format" value="GOOGLE" /> <option name="format" value="GOOGLE" />

View File

@ -1,20 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="dataSourceStorageLocal" created-in="PY-231.8770.66"> <component name="dataSourceStorageLocal" created-in="PY-231.8109.197">
<data-source name="MySQL - @10.8.203.25" uuid="dd327101-360a-4ce3-a39e-0a30a298d5cf">
<database-info product="MySQL" version="8.0.22" jdbc-version="4.2" driver-name="MySQL Connector/J" driver-version="mysql-connector-java-8.0.21 (Revision: 33f65445a1bcc544eb0120491926484da168f199)" dbms="MYSQL" exact-version="8.0.22" exact-driver-version="8.0">
<extra-name-characters>#@</extra-name-characters>
<identifier-quote-string>`</identifier-quote-string>
</database-info>
<case-sensitivity plain-identifiers="lower" quoted-identifiers="lower" />
<secret-storage>master_key</secret-storage>
<user-name>root</user-name>
<schema-mapping>
<introspection-scope>
<node kind="schema" negative="1" />
</introspection-scope>
</schema-mapping>
</data-source>
<data-source name="@localhost" uuid="49b6f686-3676-4df5-9645-cd7a2fe91d80"> <data-source name="@localhost" uuid="49b6f686-3676-4df5-9645-cd7a2fe91d80">
<database-info product="MySQL" version="8.0.26" jdbc-version="4.2" driver-name="MySQL Connector/J" driver-version="mysql-connector-java-8.0.25 (Revision: 08be9e9b4cba6aa115f9b27b215887af40b159e0)" dbms="MYSQL" exact-version="8.0.26" exact-driver-version="8.0"> <database-info product="MySQL" version="8.0.26" jdbc-version="4.2" driver-name="MySQL Connector/J" driver-version="mysql-connector-java-8.0.25 (Revision: 08be9e9b4cba6aa115f9b27b215887af40b159e0)" dbms="MYSQL" exact-version="8.0.26" exact-driver-version="8.0">
<extra-name-characters>#@</extra-name-characters> <extra-name-characters>#@</extra-name-characters>
@ -23,11 +9,7 @@
<case-sensitivity plain-identifiers="lower" quoted-identifiers="lower" /> <case-sensitivity plain-identifiers="lower" quoted-identifiers="lower" />
<secret-storage>master_key</secret-storage> <secret-storage>master_key</secret-storage>
<user-name>root</user-name> <user-name>root</user-name>
<schema-mapping> <schema-mapping />
<introspection-scope>
<node kind="schema" negative="1" />
</introspection-scope>
</schema-mapping>
</data-source> </data-source>
</component> </component>
</project> </project>

View File

@ -1,12 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="DataSourceManagerImpl" format="xml" multifile-model="true"> <component name="DataSourceManagerImpl" format="xml" multifile-model="true">
<data-source source="LOCAL" name="MySQL - @10.8.203.25" uuid="dd327101-360a-4ce3-a39e-0a30a298d5cf">
<driver-ref>mysql.8</driver-ref>
<synchronize>true</synchronize>
<jdbc-driver>com.mysql.cj.jdbc.Driver</jdbc-driver>
<jdbc-url>jdbc:mysql://10.8.203.25:3306</jdbc-url>
</data-source>
<data-source source="LOCAL" name="@localhost" uuid="49b6f686-3676-4df5-9645-cd7a2fe91d80"> <data-source source="LOCAL" name="@localhost" uuid="49b6f686-3676-4df5-9645-cd7a2fe91d80">
<driver-ref>mysql.8</driver-ref> <driver-ref>mysql.8</driver-ref>
<synchronize>true</synchronize> <synchronize>true</synchronize>

View File

@ -18,11 +18,11 @@
<file url="file://$PROJECT_DIR$/temp/sjk.py" charset="US-ASCII" /> <file url="file://$PROJECT_DIR$/temp/sjk.py" charset="US-ASCII" />
<file url="file://$PROJECT_DIR$/test_d/day_four.txt" charset="GBK" /> <file url="file://$PROJECT_DIR$/test_d/day_four.txt" charset="GBK" />
<file url="file://$PROJECT_DIR$/test_d/t" charset="UTF-8" /> <file url="file://$PROJECT_DIR$/test_d/t" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/test_script/auto_script/login.py" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/test_script/bgy/AI_comparison_data.py" charset="UTF-8" /> <file url="file://$PROJECT_DIR$/test_script/bgy/AI_comparison_data.py" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/test_script/bgy/AI_data_comparison.py" charset="UTF-8" /> <file url="file://$PROJECT_DIR$/test_script/bgy/AI_data_comparison.py" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/test_script/bgy/__init__.py" charset="US-ASCII" /> <file url="file://$PROJECT_DIR$/test_script/bgy/__init__.py" charset="US-ASCII" />
<file url="file://$PROJECT_DIR$/test_script/bgy/get_security_check_type_info.py" charset="UTF-8" /> <file url="file://$PROJECT_DIR$/test_script/bgy/get_security_check_type_info.py" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/test_script/bgy/run_sql.py" charset="UTF-8" /> <file url="file://$PROJECT_DIR$/test_script/bgy/run_sql.py" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/test_script/test_script/login.py" charset="UTF-8" />
</component> </component>
</project> </project>

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Pipenv (api-test-project)" project-jdk-type="Python SDK" /> <component name="ProjectRootManager" version="2" project-jdk-name="Pipenv (apitest)" project-jdk-type="Python SDK" />
<component name="PyCharmProfessionalAdvertiser"> <component name="PyCharmProfessionalAdvertiser">
<option name="shown" value="true" /> <option name="shown" value="true" />
</component> </component>

View File

@ -1 +1 @@
[{"success": 4, "all": 7, "fail": 1, "skip": 0, "error": 2, "runtime": "0.45 S", "begin_time": "2023-04-21 12:10:31", "pass_rate": "57.14"}, {"success": 0, "all": 1, "fail": 0, "skip": 0, "error": 1, "runtime": "0.01 S", "begin_time": "2023-04-21 12:11:52", "pass_rate": "0.00"}, {"success": 5, "all": 6, "fail": 1, "skip": 0, "error": 0, "runtime": "0.33 S", "begin_time": "2023-04-21 12:12:54", "pass_rate": "83.33"}, {"success": 5, "all": 6, "fail": 1, "skip": 0, "error": 0, "runtime": "0.30 S", "begin_time": "2023-04-21 12:13:32", "pass_rate": "83.33"}, {"success": 5, "all": 6, "fail": 1, "skip": 0, "error": 0, "runtime": "0.28 S", "begin_time": "2023-04-21 14:47:43", "pass_rate": "83.33"}, {"success": 5, "all": 6, "fail": 1, "skip": 0, "error": 0, "runtime": "0.27 S", "begin_time": "2023-04-21 15:01:06", "pass_rate": "83.33"}, {"success": 5, "all": 6, "fail": 1, "skip": 0, "error": 0, "runtime": "0.24 S", "begin_time": "2023-04-21 15:03:53", "pass_rate": "83.33"}, {"success": 5, "all": 6, "fail": 1, "skip": 0, "error": 0, "runtime": "0.23 S", "begin_time": "2023-04-21 15:06:10", "pass_rate": "83.33"}] [{"success": 4, "all": 7, "fail": 1, "skip": 0, "error": 2, "runtime": "0.45 S", "begin_time": "2023-04-21 12:10:31", "pass_rate": "57.14"}, {"success": 0, "all": 1, "fail": 0, "skip": 0, "error": 1, "runtime": "0.01 S", "begin_time": "2023-04-21 12:11:52", "pass_rate": "0.00"}, {"success": 5, "all": 6, "fail": 1, "skip": 0, "error": 0, "runtime": "0.33 S", "begin_time": "2023-04-21 12:12:54", "pass_rate": "83.33"}, {"success": 5, "all": 6, "fail": 1, "skip": 0, "error": 0, "runtime": "0.30 S", "begin_time": "2023-04-21 12:13:32", "pass_rate": "83.33"}, {"success": 5, "all": 6, "fail": 1, "skip": 0, "error": 0, "runtime": "0.28 S", "begin_time": "2023-04-21 14:47:43", "pass_rate": "83.33"}, {"success": 5, "all": 6, "fail": 1, "skip": 0, "error": 0, "runtime": "0.27 S", "begin_time": "2023-04-21 15:01:06", "pass_rate": "83.33"}, {"success": 5, "all": 6, "fail": 1, "skip": 0, "error": 0, "runtime": "0.24 S", "begin_time": "2023-04-21 15:03:53", "pass_rate": "83.33"}, {"success": 5, "all": 6, "fail": 1, "skip": 0, "error": 0, "runtime": "0.23 S", "begin_time": "2023-04-21 15:06:10", "pass_rate": "83.33"}, {"success": 6, "all": 7, "fail": 0, "skip": 0, "error": 1, "runtime": "3.00 S", "begin_time": "2023-05-14 23:58:13", "pass_rate": "85.71"}]

View File

@ -14,7 +14,7 @@ class BaseDates:
# 测试用例脚本目录 # 测试用例脚本目录
# ***************************************************************** # *****************************************************************
# script = os.path.join(base_path, "test_script", "auto_script") # script = os.path.join(base_path, "test_script", "test_script")
script = os.path.join(base_path, "test_script") script = os.path.join(base_path, "test_script")
# ***************************************************************** # *****************************************************************

View File

@ -8,16 +8,13 @@
@desc: @desc:
""" """
import json import json
import re
from common.dependence import Dependence from common.dependence import Dependence
from common.tools.logger import MyLog from common.tools.logger import MyLog
from common.tools.singleton import singleton
logger = MyLog() logger = MyLog()
# @singleton
class DependentParameter: class DependentParameter:
""" """
该类用于替换接口参数它会从字符串中寻找需要替换的参数并将其替换为关联参数表中对应的值 该类用于替换接口参数它会从字符串中寻找需要替换的参数并将其替换为关联参数表中对应的值
@ -103,9 +100,10 @@ if __name__ == '__main__':
dat = { dat = {
"a": "{{var_a}}", "a": "{{var_a}}",
"b": {"c": "{{var_c}}", "d": "{{var_d}}", "e": ["{{var_e_1}}", "{{var_e_2}}"]}, "b": {"c": "{{var_c}}", "d": "{{var_d}}", "e": ["{{var_e_1}}", "{{var_e_2}}"]},
"f": "{{var_f}}[0]", "f": "{{var_f}}[1]",
"g": "{{var_g}}", "g": "{{var_g}}",
"t": "{{get_timestamp()}}" "t": "{{get_timestamp()}}"
} }
t = DependentParameter().replace_dependent_parameter(dat) t = DependentParameter().replace_dependent_parameter(dat)
print(t) print(t)
print(type(t))

View File

@ -129,9 +129,8 @@ class DoExcel:
if __name__ == '__main__': if __name__ == '__main__':
# file_n = BaseDates.test_api file_n = BaseDates.test_api
# print(file_n) print(file_n)
file_n = r'D:\apk_api\api-test-project\data\moduleA\test_cases\test_api.xlsx'
test_init = DoExcel(file_n).get_excel_init() test_init = DoExcel(file_n).get_excel_init()
print(test_init) print(test_init)
excel = DoExcel(file_n) excel = DoExcel(file_n)

View File

@ -16,12 +16,6 @@ sys.path.append('venv/Lib/site-packages')
from openpyxl import load_workbook, Workbook from openpyxl import load_workbook, Workbook
# from common.base_path import BasePath
# BASE_PATH = os.path.dirname(os.path.abspath(__file__)) # 获取当前文件所在的文件夹路径
class DoExcel: class DoExcel:
""" """
excel操作类 excel操作类
@ -167,19 +161,9 @@ class DoExcel:
if __name__ == '__main__': if __name__ == '__main__':
# 示例1
# 1. 创建新文件在左边A1中写入数据
# 2. 文件名为test
ex = DoExcel() ex = DoExcel()
# ex.set_value_by_table("A1", '微信关注-给点知识公众号') BASE_PATH = os.path.dirname(os.path.abspath(__file__)) # 获取当前文件所在的文件夹路径
# ex.save('test')
# 示例2
# 1. 打开文件test
# 1. 获取A1中的数据
# ex = DoExcel(path=r"test.xlsx")
# a1 = ex.get_value_by_table("A1")
# print(a1)
data = [{"url": "1234", "header": "2134", "method": "get", "body": "hhh", "ok": 12345}, data = [{"url": "1234", "header": "2134", "method": "get", "body": "hhh", "ok": 12345},
{"url": "1234", "header": "2134"}, {"url": "1234", "header": "2134"},
{"url": "1234", "header": "2134", "method": "{}sss", "body": json.dumps({})}] {"url": "1234", "header": "2134", "method": "{}sss", "body": json.dumps({})}]
# ex.do_main(BasePath.ApiTestReport_xlsx, *data) ex.do_main("excel.xlsx", *data)

View File

@ -24,5 +24,4 @@ def get_file(file_path):
if __name__ == '__main__': if __name__ == '__main__':
f_p = 'data/booking/booking_upload_order' pass
print(get_file(f_p))

38
common/tools/hooks.py Normal file
View File

@ -0,0 +1,38 @@
from common.extractor.dependent_parameter import DependentParameter
class Hooks:
def __init__(self):
self.before_request_funcs = [] # 存放 before_request 钩子函数的列表
self.after_request_funcs = [] # 存放 after_request 钩子函数的列表
def before_request(self, func):
"""
注册 before_request 钩子函数将其添加到 before_request_funcs 列表中
"""
self.before_request_funcs.append(func)
return func
def after_request(self, func):
"""
注册 after_request 钩子函数将其添加到 after_request_funcs 列表中
"""
self.after_request_funcs.append(func)
return func
def run_before_request_funcs(self, request):
"""
依次执行 before_request 钩子函数处理请求参数并返回处理后的请求对象
"""
for func in self.before_request_funcs:
request = func(request)
return request
def run_after_request_funcs(self, response):
"""
依次执行 after_request 钩子函数处理响应结果并返回处理后的响应对象
"""
for func in self.after_request_funcs:
response = func(response)
return response

View File

@ -0,0 +1,111 @@
import json
from common.files_tools.read_file import read_file
result = []
def parsing_postman(path):
"""
解析postman到处的json文件
Args:
path:
Returns:
"""
data = read_file(path)
def _parse_api(content):
global result
api = {}
if isinstance(content, list):
for item in content:
print(item.get('name'))
_parse_api(content=item)
elif isinstance(content, dict):
if 'item' in content.keys():
_parse_api(content=content.get('item'))
elif 'request' in content.keys():
api['name'] = 'postman'
api['description'] = content.get('name')
request = content.get('request')
if request:
# api请求方法
api['Method'] = request.get('method', 'GET').upper()
header = request.get('header')
header = {item.get('key'): item.get('value') for item in header} if header else {}
auth = request.get('auth')
if auth:
auth_type = auth.get('type')
if auth.get(auth_type):
auth_value = {item.get('key'): item.get('value') for item in auth.get(auth_type) if
(item and item.get('key'))}
header.update(auth_value)
# api请求头
api['Headers'] = json.dumps(header, ensure_ascii=False)
# api 请求地址
url = request.get('url')
if url:
api['Url'] = url.get('raw')
# if url and url.get('path'):
# # api请求URL
# api['Url'] = r'/'.join(url.get('path'))
#
# if url and url.get('query'):
# # api查询参数
# api['Request Data'] = '&'.join(
# [item.get('key') + '=' + (item.get('value') or '') for item in url.get('query') if item])
body = request.get('body')
if body:
# api接口请求参数类型
request_mode = body.get('mode')
if 'raw' == request_mode:
api['request_data_type'] = 'json'
elif 'formdata' == request_mode:
api['request_data_type'] = 'data'
elif 'urlencoded' == request_mode:
api['request_data_type'] = 'data'
else:
api['request_data_type'] = 'json'
# api接口请求参数
request_data = body.get(request_mode)
if request_data and 'raw' == request_mode:
api['Request Data'] = request_data.replace('\t', '').replace('\n', '')
elif request_data and 'formdata' == request_mode:
if isinstance(request_data, list):
api['Request Data'] = json.dumps({item.get('key'): '' for item in request_data},
ensure_ascii=False)
else:
api['Request Data'] = ''
api['参数加密方式'] = ''
api['提取请求参数'] = ''
api['Jsonpath'] = ''
api['正则表达式'] = ''
api['正则变量'] = ''
api['绝对路径表达式'] = ''
api['SQL'] = ''
api['sql变量'] = ''
api['响应类型'] = ''
api['预期结果'] = ''
api['响应结果'] = ''
api['断言结果'] = ''
api['报错日志'] = ''
result.append(api)
for _ in data:
_parse_api(content=data)
return result
if __name__ == '__main__':
pat = r'postman.json'
res = parsing_postman(pat)
print('结果:', res)
print("类型", type(res))
from common.files_tools.excel import DoExcel
ex = DoExcel()
ex.do_main("postman.xlsx", *res)

View File

@ -11,51 +11,33 @@ sys.path.append("./common")
from common.tools.logger import MyLog from common.tools.logger import MyLog
# 初始化全局session
session = None
@MyLog().decorator_log("请求异常") @MyLog().decorator_log("请求异常")
def req(host, url, method, headers=None, **kwargs): def req(host, url, method, **kwargs):
""" """
发送 http 请求 发送 http 请求
@param host: 域名 @param host: 域名
@param url: 接口 url @param url: 接口 url
@param method: http 请求方法 @param method: http 请求方法
@param headers: 请求头部信息默认为 None # @param request_data_type: 请求数据类型
# @param headers: 请求头部信息,默认为 None
@param kwargs: 接受 requests 原生的关键字参数 @param kwargs: 接受 requests 原生的关键字参数
@return: 响应对象 @return: 响应对象
""" """
# 关闭 https 警告信息 # 关闭 https 警告信息
urllib3.disable_warnings() urllib3.disable_warnings()
global session
if not session:
session = requests.Session()
if not url: if not url:
raise ValueError("URL 不能为 None") raise ValueError("URL 不能为 None")
with requests.Session() as session: url = f'{host}{url}' if not re.match(r"https?", url) else url
func = getattr(session, method.lower())
url = f'{host}{url}' if not re.match(r"https?", url) else url return func(url, verify=True, timeout=30, **kwargs)
# 利用反射动态获取requests 模块中对应的请求方法,会有少量性能开销
func = getattr(session, method.lower())
# 默认请求信息
default_headers = {"Content-Type": "application/json;charset=UTF-8"}
# 更新请求头
if headers:
default_headers.update(headers)
headers = default_headers
# 取出传入的 data 和 headers, 如果没有传入则使用默认值
data = kwargs.pop("data", None)
if "application/x-www-form-urlencoded" in default_headers.values():
return func(url, headers=headers, params=data, data=data, verify=False, timeout=30, **kwargs)
else:
return func(url, headers=headers, params=data, json=data, verify=False, timeout=30, **kwargs)
if __name__ == '__main__': if __name__ == '__main__':
h = 'https://bimdc.bzlrobot.com' ...
# u = r'/bsp/test/user/ugs/ibs/api/ibs-material/material/jobRequire/pages?t=1681118956000'
hea = {"BSP_TOKEN": "fc8fc6626920b8a8c729c6e003fbfc4f"}
# da = {"ncCode": "", "applyTimeBegin": "", "applyTimeEnd": "", "applyUserName": "", "auditStatus": "",
# "buildingCode": "", "code": "", "name": "", "purchaseType": "", "size": 10, "current": 1,
# "projectId": "104966"}
u = r'/bsp/test/user/ugs/ibs/api/ibs-material/checkAccept/todo/count'
da = {"t": "1681120148000", "userId": "216483504447804297", "projectId": "104966"}
reqs = req(h, u, "get", headers=hea, data=da).json()
print(reqs)

38
common/tools/request.py Normal file
View File

@ -0,0 +1,38 @@
import re
import requests
import urllib3
from common.tools.hooks import hooks
def req(host, url, method, **kwargs):
"""
发送 http 请求
@param host: 域名
@param url: 接口 url
@param method: http 请求方法
@param kwargs: 接受 requests 原生的关键字参数
@return: 响应对象
"""
# 关闭 https 警告信息
urllib3.disable_warnings()
global session
if not session:
session = requests.Session()
if not url:
raise ValueError("URL 不能为 None")
url = f'{host}{url}' if not re.match(r"https?", url) else url
# 执行 before_request 钩子函数
request = requests.Request(method, url, **kwargs)
request = hooks.run_before_request_funcs(request)
# 发送请求
prepared_request = session.prepare_request(request)
response = session.send(prepared_request)
# 执行 after_request 钩子函数
response = hooks.run_after_request_funcs(response)
return response

View File

@ -0,0 +1,64 @@
from common.extractor.dependent_parameter import DependentParameter
from common.extractor.data_extractor import DataExtractor
from common.encryption.encryption_main import do_encrypt
from common.tools.hooks import Hooks
from common.tools.logger import MyLog
hooks = Hooks()
logger = MyLog()
dep_par = DependentParameter() # 参数提取类实例化
@hooks.before_request
def update_url(request):
"""更新url"""
request.url = dep_par.replace_dependent_parameter(request.url)
logger.my_log(f"请求地址 --> {request.url}", "info")
return request
@hooks.before_request
def update_header(request):
"""更新请求头"""
request.headers = dep_par.replace_dependent_parameter(request.headers)
logger.my_log(f"请求头 --> {request.headers}", "info")
return request
@hooks.before_request
def update_body(request):
"""更新请求参数"""
if request.json:
request.json = dep_par.replace_dependent_parameter(request.json)
if request.encryption:
request.data = do_encrypt(request.encryption, request.json) # 数据加密MD5  
logger.my_log(f"请求 body --> {request.json}", "info")
else:
request.data = dep_par.replace_dependent_parameter(request.data)
if request.encryption:
request.data = do_encrypt(request.encryption, request.data) # 数据加密MD5  
logger.my_log(f"请求 body --> {request.data}", "info")
return request
@hooks.before_request
def update_expected(request):
"""更新预期结果"""
request.expected = dep_par.replace_dependent_parameter(request.expected)
logger.my_log(f"预期结果 --> {request.expected}", "info")
return request
@hooks.after_request
def parse_json(response):
"""
尝试将响应中的内容解析为 JSON 格式
"""
try:
response.json_data = response.json()
except ValueError:
response.json_data = None
return response

Binary file not shown.

Before

Width:  |  Height:  |  Size: 206 KiB

View File

@ -1,92 +1,111 @@
import json # import json
#
from common.files_tools.read_file import read_file # from common.files_tools.read_file import read_file
#
result = [] # result = []
#
#
def parsing_postman(path): # def parsing_postman(path):
""" # """
解析postman到处的json文件 # 解析postman到处的json文件
Args: # Args:
path: # path:
#
Returns: # Returns:
#
""" # """
data = read_file(path) # data = read_file(path)
#
# print("元數據", data) # def _parse_api(content):
# global result
def _parse_api(content): # api = {}
global result # if isinstance(content, list):
api = {} # for item in content:
if isinstance(content, list): # print(item.get('name'))
for item in content: # _parse_api(content=item)
api['name'] = item.get('name') # elif isinstance(content, dict):
_parse_api(content=item) # if 'item' in content.keys():
elif isinstance(content, dict): # _parse_api(content=content.get('item'))
if 'item' in content.keys(): # elif 'request' in content.keys():
_parse_api(content=content.get('item')) # api['name'] = 'postman'
elif 'request' in content.keys(): # api['description'] = content.get('name')
api['description'] = content.get('name') # request = content.get('request')
request = content.get('request') # if request:
if request: # # api请求方法
# api请求方法 # api['Method'] = request.get('method', 'GET').upper()
api['Method'] = request.get('method', 'GET').upper() # header = request.get('header')
# print('Method----->', api['Method']) # header = {item.get('key'): item.get('value') for item in header} if header else {}
header = request.get('header') #
header = {item.get('key'): item.get('value') for item in header} if header else {} # auth = request.get('auth')
# if auth:
auth = request.get('auth') # auth_type = auth.get('type')
if auth: # if auth.get(auth_type):
auth_type = auth.get('type') # auth_value = {item.get('key'): item.get('value') for item in auth.get(auth_type) if
if auth.get(auth_type): # (item and item.get('key'))}
auth_value = {item.get('key'): item.get('value') for item in auth.get(auth_type) if # header.update(auth_value)
(item and item.get('key'))} # # api请求头
header.update(auth_value) # api['Headers'] = json.dumps(header, ensure_ascii=False)
# api请求头 # # api 请求地址
api['Headers'] = json.dumps(header, ensure_ascii=False) # url = request.get('url')
# print('Headers---->:', api['Headers']) # if url:
url = request.get('url') # api['Url'] = url.get('raw')
if url: # # if url and url.get('path'):
api['Url'] = url.get('raw') # # # api请求URL
# if url and url.get('path'): # # api['Url'] = r'/'.join(url.get('path'))
# # api请求URL # #
# api['Url'] = r'/'.join(url.get('path')) # # if url and url.get('query'):
# # # # api查询参数
# if url and url.get('query'): # # api['Request Data'] = '&'.join(
# # api查询参数 # # [item.get('key') + '=' + (item.get('value') or '') for item in url.get('query') if item])
# api['Request Data'] = '&'.join( # body = request.get('body')
# [item.get('key') + '=' + (item.get('value') or '') for item in url.get('query') if item]) # if body:
# print('Url---->:', api['Url']) # # api接口请求参数类型
body = request.get('body') # request_mode = body.get('mode')
if body: # if 'raw' == request_mode:
# api接口请求参数类型 # api['request_data_type'] = 'json'
request_mode = body.get('mode') # elif 'formdata' == request_mode:
if 'raw' == request_mode: # api['request_data_type'] = 'data'
api['request_data_type'] = 'Json' # elif 'urlencoded' == request_mode:
elif 'formdata' == request_mode: # api['request_data_type'] = 'data'
api['request_data_type'] = 'Form Data' # else:
# print('------------------------>Body', body) # api['request_data_type'] = ''
# api接口请求参数 # # api接口请求参数
request_data = body.get(request_mode) # request_data = body.get(request_mode)
if request_data and 'raw' == request_mode: # if request_data and 'raw' == request_mode:
api['Request Data'] = request_data.replace('\t', '').replace('\n', '') # api['Request Data'] = request_data.replace('\t', '').replace('\n', '')
# print('Body------>1:', api['Request Data']) # elif request_data and 'formdata' == request_mode:
elif request_data and 'formdata' == request_mode: # if isinstance(request_data, list):
if isinstance(request_data, list): # api['Request Data'] = json.dumps({item.get('key'): '' for item in request_data},
api['Request Data'] = json.dumps({item.get('key'): '' for item in request_data}, # ensure_ascii=False)
ensure_ascii=False) # else:
# print('Body------>2:', api['Request Data']) # api['Request Data'] = ''
result.append(api) # api['参数加密方式'] = ''
# api['提取请求参数'] = ''
for d in data: # api['Jsonpath'] = ''
_parse_api(content=data) # api['正则表达式'] = ''
return result # api['正则变量'] = ''
# api['绝对路径表达式'] = ''
# api['SQL'] = ''
if __name__ == '__main__': # api['sql变量'] = ''
pat = r'C:\Users\chenyongzhi11\Desktop\市调情报系统.postman_collection.json' # api['响应类型'] = ''
res = parsing_postman(pat) # api['预期结果'] = ''
print('结果:', res) # api['响应结果'] = ''
# api['断言结果'] = ''
# api['报错日志'] = ''
#
# result.append(api)
#
# for _ in data:
# _parse_api(content=data)
# return result
#
#
# if __name__ == '__main__':
# pat = r'postman.json'
# res = parsing_postman(pat)
# print('结果:', res)
# print("类型", type(res))
# from common.files_tools.excel import DoExcel
#
# ex = DoExcel()
# ex.do_main("postman.xlsx", *res)

305
temp/postman.json Normal file
View File

@ -0,0 +1,305 @@
{
"info": {
"_postman_id": "9f5b15a9-464b-40c7-8aa5-95d3094cce23",
"name": "New Collection",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json",
"_exporter_id": "6288605"
},
"item": [
{
"name": "login",
"event": [
{
"listen": "test",
"script": {
"exec": [
"pm.test(\"get_bsptoken\", function () {\r",
" let res = pm.response.json();\r",
" let bspToken = res.data.bspToken;\r",
" pm.environment.set(\"bsToken\", bspToken);\r",
"\r",
"})"
],
"type": "text/javascript"
}
}
],
"request": {
"method": "POST",
"header": [],
"body": {
"mode": "raw",
"raw": "{\r\n \"account\": \"18127813600\",\r\n \"password\": \"WD6Y0+LJLHXuFaplzUtSCnwktA7KgXCpjCS+OVvIFGTEoz2gbqK2oOOuJUf7ao0m2YYGiGi1pQTMBnkrxIY1cztGYbVp97kvIQwZLN4UhrOAe3h1asY/NLnDwB/byl7agcGv9WI4oy6B1Z93HVHmQiAKn7QqnDgPVITu4jthNc8=\"\r\n}",
"options": {
"raw": {
"language": "json"
}
}
},
"url": {
"raw": "{{url}}/bsp/test/user/ugs/auth/loginByNotBip",
"host": [
"{{url}}"
],
"path": [
"bsp",
"test",
"user",
"ugs",
"auth",
"loginByNotBip"
]
}
},
"response": []
},
{
"name": "获取需求采购列表(默认条件查询-成功)",
"event": [
{
"listen": "prerequest",
"script": {
"exec": [
""
],
"type": "text/javascript"
}
},
{
"listen": "test",
"script": {
"exec": [
"pm.test(\"assert_code\", function () {\r",
" let body = JSON.parse(pm.request.body.raw)\r",
" body_code = body.code\r",
" let res = pm.response.json()\r",
" for (let i of res.data.records) {\r",
" pm.expect(i.orderCode, \"不包含0003\").to.include(body_code)\r",
" }\r",
" let requireId = res.data.records[0].id\r",
" pm.environment.set(\"requireId\", requireId)\r",
"})"
],
"type": "text/javascript"
}
}
],
"request": {
"method": "POST",
"header": [
{
"key": "BSP_TOKEN",
"value": "{{bsToken}}",
"type": "text"
}
],
"body": {
"mode": "raw",
"raw": "{\r\n \"ncCode\": \"\",\r\n \"applyTimeBegin\": \"\",\r\n \"applyTimeEnd\": \"\",\r\n \"applyUserName\": \"\",\r\n \"auditStatus\": \"\",\r\n \"buildingCode\": \"\",\r\n \"code\": \"RE202210270003\",\r\n \"name\": \"\",\r\n \"purchaseType\": \"\",\r\n \"size\": 10,\r\n \"current\": 1,\r\n \"projectId\": \"104966\"\r\n}",
"options": {
"raw": {
"language": "json"
}
}
},
"url": {
"raw": "{{url}}/bsp/test/user/ugs/ibs/api/ibs-material/material/jobRequire/pages?t=1681310676000",
"host": [
"{{url}}"
],
"path": [
"bsp",
"test",
"user",
"ugs",
"ibs",
"api",
"ibs-material",
"material",
"jobRequire",
"pages"
],
"query": [
{
"key": "t",
"value": "1681310676000"
}
]
}
},
"response": []
},
{
"name": "获取请购详情",
"request": {
"method": "GET",
"header": [
{
"key": "BSP_TOKEN",
"value": "aba920e63584c3b1fef3cb4c498bca44",
"type": "text"
}
],
"url": {
"raw": "https://bimdc.bzlrobot.com/bsp/test/user/ugs/ibs/api/ibs-material/material/jobRequire/detail?t=&isEdit=&requireId=5804&projectId=",
"protocol": "https",
"host": [
"bimdc",
"bzlrobot",
"com"
],
"path": [
"bsp",
"test",
"user",
"ugs",
"ibs",
"api",
"ibs-material",
"material",
"jobRequire",
"detail"
],
"query": [
{
"key": "t",
"value": ""
},
{
"key": "isEdit",
"value": ""
},
{
"key": "requireId",
"value": "5804"
},
{
"key": "projectId",
"value": ""
}
]
}
},
"response": []
},
{
"name": "获取请购详情 Copy",
"request": {
"method": "POST",
"header": [
{
"key": "BSP_TOKEN",
"value": "aba920e63584c3b1fef3cb4c498bca44",
"type": "text"
}
],
"body": {
"mode": "urlencoded",
"urlencoded": []
},
"url": {
"raw": "https://bimdc.bzlrobot.com/bsp/test/user/ugs/ibs/api/ibs-material/material/jobRequire/detail?t=&isEdit=&requireId=5804&projectId=",
"protocol": "https",
"host": [
"bimdc",
"bzlrobot",
"com"
],
"path": [
"bsp",
"test",
"user",
"ugs",
"ibs",
"api",
"ibs-material",
"material",
"jobRequire",
"detail"
],
"query": [
{
"key": "t",
"value": ""
},
{
"key": "isEdit",
"value": ""
},
{
"key": "requireId",
"value": "5804"
},
{
"key": "projectId",
"value": ""
}
]
}
},
"response": []
},
{
"name": "获取请购详情 Copy 2",
"request": {
"method": "POST",
"header": [
{
"key": "BSP_TOKEN",
"value": "aba920e63584c3b1fef3cb4c498bca44",
"type": "text"
}
],
"body": {
"mode": "raw",
"raw": "",
"options": {
"raw": {
"language": "html"
}
}
},
"url": {
"raw": "https://bimdc.bzlrobot.com/bsp/test/user/ugs/ibs/api/ibs-material/material/jobRequire/detail?t=&isEdit=&requireId=5804&projectId=",
"protocol": "https",
"host": [
"bimdc",
"bzlrobot",
"com"
],
"path": [
"bsp",
"test",
"user",
"ugs",
"ibs",
"api",
"ibs-material",
"material",
"jobRequire",
"detail"
],
"query": [
{
"key": "t",
"value": ""
},
{
"key": "isEdit",
"value": ""
},
{
"key": "requireId",
"value": "5804"
},
{
"key": "projectId",
"value": ""
}
]
}
},
"response": []
}
]
}

View File

@ -10,6 +10,8 @@ import sys
import unittest import unittest
import re import re
from common.files_tools.get_file import get_file
sys.path.append("./common") sys.path.append("./common")
sys.path.append("./") sys.path.append("./")
@ -19,14 +21,14 @@ from unittestreport import TestRunner
# @decorator_send_info() # @decorator_send_info()
def run_test_case(case_name, url_key=None): def run(case_name='T', url_key=None):
test_report = BaseDates.test_report test_report = BaseDates.test_report
print(f"当前测试报告路劲: {test_report},测试脚本路劲: {BaseDates.script},测试用例脚本名称: {case_name}") print(f"当前测试报告路劲: {test_report},测试脚本路劲: {BaseDates.script}")
t_case = unittest.defaultTestLoader.discover(BaseDates.script, pattern=f"{case_name}.py") t_case = unittest.defaultTestLoader.discover(BaseDates.script, pattern="test_*.py")
runner = TestRunner(t_case, report_dir=test_report, filename=case_name, title="接口自动化测试报告", templates=2, runner = TestRunner(t_case, report_dir=test_report, filename=case_name, title="接口自动化测试报告", templates=2,
tester="kira", desc="自动化测试") tester="kira", desc="自动化测试")
runner.run() runner.run()
# with open(BaseDates.test_report + f"/{case_name} 测试报告.html", "wb") as fb: # with open(test_report + f"/{case_name} 测试报告.html", "wb") as fb:
# runner = HTMLTestRunner(stream=fb, verbosity=2, title=f"{case_name} 接口自动化测试报告", # runner = HTMLTestRunner(stream=fb, verbosity=2, title=f"{case_name} 接口自动化测试报告",
# description="接口自动化测试") # description="接口自动化测试")
# runner.run(t_case) # runner.run(t_case)
@ -34,23 +36,9 @@ def run_test_case(case_name, url_key=None):
# return data # return data
def run(): def start():
keys = {"test_": "8b1647d4-dc32-447c-b524-548acf18a938" # 企業微信key run()
}
# 获取测试用例脚本文件夹下所有文件
test_case_names = os.listdir(BaseDates.script)
for name in test_case_names:
if re.match(r"test_.+?py", name):
test_case = re.match(r"test_.+?py", name).group()
case = test_case.split(".")[0]
# run_test_case(case)
for key, value in keys.items():
if key in case:
print(f"當前運行的用例:{case}; 對應的機器人key {value}")
run_test_case(case, value)
else:
print("No script to be executed")
if __name__ == '__main__': if __name__ == '__main__':
run() start()

View File

@ -8,7 +8,6 @@
@desc: @desc:
""" """
import unittest import unittest
import warnings
from common import bif_functions from common import bif_functions
from common.extractor.dependent_parameter import DependentParameter as DP from common.extractor.dependent_parameter import DependentParameter as DP
@ -39,7 +38,6 @@ class BaseClass(unittest.TestCase):
loaders.set_bif_fun(bif_functions) # 加载内置方法 loaders.set_bif_fun(bif_functions) # 加载内置方法
logger.my_log("内置方法加载完成", "info") logger.my_log("内置方法加载完成", "info")
logger.my_log(f"所有用例执行开始...", "info") logger.my_log(f"所有用例执行开始...", "info")
# warnings.simplefilter('ignore', ResourceWarning)
super().setUpClass() super().setUpClass()
@classmethod @classmethod
@ -74,10 +72,11 @@ class BaseClass(unittest.TestCase):
self.description = self._case.get("description") self.description = self._case.get("description")
self.url = self._case.get("Url") self.url = self._case.get("Url")
self.method = self._case.get("Method") self.method = self._case.get("Method")
self.request_data_type = self._case.get("request_data_type", "json")
self.sql_variable = self._case.get("sql变量") self.sql_variable = self._case.get("sql变量")
self.sqlps = self._case.get("SQL") self.sqlps = self._case.get("SQL")
self.headers = self._case.get("Headers", {}) self.headers = self._case.get("Headers", {})
self.parameters = self._case.get("请求参数") self.parameters = self._case.get("Request Data")
self.parameters_key = self._case.get("提取请求参数") self.parameters_key = self._case.get("提取请求参数")
self.encryption = self._case.get("参数加密方式") self.encryption = self._case.get("参数加密方式")
self.regex = self._case.get("正则表达式") self.regex = self._case.get("正则表达式")
@ -112,10 +111,14 @@ class BaseClass(unittest.TestCase):
def do_request(self): def do_request(self):
"""发送http请求""" """发送http请求"""
try: try:
logger.my_log(f"请求 URL --> {self.url}", "info") kwargs = {
logger.my_log(f"请求 HEADERS --> {self.headers}", "info") "headers": self.headers,
logger.my_log(f"请求 BODY --> {self.parameters}", "info") self.request_data_type: self.parameters
self._response = req(self.host, self.url, self.method, headers=self.headers, data=self.parameters) }
self._response = req(self.host, self.url, self.method, **kwargs)
logger.my_log(f"请求 URL --> {self._response.request.url}", "info")
logger.my_log(f"请求 HEADERS --> {self._response.request.headers}", "info")
logger.my_log(f"请求 BODY --> {self._response.request.body}", "info")
logger.my_log(f"接口耗时--> 【{self._response.elapsed}", "info") logger.my_log(f"接口耗时--> 【{self._response.elapsed}", "info")
logger.my_log(f"接口状态--> 【{self._response.status_code}", "info") logger.my_log(f"接口状态--> 【{self._response.status_code}", "info")
logger.my_log(f"接口响应--> {self._response.text}", "info") logger.my_log(f"接口响应--> {self._response.text}", "info")
@ -155,7 +158,7 @@ class BaseClass(unittest.TestCase):
if execute_sql_results and self.sql_variable: if execute_sql_results and self.sql_variable:
# 执行sql数据提取 # 执行sql数据提取
DataExtractor(execute_sql_results).substitute_data(jp_dict=self.sql_variable) DataExtractor(execute_sql_results).substitute_data(jp_dict=self.sql_variable)
if method == "SQL" and self.mysql: if self.method == "SQL" and self.mysql:
return return
except Exception as e: except Exception as e:
logger.my_log(f'执行 sql 失败:{self.sql},异常:{e}') logger.my_log(f'执行 sql 失败:{self.sql},异常:{e}')

View File

@ -3,13 +3,13 @@ import unittest
from ddt import ddt, data from ddt import ddt, data
sys.path.append("../../") sys.path.append("../")
sys.path.append("../../common") sys.path.append("../../common")
from common.files_tools.get_excel_init import get_init from common.files_tools.get_excel_init import get_init
from common.tools.logger import MyLog from common.tools.logger import MyLog
from common.dependence import Dependence from common.dependence import Dependence
from common.base_datas import BaseDates from common.base_datas import BaseDates
from test_script.baseclass import BaseClass from test_script.script.baseclass import BaseClass
test_file = BaseDates.test_api # 获取 excel 文件路径 test_file = BaseDates.test_api # 获取 excel 文件路径
excel_handle, init_data, test_case = get_init(test_file) excel_handle, init_data, test_case = get_init(test_file)

View File

@ -1,9 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: __init__.py.py
@time: 2023/4/19 17:15
@desc:
"""

View File

@ -1,169 +0,0 @@
import sys
import time
import unittest
import warnings
from ddt import ddt, data
from common.base_datas import BaseDates
sys.path.append("../../")
sys.path.append("../../common")
from common.files_tools.get_excel_init import get_init
from common.extractor.dependent_parameter import DependentParameter
from common.extractor.data_extractor import DataExtractor
from common.encryption.encryption_main import do_encrypt
from common.do_sql.do_mysql import DoMysql
from common.tools.req import req
from common.tools.logger import MyLog
from common.comparator import loaders
from common.dependence import Dependence
from common.comparator.validator import Validator
from common import bif_functions
warnings.simplefilter('ignore', ResourceWarning)
test_file = BaseDates.test_api # 获取 excel 文件路径
excel_handle, init_data, test_case = get_init(test_file)
databases = init_data.get('databases') # 获取数据库配置信息
mysql = DoMysql(databases) # 初始化 mysql 链接
dep = Dependence
dep.set_dep(eval(init_data.get("initialize_data"))) # 初始化依赖表
dep_par = DependentParameter() # 参数提取类实例化
logger = MyLog()
@ddt
class TestProjectApi(unittest.TestCase):
maxDiff = None
@classmethod
def setUpClass(cls) -> None:
loaders.set_bif_fun(bif_functions) # 加载内置方法
# 获取初始化基础数据
cls.host = init_data.get('host')
cls.path = init_data.get("path")
username = dep.get_dep("{{account}}")
password = dep.get_dep("{{passwd}}")
cls.headers = login(cls.host + cls.path, username, password)
dep.update_dep("headers", cls.headers)
# 加载内置方法
logger.my_log(f"内置方法:{dep.get_dep()}", "info")
def setUp(self) -> None:
logger.my_log(f"获取当前依赖参数表:{dep.get_dep()}")
logger.my_log("-----------------------------------start_test_api-----------------------------------", "info")
@data(*test_case) # {"":""}
def test_api(self, item): # item = {測試用例}
# f"""用例描述:{item.get("name")}_{item.get("desc")}"""
sheet = item.get("sheet")
item_id = item.get("Id")
name = item.get("name")
description = item.get("description")
host = self.__class__.host
path = self.__class__.path
url = item.get("Url")
headers = self.__class__.headers
run = item.get("Run")
method = item.get("Method")
sql_variable = item.get("sql变量")
sqlps = item.get("SQL")
item_headers = item.get("Headers") if item.get("Headers") else {}
parameters = item.get("请求参数")
parameters_key = item.get("提取请求参数")
encryption = item.get("参数加密方式")
regex = item.get("正则表达式")
keys = item.get("正则变量")
deps = item.get("绝对路径表达式")
jp_dict = item.get("Jsonpath")
expect = item.get("预期结果")
if run.upper() != "YES":
return
if method == "TIME":
try:
time.sleep(int(url))
logger.my_log(f"暂存成功:{url}", "info")
return
except Exception as e:
MyLog().my_log(f'暂停时间必须是数字')
raise e
# 首先执行sql替换,将sql替换为正确的sql语句
sql = dep_par.replace_dependent_parameter(sqlps)
try:
# 执行 sql 操作
execute_sql_results = mysql.do_mysql(sql)
if execute_sql_results and sql_variable:
# 执行sql数据提取
DataExtractor(execute_sql_results).substitute_data(jp_dict=sql_variable)
if method == "SQL" and mysql:
return
except Exception as e:
logger.my_log(f'sql:{sql},异常:{e}')
raise e
# 替换 URL, PARAMETERS, HEADER,期望值
url = dep_par.replace_dependent_parameter(url)
parameters = dep_par.replace_dependent_parameter(parameters)
item_headers = dep_par.replace_dependent_parameter(item_headers)
headers = {**headers, **item_headers}
expected = dep_par.replace_dependent_parameter(expect)
# 提取请求参数信息
DataExtractor(parameters).substitute_data(jp_dict=parameters_key)
# 判断是否执行加密
if encryption:
parameters = do_encrypt(encryption, parameters) # 数据加密MD5  
logger.my_log(f"当前用例所在的sheet--> {sheet}", "info")
logger.my_log(f"请求地址--> {host + path + url}", "info")
logger.my_log(f"请求头--> {headers}", "info")
logger.my_log(f"请求body--> {parameters}", "info")
logger.my_log(f"执行SQL语句--> {sql}", "info")
logger.my_log(f"执行sql结果--> {execute_sql_results}", "info")
logger.my_log(f"预期结果--> {expected}", "info")
try:
# 执行请求操作
response = req(host + path, url, method, headers=headers, data=parameters)
logger.my_log(f"接口响应--> {response.text}", "info")
logger.my_log(f"接口耗时--> {response.elapsed}", "info")
except Exception as e:
result = "失败"
logger.my_log(f'{result}:{item_id}-->{name}_{description},异常:{e}')
raise e
result_tuple = Validator().run_validate(expected, response.json()) # 执行断言 返回结果元组
result = "PASS"
try:
self.assertNotIn("FAIL", result_tuple, "FAIL 存在结果元组中")
except Exception as e:
result = "FAIL"
raise e
finally:
pass
# 响应结果及测试结果回写 excel
# excel_handle.write_back(
# sheet_name=sheet,
# i=item_id,
# response_value=response.text,
# test_result=result,
# assert_log=str(result_tuple)
# )
try:
# 提取响应
DataExtractor(response.json()).substitute_data(regex=regex, keys=keys, deps=deps, jp_dict=jp_dict)
except:
logger.my_log(
f"提取响应失败:{name}_{description}:"
f"\nregex={regex},"
f" \nkeys={keys}, "
f"\ndeps={deps}, "
f"\njp_dict={jp_dict}")
def tearDown(self) -> None:
logger.my_log("-----------------------------------end_test_api-----------------------------------", "info")
@classmethod
def tearDownClass(cls) -> None:
pass
if __name__ == '__main__':
unittest.main()

View File

@ -49,11 +49,12 @@ class TestProjectApi(unittest.TestCase):
name = item.get("name") name = item.get("name")
description = item.get("description") description = item.get("description")
url = item.get("Url") url = item.get("Url")
request_data_type = item.get("request_data_type", "json")
method = item.get("Method") method = item.get("Method")
sql_variable = item.get("sql变量") sql_variable = item.get("sql变量")
sqlps = item.get("SQL") sqlps = item.get("SQL")
item_headers = item.get("Headers", {}) item_headers = item.get("Headers", {})
parameters = item.get("请求参数") request_data = item.get("Request Data")
parameters_key = item.get("提取请求参数") parameters_key = item.get("提取请求参数")
encryption = item.get("参数加密方式") encryption = item.get("参数加密方式")
regex = item.get("正则表达式") regex = item.get("正则表达式")
@ -90,24 +91,28 @@ class TestProjectApi(unittest.TestCase):
# 替换 URL, PARAMETERS, HEADER,期望值 # 替换 URL, PARAMETERS, HEADER,期望值
url = dep_par.replace_dependent_parameter(url) url = dep_par.replace_dependent_parameter(url)
parameters = dep_par.replace_dependent_parameter(parameters) request_data = dep_par.replace_dependent_parameter(request_data)
headers = dep_par.replace_dependent_parameter(item_headers) headers = dep_par.replace_dependent_parameter(item_headers)
expected = dep_par.replace_dependent_parameter(expect) expected = dep_par.replace_dependent_parameter(expect)
# 提取请求参数信息 # 提取请求参数信息
DataExtractor(parameters).substitute_data(jp_dict=parameters_key) DataExtractor(request_data).substitute_data(jp_dict=parameters_key)
# 判断是否执行加密 # 判断是否执行加密
if encryption: if encryption:
parameters = do_encrypt(encryption, parameters) # 数据加密MD5   request_data = do_encrypt(encryption, request_data) # 数据加密MD5  
logger.my_log(f"当前用例所在的 sheet --> {sheet}", "info") logger.my_log(f"当前用例所在的 sheet --> {sheet}", "info")
logger.my_log(f"请求地址 --> {host + url}", "info")
logger.my_log(f"请求头 --> {headers}", "info")
logger.my_log(f"请求 body --> {parameters}", "info")
logger.my_log(f"执行 SQL 语句 --> {sql}", "info") logger.my_log(f"执行 SQL 语句 --> {sql}", "info")
logger.my_log(f"预期结果 --> {expected}", "info") logger.my_log(f"预期结果 --> {expected}", "info")
try: try:
# 执行请求操作 # 执行请求操作
response = req(host, url, method, headers=headers, data=parameters) kwargs = {
request_data_type: request_data,
'headers': headers
}
response = req(host, url, method, **kwargs)
logger.my_log(f"请求地址 --> {response.request.url}", "info")
logger.my_log(f"请求头 --> {response.request.headers}", "info")
logger.my_log(f"请求 body --> {response.request.body}", "info")
logger.my_log(f"接口状态--> {response.status_code}", "info") logger.my_log(f"接口状态--> {response.status_code}", "info")
logger.my_log(f"接口耗时--> {response.elapsed}", "info") logger.my_log(f"接口耗时--> {response.elapsed}", "info")
logger.my_log(f"接口响应--> {response.text}", "info") logger.my_log(f"接口响应--> {response.text}", "info")