pytest-jequnauto/utils/read_files_tools/regular_control.py

270 lines
8.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Desc : 自定义函数调用
# @Time : 2022/4/2 9:32 上午
# @Author : 李晓杰
"""
import re
import datetime
import random
from datetime import date, timedelta, datetime
import base64
import hmac
import hashlib
from jsonpath import jsonpath
from faker import Faker
from utils.cache_process.cache_control import CacheHandler
from utils.logging_tool.log_control import ERROR
from utils.times_tool.time_control import get_GMT
class Context:
""" 正则替换 """
# 主要用来创建伪数据使用Faker包无需再手动生成或者手写随机数来生成数据
def __init__(self):
self.faker = Faker(locale='zh_CN')
@classmethod
def get_GMT(cls):
gmt_format = '%a, %d %b %Y %H:%M:%S GMT'
time = datetime.utcnow().strftime(gmt_format)
return time
#todo 还需判断method类型进行加密 \
@classmethod
def get_Authorization(cls, *args):
"""
获取Authorization
"""
from test_case.conftest import login_aceess
# login_aceess()
url, method = args
userId = CacheHandler.get_cache('userId')
userId = str(userId)
accessId = CacheHandler.get_cache('accessId')
accessKey = CacheHandler.get_cache('accessKey')
clientTime = get_GMT()
# url =url.split('?')[0]
# url = re.search('v.*', url).group()+userId
if url == "v1/users/":
url = url + userId
string= method.upper() + '\\n' + url + '\\n' + '0' + '\\n' + clientTime
# 哈希加密处理
message = string.encode()
key = accessKey.encode()
result = hmac.new(key, message, hashlib.sha1).digest() # 返回结果b'\xd5*\x01\xb0\xa4...
sha = base64.b64encode(result).decode()
auth = 'Jekun ' + accessId + ':' + sha
return auth
@classmethod
def get_accessKey(cls):
from utils.cache_process.cache_control import CacheHandler
accessKey = CacheHandler.get_cache('accessKey')
return accessKey[-10:]
@classmethod
def random_int(cls, *args) -> int:
"""
:return: 随机数
"""
_data = random.randint(int(args[0]), int(args[1]))
return _data
def get_phone(self) -> int:
"""
:return: 随机生成手机号码
"""
phone = self.faker.phone_number()
return phone
def get_id_number(self) -> int:
"""
:return: 随机生成身份证号码
"""
id_number = self.faker.ssn()
return id_number
def get_female_name(self) -> str:
"""
:return: 女生姓名
"""
female_name = self.faker.name_female()
return female_name
def get_male_name(self) -> str:
"""
:return: 男生姓名
"""
male_name = self.faker.name_male()
return male_name
def get_email(self) -> str:
"""
:return: 生成邮箱
"""
email = self.faker.email()
return email
@classmethod
def self_operated_id(cls):
"""自营店铺 ID """
operated_id = 212
return operated_id
@classmethod
def get_time(cls) -> str:
"""
计算当前时间
:return:
"""
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return now_time
@classmethod
def today_date(cls):
"""获取今日0点整时间"""
_today = date.today().strftime("%Y-%m-%d") + " 00:00:00"
return str(_today)
@classmethod
def time_after_week(cls):
"""获取一周后12点整的时间"""
_time_after_week = (date.today() + timedelta(days=+6)).strftime("%Y-%m-%d") + " 00:00:00"
return _time_after_week
@classmethod
def host(cls) -> str:
from utils import config
""" 获取接口域名 """
return config.host
@classmethod
def app_host(cls) -> str:
from utils import config
"""获取app的host"""
return config.app_host
def sql_json(js_path, res):
""" 提取 sql中的 json 数据 """
_json_data = jsonpath(res, js_path)[0]
if _json_data is False:
raise ValueError(f"sql中的jsonpath获取失败 {res}, {js_path}")
return jsonpath(res, js_path)[0]
def sql_regular(value, res=None):
"""
这里处理sql中的依赖数据通过获取接口响应的jsonpath的值进行替换
:param res: jsonpath使用的返回结果
:param value:
:return:
"""
sql_json_list = re.findall(r"\$json\((.*?)\)\$", value)
for i in sql_json_list:
pattern = re.compile(r'\$json\(' + i.replace('$', "\$").replace('[', '\[') + r'\)\$')
key = str(sql_json(i, res))
value = re.sub(pattern, key, value, count=1)
return value
def cache_regular(value):
from utils.cache_process.cache_control import CacheHandler
"""
通过正则的方式,读取缓存中的内容
例:$cache{login_init}
:param value:
:return:
"""
# 正则获取 $cache{login_init123}中的值 --> login_init:123 cookie: $cache{login_cookie} --> login_cookie
# 只取()里面的值
regular_dates = re.findall(r"\$cache\{(.*?)\}", value)
# 拿到的是一个list循环数据
for regular_data in regular_dates:
value_types = ['int:', 'bool:', 'list:', 'dict:', 'tuple:', 'float:']
if any(i in regular_data for i in value_types) is True:
value_types = regular_data.split(":")[0]
regular_data = regular_data.split(":")[1]
# 返回一个正则表达式对象,方便调用
# re.compile("\\'\\$cache{int(.*?)}\\'")
pattern = re.compile(r'\'\$cache{' + value_types.split(":")[0] + r'(.*?)}\'')
else:
pattern = re.compile(
r'\$cache\{' + regular_data.replace('$', "\$").replace('[', '\[') + r'\}'
)
try:
# cache_data = Cache(regular_data).get_cache()
#
cache_data = CacheHandler.get_cache(regular_data)
# 使用sub方法替换已经拿到的内容
value = re.sub(pattern, str(cache_data), value)
except Exception:
pass
return value
#todo target
def regular(target,regular=r'\${{(\w.*?)}}'):
"""
新版本
使用正则替换请求数据
:return:
匹配出
"""
try:
regular_pattern = regular
while re.findall(regular_pattern, target):
key = re.search(regular_pattern, target).group(1)
value_types = ['int:', 'bool:', 'list:', 'dict:', 'tuple:', 'float:']
if any(i in key for i in value_types) is True:
func_name = key.split(":")[1].split("(")[0]
# 只解析出方法中的参数 ex:host('123')
value_name = key.split(":")[1].split("(")[1][:-1]
if value_name == "":
# Context文件里面封装了一些方案有的话直接调用比如调用了host()
value_data = getattr(Context(), func_name)()
else:
value_data = getattr(Context(), func_name)(*value_name.split(","))
if regular_pattern == r'\${{(.*?)}}':
regular_int_pattern = r'\'\${{(.*?)}}\''
target = re.sub(regular_int_pattern, str(value_data), target, 1)
else:
target = re.sub(r'\'\${(.*?)}\'', str(value_data), target, 1)
else:
func_name = key.split("(")[0]
value_name = key.split("(")[1][:-1]
if value_name == "":
value_data = getattr(Context(), func_name)()
else:
value_name = cache_regular(value_name)
value_data = getattr(Context(), func_name)(*value_name.split(","))
# value_data = getattr(Context(), func_name)(str(value_name))
target = re.sub(regular_pattern, str(value_data), target, 1)
return target
except AttributeError:
ERROR.logger.error("未找到对应的替换的数据, 请检查数据是否正确 %s", target)
raise
if __name__ == '__main__':
a = "${{host()}} aaa"
b = regular(a)