This commit is contained in:
tlh717 2023-03-08 14:36:05 +08:00
parent 76c8124fca
commit bd53ea81a5
63 changed files with 1021 additions and 0 deletions

8
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

29
Pipfile Normal file
View File

@ -0,0 +1,29 @@
[[source]]
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
verify_ssl = true
name = "pypi"
[packages]
allure-pytest = "==2.11.1"
faker = "==15.1.1"
pandas = "*"
pymysql = "==1.0.2"
pytest = "==7.2.0"
pytest-ordering = "==0.6"
pytest-rerunfailures = "==11.1"
pyyaml = "==6.0"
requests = "==2.28.1"
click = "==8.1.3"
urllib3 = "==1.26.12"
loguru = "==0.6.0"
openpyxl = "==3.1.0"
ddddocr = "==1.4.7"
python-jenkins = "*"
dingtalkchatbot = "1.5.7"
pydantic = "==1.10.5"
jsonpath = "==0.82"
[dev-packages]
[requires]
python_version = "3.8"

158
README.md Normal file
View File

@ -0,0 +1,158 @@
**EasyTest接口自动化测试框架**
===
## 框架模式
基于 pytest + allure + yaml + mysql + 钉钉通知 + Jenkins 实现的接口自动化框架
* git地址: [https://gitee.com/t_l_h/EasyTest.git](https://gitee.com/t_l_h/EasyTest.git)
## 框架介绍
为了抛弃臃肿庞大的测试框架EasyTest将大部分代码逻辑通过conftest文件实现前置使得编写测试用例时无需导入各种乱七八糟的模块。
## 框架功能
* yaml管理测试数据实现测试数据分离
* 支持不同接口间的数据依赖
* 支持数据库的增、删、改、查
* 支持yaml文件中的动态参数自动替换
* 支持测试完成后发送钉钉消息通知
* 可轻易集成jenkins
## 遇到问题
* 可以直接提issue或联系v:Tlh_0717
## 目录结构
```
├── files 存放接口上传的文件
├── test_cases 存放测试用例
├── test_data yaml格式测试数据
├── utils 存放各种封装方法
├── conftest.py pytest钩子函数
├── pytest.ini pytest执行脚本参数配置文件
├── run.py 全量执行用例
├── send_ding.py 钉钉消息通知
├── config.yml 全局配置
├── requirements.txt 依赖文件
└── Pipfile 虚拟环境依赖文件
```
### 使用教程
#### 1、Gitee 拉取项目
需要先配置好python、jdk、allure环境(不懂的自行百度)
```shell
git clone https://gitee.com/tanlinhai_code/EasyTest.git
```
#### 2、安装依赖
方式一:
直接安装依赖文件
```shell
pip install -r requirements.txt
```
方式二:
使用pipenv创建虚拟环境运行
```shell
"""配置虚拟环境"""
pipenv install # 根据根目录下的Pipfile创建一个新环境
pipenv --venv # 查看虚拟环境路径
/Users/用户/.local/share/virtualenvs/api_autotest-J3yMsRGU # 创建的虚拟环境地址
```
```shell
"""激活虚拟环境"""
pipenv shell # 激活虚拟环境
exit # 退出虚拟环境
```
## 配置项目的通用配置
![img.png](files/testconfig.png)
## 编写测试用例
### 创建yaml测试文件
![img.png](files/testdata.png)
在test_data目录下的login目录中创建yaml文件必须是二级目录下创建通常业务也会划分模块。
字段说明:
* common_inputs: 请求方法,请求路径(只需要写域名后的路径即可)
* case: 用例名
* inputs: 用例输入
* params: 请求为get类型时填写
* json: 请求为post类型时填写
* file: 请求上传的文件名文件需要放在files目录下
* sql: sql语句
* expectation: 用例输出
* response: 接口返回数据体
创建好了yaml文件就可以创建测试用例文件了
### 创建测试用例文件
![img.png](files/testcase.png)
同样的在test_cases下的login目录中创建yaml文件目录层级与test_data保持一致
字段说明:
* allure.feature: 模块名称
* allure.title: 用例名称
* pytest.mark.imports 用imports标记这条用例后续可以执行指定标记的用例
* pytest.mark.datafile: 需要使用的yaml测试数据需要从test_data目录开始写千万不能写错
* 测试函数必须test_*开头,重点说各个入参的字段
* requests: 封装好的实例化请求方法根据请求方式直接调用requests.post或requests.get如图中的res所示
* url: 由根目录config.yml文件中读取的路由及该测试用例对应的yaml文件中读取的path拼接无需填写只需正常传入
* headers: 以静态方法存放在utils.requests_control文件的RestClient类中可按需更改
* case: 用例对应yaml文件中的case名称可在做不同校验时作为判断条件
* inputs: 用例对应yaml文件中inputs内容使用时直接用获取字典值的方式获取
1、如果是上传文件的接口可以参考前面截图中的样式填写只需要填写files目录下的文件名即可
2、如果请求参数需要动态参数需要用"{{xxx}}"这种双引号和双括号包裹的方式填写名称必须是utils.fake_data_control.py文件Execute类中定义的方法名可以按需添加
* expectation: 用例对应yaml文件中expectation内容使用时可以直接用字典值方式获取
* sql: 数据库实例化方法可以通过sql.query('xxxx'), sql.execute('xxxxx')方式调用使用时直接使用yaml文件中的sql语句sql.query(inputs['sql'])(当然也可以直接写)
* cache: 共享的缓存数据这个使用起来主要注意例如在test_1中我需要使用某个数据给test_2使用那么可以在test_1中使用cache['key'] = 1 这样的方式把数据存入全局的共享数据中test_2使用时就用字典取值的方式cache['key']即可
,那么就有人问了,那多个数据依赖怎么办呢?目前的办法是将会产出依赖数据的接口打上@pytest.mark.run(order=1)的标签并且存入共享数据时的key必须不重复
这样可以理解为用例执行开始前有一个空字典,在测试过程中不断向其中添加依赖数据,而在后面执行的用例只需要从里面取就可以了
### 运行测试用例
```shell
python run.py # 全量执行用例生成测试报告保存在allure-report目录下
pipenv run.py -m login # 执行被@pytest.mark.login标记的所有用例
python send_ding.py # 发送测试结果到钉钉通知,如果jenkins和config.yml都配置好了可以直接使用
```
钉钉通知样式:
![img.png](files/testding.png)
## 最后
框架还有许多能优化的地方,如果有什么好的建议欢迎一起来讨论。

Binary file not shown.

Binary file not shown.

35
config.yml Normal file
View File

@ -0,0 +1,35 @@
project_name: 项目名称
env: 测试环境
tester_name: xxx
# 测试域名
host: https://xxxxx.com
# 登录用户信息
account: xxxxxxx
password: xxxxxxxxxxxxx
# jenkins相关配置
jenkins:
# 本地服务
url: http://127.0.0.1:8080
# 如果有映射网址,可以填在这里
mapping_url: xxxxxxx
# jenkins账号
user:
# jenkins密码
pwd:
# 要推送的项目名
project:
# 钉钉消息通知相关配置
ding_talk:
webhook:
# 数据库相关配置
mysql_db:
host:
user:
password:
port:

82
conftest.py Normal file
View File

@ -0,0 +1,82 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time
import pytest
from utils.path import root
from itertools import zip_longest
from utils.database_control import MysqlDB
from utils.fake_data_control import Execute
from utils.requests_control import RestClient
from utils.open_file_control import open_file
from utils.read_yaml_control import HandleYaml
def pytest_generate_tests(metafunc):
"""参数化测试函数"""
case, url, inputs, expectation = [], [], [], []
markers = metafunc.definition.own_markers
for marker in markers:
if marker.name == 'datafile':
test_data_path = os.path.join(metafunc.config.rootdir, marker.args[0]) if marker.args else ...
test_data = HandleYaml(test_data_path).get_yaml_data()
# if 'cases' in metafunc.fixturenames:
for data in test_data['tests']:
for k, v in list(data['inputs']['json'].items()):
if str(v)[0:2] == '{{' and str(v)[-2:] == '}}':
val = str(v)[2:-2]
data['inputs']['json'][k] = Execute(val)() # 替换yaml文件json中{{xxx}}xxx的值
if data['inputs']['file']:
file_name = str(data['inputs']['file'])
data['inputs']['file'] = eval(f"open_file(f'{str(file_name)}')") # 直接返回open_file对象
case.append(data['case'] if data['case'] is not None else {})
url.append(str(host()) + str(test_data['common_inputs']['path']) if test_data['common_inputs'][
'path'] is not None else {})
inputs.append(data['inputs'] if data['inputs'] is not None else {})
expectation.append(data['expectation'] if data['expectation'] is not None else {})
metafunc.parametrize("case, url, inputs, expectation", zip_longest(case, url, inputs, expectation),
scope='function')
def pytest_collection_modifyitems(items):
"""测试用例收集完成时将收集到的item的name和nodeid的中文显示"""
for item in items:
item.name = item.name.encode("utf-8").decode("unicode_escape")
item._nodeid = item.nodeid.encode("utf-8").decode("unicode_escape")
@pytest.fixture()
def headers():
"""通用请求头"""
yield RestClient().headers()
def host():
"""获取配置文件中的域名"""
res_host = HandleYaml(root / 'config.yml').get_yaml_data()['host']
return res_host
@pytest.fixture()
def requests():
"""返回实例化请求方法"""
yield RestClient()
@pytest.fixture()
def sql():
"""返回实例化数据库方法"""
yield MysqlDB()
@pytest.fixture(scope='session')
def cache():
"""返回一个字典,用作数据共享"""
yield {}

BIN
files/avatar_new.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 27 KiB

BIN
files/import.xlsx Normal file

Binary file not shown.

BIN
files/testcase.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 67 KiB

BIN
files/testconfig.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 60 KiB

BIN
files/testdata.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

BIN
files/testding.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 39 KiB

BIN
img.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 46 KiB

8
pytest.ini Normal file
View File

@ -0,0 +1,8 @@
[pytest]
addopts = -p no:warnings -s
filterwarnings = ignore:Module already imported:pytest.PytestWarning
markers =
datafile: hook marker
login: login
resource: resource
user: user

20
requirements.txt Normal file
View File

@ -0,0 +1,20 @@
-i https://pypi.tuna.tsinghua.edu.cn/simple
allure-pytest==2.11.1
Faker==15.1.1
pandas==1.5.3
openpyxl==3.1.0
PyMySQL==1.0.2
pytest==7.2.0
pytest-ordering==0.6
pytest-rerunfailures==11.1
PyYAML==6.0
requests==2.28.1
click==8.1.3
urllib3==1.26.12
python-jenkins==1.7.0
loguru==0.6.0
ddddocr==1.4.7
dingtalkchatbot==1.5.7
pydantic==1.10.5
jsonpath~=0.82

19
run.py Normal file
View File

@ -0,0 +1,19 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import pytest
import click
@click.command()
@click.option('--mark', '-m', default='', help='传入被标记的case套件, 例: -m login')
def run(mark):
pytest.main(['test_cases', f'-m={mark}', '--clean-alluredir', '--alluredir=allure-results'])
os.system("allure generate -c -o allure-report")
if __name__ == '__main__':
print("""
开始执行项目...
""")
run()

120
send_ding.py Normal file
View File

@ -0,0 +1,120 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import time
import datetime
from utils import config
from utils.path import root
import click
import jenkins
import urllib3
from utils.fake_data_control import Execute
from utils.read_yaml_control import HandleYaml
from dingtalkchatbot.chatbot import DingtalkChatbot
yaml_data = HandleYaml(root / 'config.yml').get_yaml_data()
url = config.jenkins.url
user = config.jenkins.user
password = config.jenkins.pwd
project_name = config.jenkins.project
mapping_url = config.jenkins.mapping_url
robot_webhook = config.ding_talk.webhook
class JenkinsContest:
def __init__(self):
urllib3.disable_warnings()
# jenkins的IP地址
self.jenkins_url = mapping_url
# jenkins用户名和密码
self.server = jenkins.Jenkins(self.jenkins_url, username=user, password=password)
def jenkins_content_info(self):
result_job = self.server.get_jobs()
# jobs_name = result_job[0]["name"]
job_name = project_name
job_url = self.server.get_job_info(job_name)['url'].replace(url, mapping_url)
job_last_number = self.server.get_job_info(job_name)['lastBuild']['number']
job_result = self.server.get_build_info(job_name, job_last_number)['result']
report_url = job_url + str(job_last_number) + '/allure'
return result_job, job_name, job_url, job_last_number, report_url, job_result
class SendDingTalk(JenkinsContest):
def __init__(self):
super().__init__()
self.result_job, self.job_name, self.job_url, self.job_last_number, self.report_url, self.job_result = self.jenkins_content_info()
def send_ding(self):
content = {}
file_path = root / 'allure-report/export/prometheusData.txt'
f = open(file_path)
for line in f.readlines():
launch_name = line.strip('\n').split(' ')[0]
num = line.strip('\n').split(' ')[1]
content.update({launch_name: num})
f.close()
passed_num = content['launch_status_passed'] # 通过数量
failed_num = content['launch_status_failed'] # 失败数量
broken_num = content['launch_status_broken'] # 阻塞数量
skipped_num = content['launch_status_skipped'] # 跳过数量
case_num = content['launch_retries_run'] # 总数量
run_duration = content['launch_time_duration']
print(self.job_result)
if self.job_result == 'SUCCESS':
job_result = '成功'
elif self.job_result == 'FAILURE':
job_result = '失败'
elif self.job_result == 'ABORTED':
job_result = '中止'
else:
job_result = '悬挂'
json_path = root / 'allure-report/widgets/summary.json'
with open(json_path) as f:
res = json.load(f)
today = str(datetime.date.today())
s = time.localtime(res['time']['start'] / 1000)
report_time = time.strftime("%Y-%m-%d", s)
if report_time == today:
text = f'### **{self.job_name}接口自动化通知**\n' \
f"**Jenkins构建结果: {job_result}**\n\n" \
f"**测试环境: 线上环境**\n\n" \
f"**时间: {Execute('now_time')()}**\n\n" \
"------------\n\n" \
f"### **执行结果**\n\n" \
f"**成功率: <font color='#00dd00'>{round(int(passed_num) / int(case_num) * 100)}%</font>**\n\n" \
f"**总用例数: <font color='#0000FF'>{case_num}</font>**\n\n" \
f"**成功用例数: <font color='#008000'>{passed_num}</font>**\n\n" \
f"**失败用例数: <font color='#FF0000'>{failed_num}</font>**\n\n" \
f"**异常用例数: <font color='#FF0000'>{broken_num}</font>**\n\n" \
f"**跳过用例数: <font color='#FFA500'>{skipped_num}</font>**\n\n" \
f"**本次执行耗时: {round(int(run_duration) / 1000)}秒**\n\n" \
"------------\n\n" \
f"**测试报告:** [点击查看]({self.report_url}) \n\n" \
f"**测试地址:** {yaml_data['host']} \n"
else:
text = f'### **{self.job_name}自动化通知**\n' \
f"**Jenkins构建结果: {job_result}**\n\n" \
"------------\n\n" \
f"**失败原因: <font color='#FF0000'>网络或代理已断开</font>**\n\n" \
"------------\n\n" \
f"**构建日志:** [点击查看]({self.job_url + str(self.job_last_number) + '/console'}) \n"
title = f'接口自动化通知'
send_msg = DingtalkChatbot(webhook=robot_webhook)
send_msg.send_markdown(title=title,
text=text,
is_at_all=True)
if __name__ == '__main__':
SendDingTalk().send_ding()

3
test_cases/__init__.py Normal file
View File

@ -0,0 +1,3 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

Binary file not shown.

View File

Binary file not shown.

View File

@ -0,0 +1,24 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import allure
import pytest
@allure.feature('XXX模块')
@allure.title('XXX接口')
@pytest.mark.imports
@pytest.mark.datafile('test_data/import/test_import.yml')
def test_import(requests, url, headers, case, inputs, expectation, sql, cache):
sqls = sql.query('select * from uclass.users limit 10')
print(requests,
url,
headers,
case,
inputs,
expectation,
sqls,
cache)
res = requests.post(url, headers=headers, files=inputs['file']).json()
assert res == expectation['response']

View File

Binary file not shown.

View File

@ -0,0 +1,13 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import allure
import pytest
@allure.feature('登录模块')
@allure.title('登录接口')
@pytest.mark.login
@pytest.mark.datafile('test_data/login/test_login.yml')
def test_login(requests, url, headers, case, inputs, expectation):
res = requests.post(url, json=inputs['json'], headers=headers).json()
assert res == expectation['response']

View File

@ -0,0 +1,23 @@
common_inputs:
method: POST
path: xxx/import
tests:
- case: 更换照片
inputs:
params: {}
json: {"key": "{{random_phone}}"}
file: "import.xlsx"
sql:
expectation:
response: {"code": 0}
- case: 更换照片2
inputs:
params: {}
json: {"key": "{{random_num}}"}
file: "import.xlsx"
sql:
expectation:
response: {"code": 0}

View File

@ -0,0 +1,23 @@
common_inputs:
method: POST
path: "xxx/login"
tests:
- case: 正确账号、密码登录
inputs:
params: {}
json: {"account": "{{random_phone}}", "password": "xxxxxxxxxx"}
file:
sql:
expectation:
db_data:
response: {'code': 12, 'msg': '手机号或密码错误'}
- case: 错误账号、密码登录
inputs:
params: {}
json: {"account": "18900001000", "password": "xxxxxx"}
expectation:
db_data:
response:
code: 0

7
utils/__init__.py Normal file
View File

@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
from utils.path import root
from utils.models import Config
from utils.read_yaml_control import HandleYaml
config_data = HandleYaml(root / 'config.yml').get_yaml_data()
config = Config(**config_data)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

33
utils/allure_control.py Normal file
View File

@ -0,0 +1,33 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import allure
from typing import Any, Dict, Union, Optional
class ReportStyle:
"""allure 报告样式"""
@staticmethod
def allure_step(step: str, var: Optional[Union[str, Dict[str, Any]]] = None):
with allure.step(step):
allure.attach(
json.dumps(var, ensure_ascii=False, indent=4),
step,
allure.attachment_type.JSON,
)
@staticmethod
def title(title: str):
allure.dynamic.title(title)
@staticmethod
def allure_step_no(step: str):
"""
无附件的操作步骤
:param step: 步骤名称
:return:
"""
with allure.step(step):
pass

74
utils/database_control.py Normal file
View File

@ -0,0 +1,74 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pymysql
from utils import config
from loguru import logger
class MysqlDB:
""" 数据库 封装 """
def __init__(self):
try:
# 建立数据库连接
self.conn = pymysql.connect(
host=config.mysql_db.host,
user=config.mysql_db.user,
password=config.mysql_db.password,
port=config.mysql_db.port,
)
# 使用 cursor 方法获取操作游标得到一个可以执行sql语句并且操作结果为字典返回的游标
self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
except AttributeError as error:
logger.error("数据库连接失败,失败原因 %s", error)
def __del__(self):
try:
# 关闭游标
self.cur.close()
# 关闭连接
self.conn.close()
except AttributeError as error:
logger.error("数据库连接失败,失败原因 %s", error)
def query(self, sql, state="all"):
"""
查询
:param sql:
:param state: all 是默认查询全部
:return:
"""
try:
self.cur.execute(sql)
if state == "all":
# 查询全部
data = self.cur.fetchall()
else:
# 查询单条
data = self.cur.fetchone()
return data
except AttributeError as error_data:
logger.error("数据库连接失败,失败原因 %s", error_data)
raise
def execute(self, sql):
"""
更新 删除 新增
:param sql:
:return:
"""
try:
# 使用 execute 操作 sql
rows = self.cur.execute(sql)
# 提交事务
self.conn.commit()
return rows
except AttributeError as error:
logger.error("数据库连接失败,失败原因 %s", error)
# 如果事务异常,则回滚数据
self.conn.rollback()
raise

View File

@ -0,0 +1,51 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
from loguru import logger
from utils.allure_control import ReportStyle
class Log:
"""日志操作装饰器"""
def __init__(self, switch=True):
self._switch = switch
def __call__(self, func):
def wrapper(*args, **kwargs):
if self._switch is True:
res = func(*args, **kwargs)
_url = args[1] if args[1] else {}
_method = args[2] if args[2] else {}
if _method == 'post':
_headers = args[5] if args[5] else {}
_data = args[3] if args[3] else {}
_json = args[4] if args[4] else {}
elif _method == 'get':
_headers = args[3] if args[3] else {}
_data = kwargs if kwargs else {}
_json = kwargs if kwargs else {}
r = res.text
try:
resp = json.loads(r)
except json.decoder.JSONDecodeError:
logger.info(f'请求返回结果为text')
resp = res.status_code
logger.info("\n===============================================================")
res_info = f"请求地址: {_url}\n" \
f"请求方法: {_method.upper()}\n" \
f"请求头: {_headers}\n" \
f"请求数据: {_data if _data else _json}\n\n" \
f"响应数据: {resp}\n" \
f"响应耗时(ms): {float(round(res.elapsed.total_seconds() * 1000))}\n" \
f"接口响应码: {res.status_code}"
logger.info(res_info)
ReportStyle.allure_step_no(f"请求地址: {_url}")
ReportStyle.allure_step_no(f"请求方法: {_method.upper()}")
ReportStyle.allure_step("请求头", _headers)
ReportStyle.allure_step("请求数据", _data if _data else _json)
ReportStyle.allure_step_no(f"接口响应码: {res.status_code}")
ReportStyle.allure_step_no(f"响应耗时(ms): {round(res.elapsed.total_seconds() * 1000)}")
ReportStyle.allure_step("响应数据", resp)
return res
return wrapper

110
utils/fake_data_control.py Normal file
View File

@ -0,0 +1,110 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import inspect
import random
from faker import Faker
from datetime import datetime
class Execute:
""" Mock数据 """
def __init__(self, func_name):
self.faker = Faker(locale='zh_CN')
self.func_name = func_name
self.func_map = self.func_list()
def __call__(self, *args, **kwargs):
func = self.func_map.get(self.func_name, None)
if func is not None and callable(func):
return func()
else:
print("未获取到该字段对应方法,请检查")
def func_list(self):
"""
:return: 返回除内置方法外类中的所有其他方法
"""
func_list = {}
all_method = inspect.getmembers(self, inspect.ismethod)
for name in all_method:
func_list[name[0]] = eval(f'self.{name[0]}') if '__' not in str(name[0]) else ...
return func_list
@classmethod
def random_int(cls):
"""
:return: 随机数
"""
_data = random.randint(0, 5000)
return _data
def random_phone(self):
"""
:return: 随机生成手机号码
"""
return '1140124' + str(self.faker.phone_number()[7:])
def random_id_number(self):
"""
:return: 随机生成身份证号码
"""
id_number = self.faker.ssn()
return id_number
def random_female_name(self):
"""
:return: 女生姓名
"""
female_name = self.faker.name_female()
return female_name
def random_male_name(self):
"""
:return: 男生姓名
"""
male_name = self.faker.name_male()
return male_name
def random_email(self):
"""
:return: 生成邮箱
"""
email = self.faker.email()
return email
@classmethod
def now_time(cls):
"""
计算当前时间
:return:
"""
now_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return now_time
@classmethod
def now_time_stamp(cls):
"""
计算当前时间戳(秒级)
:return:
"""
now_time_stamp = datetime.now().timestamp()
return int(now_time_stamp)
def random_num(self):
"""
:return: 随机1~10数字
"""
num = random.randint(1, 10)
return num
if __name__ == '__main__':
r = Execute('random_num')()
print(r)

View File

@ -0,0 +1,25 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from utils.path import root
from jsonpath import jsonpath
from utils.read_yaml_control import HandleYaml
class Authentication:
"""获取token/cookies"""
def __init__(self):
self.handle_yaml = HandleYaml(root / 'config.yml')
self.payload = {"account": self.handle_yaml.get_yaml_data()['account'],
"password": self.handle_yaml.get_yaml_data()['password'],
"isVaildCode": False}
@property
def cookie_token(self):
import requests
res = requests.post('https://xxxxxxx/login', data=self.payload)
res_cookies, res_token = res.cookies, jsonpath(res.json(), '$..token')[0]
return res_cookies, res_token
cookies, token = Authentication().cookie_token

37
utils/models.py Normal file
View File

@ -0,0 +1,37 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import Union
from pydantic import BaseModel
class Jenkins(BaseModel):
url: Union[str, None]
mapping_url: Union[str, None]
user: Union[str, int, None]
pwd: Union[str, int, None]
project: Union[str, None]
class DingTalk(BaseModel):
webhook: Union[str, None]
class MySqlDB(BaseModel):
host: Union[str, None]
user: Union[str, None]
password: Union[str, None]
port: Union[int, None]
class Config(BaseModel):
project_name: Union[str, None]
env: Union[str, None]
tester_name: Union[str, None]
host: Union[str, None]
account: Union[str, int, None]
password: Union[str, int, None]
jenkins: "Jenkins"
ding_talk: "DingTalk"
mysql_db: "MySqlDB"

View File

@ -0,0 +1,16 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from utils.path import root
def open_file(file):
"""封装接口请求打开文件 函数"""
file_path = str(root / f'files/{file}')
file_name = file_path.split('/')[-1]
return {'file': (file_name, open(file_path, 'rb'), 'application/json')}
if __name__ == '__main__':
r = open_file('avatar_new.png')
print(r)

5
utils/path.py Normal file
View File

@ -0,0 +1,5 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pathlib import Path
root = Path(__file__).resolve().parents[1]

View File

@ -0,0 +1,26 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import yaml
class HandleYaml:
def __init__(self, file_path):
self._file_path = file_path
def get_yaml_data(self):
"""读取yaml文件数据"""
if os.path.exists(self._file_path):
with open(self._file_path, 'r', encoding='utf-8') as data:
res = yaml.load(data, Loader=yaml.FullLoader)
else:
raise FileNotFoundError("文件路径不存在,请检查")
return res
if __name__ == '__main__':
pass

72
utils/requests_control.py Normal file
View File

@ -0,0 +1,72 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import urllib3
import requests
from utils.decorator_control import Log
from utils.get_authentication_control import Authentication
cookie, token = Authentication().cookie_token
class RestClient:
"""封装api请求类"""
def __init__(self):
urllib3.disable_warnings()
self.sesson = requests.session() # 创建会话对象
self.sesson.cookies = cookie
def get(self, url, headers=None, **kwargs):
return self.request(url, "get", headers, verify=False, **kwargs)
def post(self, url, data=None, json=None, headers=None, **kwargs):
return self.request(url, "post", data, json, headers, verify=False, **kwargs)
def options(self, url, **kwargs):
return self.request(url, "options", verify=False, **kwargs)
def head(self, url, **kwargs):
return self.request(url, "head", **kwargs)
def put(self, url, data=None, **kwargs):
return self.request(url, "put", data, verify=False, **kwargs)
def patch(self, url, data=None, json=None, **kwargs):
return self.request(url, "patch", data, json, verify=False, **kwargs)
def delete(self, url, **kwargs):
return self.request(url, "delete", verify=False, **kwargs)
@Log(True)
def request(self, url, request_method, data=None, json=None, headers=None, **kwargs):
if request_method == 'get':
res = self.sesson.get(url, headers=headers, **kwargs)
elif request_method == 'post':
res = self.sesson.post(url, json, data, headers=headers, **kwargs)
elif request_method == 'options':
res = self.sesson.options(url, **kwargs)
elif request_method == 'head':
res = self.sesson.head(url, **kwargs)
elif request_method == 'put':
res = self.sesson.put(url, data, **kwargs)
elif request_method == 'patch':
data = json.dump(json) if json else ...
res = self.sesson.patch(url, data, **kwargs)
elif request_method == 'delete':
res = self.sesson.delete(url, **kwargs)
return res
@staticmethod
def headers():
"""全局headers"""
headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
'ci-token': token
}
return headers
if __name__ == '__main__':
pass