270 lines
8.1 KiB
Python
270 lines
8.1 KiB
Python
"""
Desc : Custom function calls
# @Time : 2022/4/2 9:32 AM
# @Author : 李晓杰
"""
|
||
import re
|
||
import datetime
|
||
import random
|
||
from datetime import date, timedelta, datetime
|
||
|
||
import base64
|
||
import hmac
|
||
import hashlib
|
||
|
||
from jsonpath import jsonpath
|
||
from faker import Faker
|
||
|
||
from utils.cache_process.cache_control import CacheHandler
|
||
from utils.logging_tool.log_control import ERROR
|
||
from utils.times_tool.time_control import get_GMT
|
||
|
||
|
||
|
||
class Context:
    """Helper functions that placeholder substitution can call by name.

    ``regular`` in this module resolves a placeholder by looking up the method
    with the same name on a fresh ``Context()`` instance and calling it.
    """

    def __init__(self):
        # Faker generates the fake data, so nothing has to be hand-written or
        # produced with ad-hoc random numbers.
        self.faker = Faker(locale='zh_CN')

    @classmethod
    def get_GMT(cls) -> str:
        """Return the current UTC time formatted like ``Mon, 01 Jan 2024 00:00:00 GMT``."""
        # Local import: ``timezone`` is not among the file-level datetime imports.
        from datetime import timezone

        gmt_format = '%a, %d %b %Y %H:%M:%S GMT'
        # datetime.utcnow() is deprecated; an aware "now" in UTC formats identically.
        return datetime.now(timezone.utc).strftime(gmt_format)

    # TODO: the signing still needs to take the HTTP method type into account.
    @classmethod
    def get_Authorization(cls, *args):
        """Build the ``Authorization`` header value for a request.

        :param args: exactly two positional values, ``(url, method)``.
        :return: ``'Jekun ' + accessId + ':' + signature`` where the signature is
            the base64-encoded HMAC-SHA1 (keyed with the cached ``accessKey``)
            of the upper-cased method, the url, ``'0'`` and the client time.
        """
        # Kept from the original code: the helper is imported but not called here.
        from test_case.conftest import login_aceess  # noqa: F401

        url, method = args
        user_id = str(CacheHandler.get_cache('userId'))
        access_id = CacheHandler.get_cache('accessId')
        access_key = CacheHandler.get_cache('accessKey')
        # Module-level helper imported from utils.times_tool, not the classmethod above.
        client_time = get_GMT()

        # Only the bare users endpoint gets the cached user id appended.
        if url == "v1/users/":
            url = url + user_id

        # NOTE(review): the separator is the two-character sequence backslash + "n",
        # not a real newline - confirm this matches the server's signing scheme.
        string_to_sign = '\\n'.join([method.upper(), url, '0', client_time])

        # HMAC-SHA1 over the string to sign; digest() yields raw bytes.
        digest = hmac.new(access_key.encode(), string_to_sign.encode(), hashlib.sha1).digest()
        signature = base64.b64encode(digest).decode()
        return 'Jekun ' + access_id + ':' + signature

    @classmethod
    def get_accessKey(cls):
        """Return the last ten characters of the cached ``accessKey``."""
        access_key = CacheHandler.get_cache('accessKey')
        return access_key[-10:]

    @classmethod
    def random_int(cls, *args) -> int:
        """Return a random integer between ``args[0]`` and ``args[1]`` (inclusive).

        Both bounds are passed through ``int()``, so numeric strings are accepted.
        """
        return random.randint(int(args[0]), int(args[1]))

    def get_phone(self) -> str:
        """Return a randomly generated phone number."""
        return self.faker.phone_number()

    def get_id_number(self) -> str:
        """Return a randomly generated ID-card number."""
        return self.faker.ssn()

    def get_female_name(self) -> str:
        """Return a randomly generated female name."""
        return self.faker.name_female()

    def get_male_name(self) -> str:
        """Return a randomly generated male name."""
        return self.faker.name_male()

    def get_email(self) -> str:
        """Return a randomly generated e-mail address."""
        return self.faker.email()

    @classmethod
    def self_operated_id(cls):
        """Return the ID of the self-operated store."""
        operated_id = 212
        return operated_id

    @classmethod
    def get_time(cls) -> str:
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    @classmethod
    def today_date(cls):
        """Return today's date at 00:00:00 as ``YYYY-MM-DD 00:00:00``."""
        _today = date.today().strftime("%Y-%m-%d") + " 00:00:00"
        return str(_today)

    @classmethod
    def time_after_week(cls):
        """Return the date six days from today at 00:00:00.

        NOTE(review): the original description said "one week later" but the
        offset used is 6 days - confirm which one is intended.
        """
        _time_after_week = (date.today() + timedelta(days=+6)).strftime("%Y-%m-%d") + " 00:00:00"
        return _time_after_week

    @classmethod
    def host(cls) -> str:
        """Return the API host taken from ``utils.config.host``."""
        from utils import config

        return config.host

    @classmethod
    def app_host(cls) -> str:
        """Return the app host taken from ``utils.config.app_host``."""
        from utils import config

        return config.app_host
|
||
|
||
|
||
def sql_json(js_path, res):
    """Extract the first value matched by a JSONPath expression.

    :param js_path: JSONPath expression to evaluate.
    :param res: data the expression is evaluated against (passed to ``jsonpath``).
    :return: the first matched value.
    :raises ValueError: if the expression matches nothing in ``res``.
    """
    matches = jsonpath(res, js_path)
    # jsonpath() returns False (not an empty list) when nothing matches, so the
    # result must be checked before indexing; checking the extracted value
    # instead would also reject a legitimately matched ``False``.
    if not matches:
        raise ValueError(f"sql中的jsonpath获取失败 {res}, {js_path}")
    return matches[0]
