193 lines
7.8 KiB
Python
193 lines
7.8 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
|
||
import requests
|
||
from tools.runtime_control import execution_duration
|
||
from tools.mysql_control import MysqlDB,SetUpMySQL,AssertExecution
|
||
from tools.log_decorator import log_decorator
|
||
from tools.allure_control import allure_step, allure_step_no, SqlSwitch,allure_attach
|
||
from tools.regular_control import cache_regular
|
||
from config.settings import ConfigHandler
|
||
from requests_toolbelt import MultipartEncoder
|
||
from tools.dependent_case import DependentCase
|
||
from typing import Any
|
||
import random
|
||
import os
|
||
|
||
class Transmission:
|
||
JSON: str = "JSON"
|
||
PARAMS: str = "params"
|
||
DATE: str = "date"
|
||
FILE: str = 'file'
|
||
|
||
|
||
class RequestControl:
|
||
""" 封装请求 """
|
||
|
||
def __init__(self):
|
||
# TODO 初始化逻辑调整
|
||
pass
|
||
|
||
@classmethod
|
||
def _check_params(cls, response, yaml_data, headers, cookie, res_time, status_code, teardown,assert_data) -> Any:
|
||
""" 抽离出通用模块,判断 http_request 方法中的一些数据校验 """
|
||
# 判断数据库开关,开启状态,则返回对应的数据
|
||
if SqlSwitch() and yaml_data['sql'] is not None:
|
||
sql_data = AssertExecution.assert_execution(sql=yaml_data['sql'], resp=response)
|
||
return {"response_data": response, "sql_data": sql_data, "yaml_data": yaml_data,
|
||
"headers": headers, "cookie": cookie, "res_time": res_time, "status_code": status_code,
|
||
"teardown": teardown,"assert": assert_data}
|
||
else:
|
||
# 数据库关闭走的逻辑
|
||
res = response
|
||
return {"response_data": res, "sql_data": {"sql": None}, "yaml_data": yaml_data,
|
||
"headers": headers, "cookie": cookie, "res_time": res_time, "status_code": status_code,
|
||
"teardown": teardown,"assert": assert_data}
|
||
|
||
@classmethod
|
||
def response_elapsed_total_seconds(cls, res):
|
||
"""获取接口响应时长"""
|
||
return res.elapsed.total_seconds()
|
||
|
||
@classmethod
|
||
def text_encode(cls, text):
|
||
"""unicode 解码"""
|
||
return text.encode("utf-8").decode("utf-8")
|
||
|
||
@classmethod
|
||
def file_data_exit(cls, yaml_data, file_data):
|
||
"""判断上传文件时,data参数是否存在"""
|
||
# 兼容又要上传文件,又要上传其他类型参数
|
||
try:
|
||
for key, value in yaml_data['data']['data'].items():
|
||
file_data[key] = value
|
||
except KeyError:
|
||
pass
|
||
|
||
@classmethod
|
||
def multipart_data(cls, file_data):
|
||
multipart = MultipartEncoder(
|
||
fields=file_data, # 字典格式
|
||
boundary='-----------------------------' + str(random.randint(int(1e28), int(1e29 - 1)))
|
||
)
|
||
return multipart
|
||
|
||
@classmethod
|
||
def file_prams_exit(cls, yaml_data, multipart):
|
||
# 判断上传文件接口,文件参数是否存在
|
||
try:
|
||
params = yaml_data['data']['params']
|
||
except KeyError:
|
||
params = None
|
||
return multipart, params
|
||
|
||
def upload_file(self, yaml_data):
|
||
"""
|
||
判断处理上传文件
|
||
:param yaml_data:
|
||
:return:
|
||
"""
|
||
# 处理上传多个文件的情况
|
||
yaml_data = eval(cache_regular(str(yaml_data)))
|
||
_files = []
|
||
file_data = {}
|
||
# 兼容又要上传文件,又要上传其他类型参数
|
||
self.file_data_exit(yaml_data, file_data)
|
||
for key, value in yaml_data['data']['file'].items():
|
||
file_path = os.path.join(ConfigHandler.file_path,value)
|
||
file_data[key] = (value, open(file_path, 'rb'), 'application/octet-stream')
|
||
_files.append(file_data)
|
||
# allure中展示该附件
|
||
allure_attach(source=file_path, name=value, extension=value)
|
||
multipart = self.multipart_data(file_data)
|
||
yaml_data['headers']['Content-Type'] = multipart.content_type
|
||
# yaml_data, multipart = self.file_prams_exit(yaml_data, multipart)
|
||
return yaml_data, multipart
|
||
|
||
@log_decorator(True)
|
||
@execution_duration(3000)
|
||
# @encryption("md5")
|
||
def http_request(self, yaml_data, dependent_switch=True, **kwargs):
|
||
"""
|
||
请求封装
|
||
:param yaml_data: 从yaml文件中读取出来的所有数据
|
||
:param dependent_switch:
|
||
:param kwargs:
|
||
:return:
|
||
"""
|
||
_is_run = yaml_data['is_run']
|
||
_url = yaml_data['url']
|
||
_method = yaml_data['method']
|
||
_detail = yaml_data['detail']
|
||
_headers = yaml_data['headers']
|
||
_requestType = yaml_data['requestType'].upper()
|
||
_data = yaml_data['data']
|
||
_sql = yaml_data['sql']
|
||
_teardown = yaml_data["teardown"]
|
||
# _teardown_sql = yaml_data[ 'teardown_sql']
|
||
res = None
|
||
yaml_data = eval(cache_regular(str(yaml_data)))
|
||
_assert = yaml_data['assert']
|
||
# 判断用例是否执行
|
||
if _is_run is True or _is_run is None:
|
||
# 处理多业务逻辑
|
||
# print("_requestType",_requestType)
|
||
# print(_requestType == "JSON")
|
||
if dependent_switch is True:
|
||
DependentCase(yaml_data).is_dependent() #执行setup_sql
|
||
if _requestType == "JSON":
|
||
#缓存里面正则替换
|
||
yaml_data = eval(cache_regular(str(yaml_data)))
|
||
# print("djskfhsfhksh",yaml_data)
|
||
# print(yaml_data['data'])
|
||
_data = yaml_data['data']
|
||
res = requests.request(method=_method, url=_url, json=_data,headers=yaml_data['headers'], verify=False, **kwargs)
|
||
elif _requestType == "PARAMS":
|
||
yaml_data = eval(cache_regular(str(yaml_data)))
|
||
_data = yaml_data['data']
|
||
url = yaml_data[ 'url']
|
||
if _data is not None:
|
||
# url 拼接的方式传参
|
||
params_data = "?"
|
||
for k, v in _data.items():
|
||
params_data += (k + "=" + str(v) + "&")
|
||
url = yaml_data[ 'url'] + params_data[:-1]
|
||
|
||
res = requests.request(method=_method, url=url, headers=yaml_data['headers'], verify=False, **kwargs)
|
||
# 判断上传文件
|
||
elif _requestType == 'FILE':
|
||
yaml_data = eval(cache_regular(str(yaml_data)))
|
||
multipart = self.upload_file(yaml_data)
|
||
res = requests.request(method=_method, url=yaml_data['url'], data=multipart[1], headers=multipart[0]['headers'], verify=False, **kwargs)
|
||
|
||
elif _requestType == "DATE":
|
||
yaml_data = eval(cache_regular(str(yaml_data)))
|
||
res = requests.request(method=_method, url=yaml_data['url'], data=_data, headers=yaml_data['headers'], verify=False, **kwargs)
|
||
|
||
|
||
_status_code = res.status_code
|
||
allure_step_no(f"请求URL: {yaml_data['url']}")
|
||
allure_step_no(f"请求方式: {_method}")
|
||
allure_step("请求头: ", _headers)
|
||
allure_step("请求数据: ", _data)
|
||
allure_step("预期数据: ", _assert)
|
||
_res_time = self.response_elapsed_total_seconds(res)
|
||
allure_step_no(f"响应耗时(s): {_res_time}")
|
||
try:
|
||
res = res.json()
|
||
allure_step("响应结果111: ", res)
|
||
except:
|
||
res = self.text_encode(res.text)
|
||
allure_step("响应结果222: ", res)
|
||
try:
|
||
cookie = res.cookies.get_dict()
|
||
except:
|
||
cookie = None
|
||
return self._check_params(res, yaml_data, _headers, cookie, _res_time, _status_code, _teardown,_assert)
|
||
# return self._check_params(res, yaml_data, _headers, cookie, _res_time,_status_code)
|
||
else:
|
||
# 用例跳过执行的话,响应数据和sql数据为空
|
||
return {"response_data": False, "sql_data": False, "yaml_data": yaml_data, "res_time": 0.00}
|
||
|