|
||
|
||
|
||
def sql_regular(value, res=None):
    """Resolve ``$json(<jsonpath>)$`` placeholders inside an SQL string.

    Each placeholder is replaced with the value its JSONPath expression
    extracts from ``res`` (via ``sql_json``), converted with ``str()``.

    :param value: SQL text that may contain ``$json(...)$`` placeholders.
    :param res: response data the JSONPath expressions are evaluated against.
    :return: ``value`` with every placeholder substituted.
    """
    for expression in re.findall(r"\$json\((.*?)\)\$", value):
        replacement = str(sql_json(expression, res))
        # Literal, single replacement: the expression may contain regex
        # metacharacters (``*``, ``?``, ``(``, ``.``) and the replacement may
        # contain backslashes, neither of which should be interpreted by ``re``.
        value = value.replace("$json(" + expression + ")$", replacement, 1)

    return value
|
||
|
||
|
||
def cache_regular(value):
    """Replace ``$cache{...}`` placeholders with values read from the cache.

    Example: ``$cache{login_init}`` is replaced with ``str()`` of the cached
    ``login_init`` entry. A placeholder containing one of the type markers
    (``int:``, ``bool:``, ``list:``, ``dict:``, ``tuple:``, ``float:``) is
    expected to be wrapped in single quotes, e.g. ``'$cache{int:user_id}'``;
    the quotes are removed together with the placeholder.

    Placeholders whose cache lookup fails are left in the text unchanged.

    :param value: text that may contain ``$cache{...}`` placeholders.
    :return: ``value`` with every resolvable placeholder substituted.
    """
    # Capture only what is inside the braces:
    # "$cache{login_cookie}" --> "login_cookie"
    placeholders = re.findall(r"\$cache\{(.*?)\}", value)
    if not placeholders:
        return value

    from utils.cache_process.cache_control import CacheHandler

    value_types = ('int:', 'bool:', 'list:', 'dict:', 'tuple:', 'float:')

    for placeholder in placeholders:
        if any(marker in placeholder for marker in value_types):
            # "int:user_id" --> cache key "user_id"; the surrounding quotes
            # are part of what gets replaced.
            cache_key = placeholder.split(":")[1]
            literal = "'$cache{" + placeholder + "}'"
        else:
            cache_key = placeholder
            literal = "$cache{" + placeholder + "}"

        try:
            cache_data = CacheHandler.get_cache(cache_key)
        except Exception:
            # Deliberate best-effort: an unresolved placeholder stays as-is.
            continue

        # Literal replacement of exactly this placeholder. A wildcard regex
        # here would overwrite other placeholders of the same type, and a
        # regex replacement string would mis-handle backslashes in the data.
        value = value.replace(literal, str(cache_data))

    return value
|
||
|
||
#todo target
|
||
def regular(target, regular=r'\${{(\w.*?)}}'):
    """Replace ``${{func(args)}}`` placeholders with the result of ``Context().func(args)``.

    Placeholders are resolved one at a time, left to right, until none remain.

    * ``${{host()}}`` calls ``Context().host()``.
    * ``${{random_int(1,10)}}`` splits the argument text on ``,`` and passes the
      pieces as positional string arguments; the argument text first goes
      through ``cache_regular``.
    * A key containing a type marker (``int:``, ``bool:``, ``list:``, ``dict:``,
      ``tuple:``, ``float:``), e.g. ``'${{int:random_int(1,10)}}'``, calls the
      function named after the first ``:``. If the placeholder is wrapped in
      single quotes, the quotes are removed along with it.

    :param target: text containing the placeholders.
    :param regular: regex locating a placeholder; group 1 must capture the call text.
    :return: ``target`` with every placeholder substituted by ``str(result)``.
    :raises AttributeError: if ``Context`` has no method with the requested
        name (logged, then re-raised).
    """
    value_types = ('int:', 'bool:', 'list:', 'dict:', 'tuple:', 'float:')
    try:
        match = re.search(regular, target)
        while match is not None:
            key = match.group(1)
            start, end = match.span()
            # NOTE(review): this is a substring test, so a marker appearing
            # anywhere in the key (not only as a prefix) selects this branch.
            is_typed = any(marker in key for marker in value_types)

            # Typed keys look like "int:func(args)"; drop the type prefix.
            call = key.split(":", 1)[1] if is_typed else key
            func_name = call.split("(", 1)[0]
            # Keep only the text between the parentheses, e.g. host('123') -> '123'
            value_name = call.split("(", 1)[1][:-1]

            if value_name == "":
                call_args = []
            else:
                if not is_typed:
                    value_name = cache_regular(value_name)
                call_args = value_name.split(",")

            value_data = getattr(Context(), func_name)(*call_args)

            if is_typed and start > 0 and end < len(target) \
                    and target[start - 1] == "'" and target[end] == "'":
                # Swallow the surrounding quotes of a typed placeholder.
                start -= 1
                end += 1

            # Splice at the exact matched span: this always consumes the
            # placeholder just evaluated (so the loop makes progress) and
            # inserts the value literally, without regex escape processing.
            target = target[:start] + str(value_data) + target[end:]
            match = re.search(regular, target)

        return target

    except AttributeError:
        ERROR.logger.error("未找到对应的替换的数据, 请检查数据是否正确 %s", target)
        raise
|
||
|
||
if __name__ == '__main__':
    # Ad-hoc manual check: resolve a sample string that contains a host() placeholder.
    sample_text = "${{host()}} aaa"
    resolved_text = regular(sample_text)
|
||
|
||
|