1.重构钉钉机器人发送allure报告,支持解析allure执行结果同步发送文字版测试报告

2.归拢配置信息,剔除硬编码
This commit is contained in:
caiweichao 2022-03-30 21:22:58 +08:00
parent 0cbfe8db14
commit 66ad560577
16 changed files with 370 additions and 139 deletions

View File

@ -17,17 +17,19 @@ from selenium.webdriver.support.wait import WebDriverWait
from Commons.log_process import LogProcess
from Commons.logs import Log
from Commons.read_ymal import ReadYaml
from Commons.return_time import ReturnTime
from ConfigFile.contants_test import *
class BasicPage:
# 图片文件夹路径
__img_dir = LogProcess().get_log_dir()[1]
conf: dict = ReadYaml().get_every_config("Config")
def __init__(self, driver):
self.driver: webdriver.Chrome = driver
self.wait: WebDriverWait = WebDriverWait(self.driver, timeout=WAIT_ELEMENT, poll_frequency=POLL_ELEMENT)
self.wait: WebDriverWait = WebDriverWait(self.driver, timeout=self.conf.get("WAIT_ELEMENT"),
poll_frequency=self.conf.get("POLL_ELEMENT"))
def get_url(self, url: str) -> None:
"""
@ -278,8 +280,7 @@ class BasicPage:
model = self.get_current_url_path()
element = self.find_element(model=model, locator=locator, mode=mode)
if move_elemnet is True:
self.__move_element_visible(model=model, locator=locator,
element=element, alignment=alignment)
self.__move_element_visible(model=model, locator=locator, element=element, alignment=alignment)
try:
Log.debug(f"点击:{model}页面,属性为{locator}的元素")
if is_double_click:
@ -318,7 +319,7 @@ class BasicPage:
raise e
return element.text
def input_text(self, locator: tuple, content: str, mode: str = "visible", alignment: bool = False,
def input_text(self, locator: tuple, content: str or int, mode: str = "visible", alignment: bool = False,
move_elemnet: bool = False) -> WebElement:
"""
输入文本内容

101
Commons/get_allure_data.py Normal file
View File

@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-
# @Author : caiweichao
# @explain : 获取allure报告中的详细信息
import json
import os
from ConfigFile.contants_file import report_path
class AllureFileClean:
"""allure 报告数据清洗,提取业务需要得数据"""
@classmethod
def _getAllFiles(cls) -> list:
""" 获取所有 test-case 中的 json 文件 """
filename = []
# 获取所有文件下的子文件名称
for root, dirs, files in os.walk(report_path + '/data/test-cases'):
for filePath in files:
path = os.path.join(root, filePath)
filename.append(path)
return filename
def getTestCases(self):
""" 获取所有 allure 报告中执行用例的情况"""
# 将所有数据都收集到files中
files = []
for i in self._getAllFiles():
with open(i, 'r', encoding='utf-8') as fp:
date = json.load(fp)
files.append(date)
return files
def getFailedCase(self):
""" 获取到所有失败的用例标题和用例代码路径"""
errorCase = []
for i in self.getTestCases():
if i['status'] == 'failed' or i['status'] == 'broken':
errorCase.append((i['name'], i['fullName']))
return errorCase
def getFailedCasesDetail(self):
""" 返回所有失败的测试用例相关内容 """
Data = self.getFailedCase()
# 判断有失败用例,则返回内容
if len(Data) >= 1:
values = "失败用例:\n"
values += " **********************************\n"
for i in Data:
values += " " + i[0] + ":" + i[1] + "\n"
return values
else:
# 如果没有失败用例则返回False
return ""
@classmethod
def getCaseCount(cls):
""" 统计用例数量 """
fileName = report_path + '/history/history-trend.json'
with open(fileName, 'r', encoding='utf-8') as fp:
date = json.load(fp)[0]['data']
return date
class CaseCount:
def __init__(self):
self.AllureData = AllureFileClean()
def passCount(self):
"""用例成功数"""
return self.AllureData.getCaseCount()['passed']
def failedCount(self):
"""用例失败数"""
return self.AllureData.getCaseCount()['failed']
def brokenCount(self):
"""用例异常数"""
return self.AllureData.getCaseCount()['broken']
def skippedCount(self):
"""用例跳过数"""
return self.AllureData.getCaseCount()['skipped']
def totalCount(self):
"""用例总数"""
return self.AllureData.getCaseCount()['total']
def passRate(self):
"""用例成功率"""
# 四舍五入保留2位小数
try:
passRate = round((self.passCount() + self.skippedCount()) / self.totalCount() * 100, 2)
return passRate
except ZeroDivisionError:
return 0.00
if __name__ == '__main__':
data = AllureFileClean().getCaseCount()
print(data)

View File

@ -8,12 +8,13 @@ import shutil
from Commons.return_time import ReturnTime
from ConfigFile import contants_file
from ConfigFile import contants_test
from Commons.read_ymal import ReadYaml
class LogProcess:
# 获取今天的日期
today = ReturnTime.get_time()
conf: dict = ReadYaml().get_every_config("Config")
# 获取当天的日志存放目录,不存在则创建
def get_log_dir(self):
@ -36,7 +37,7 @@ class LogProcess:
floders_log = os.listdir(contants_file.LOGS_DIR)
floders_img = os.listdir(contants_file.IMG_DIR)
# 获取最大的日志存储时间
max_time = int(self.today) - contants_test.LOG_TIME
max_time = int(self.today) - int(self.conf.get("LOG_TIME"))
# 删除n天前的日志
for floder in floders_log:
if int(floder) < max_time:

View File

@ -4,13 +4,15 @@
import logging
from Commons import log_process
from Commons.read_ymal import ReadYaml
from ConfigFile.contants_file import *
from ConfigFile.contants_test import *
conf: dict = ReadYaml().get_every_config("Config")
# 日志收集器
logger = logging.getLogger("Log")
# 定义输出级别
logger.setLevel(LOG_LEVEL)
logger.setLevel(conf.get("LOG_LEVEL"))
def set_handler(levels):
@ -36,9 +38,7 @@ class Log:
def __new__(cls, *args, **kwargs):
if not cls.__obj:
cls.__obj = super().__new__(cls)
return cls.__obj
else:
return cls.__obj
return cls.__obj
# 实例化文件管理类
log_process = log_process.LogProcess()
@ -47,15 +47,15 @@ class Log:
# 指定输出文件
log_file = os.path.join(log_dir[0], 'logs.log')
# 设置日志输出格式
formatter = logging.Formatter(fmt=FORMATTER)
formatter = logging.Formatter(fmt=conf.get("FORMATTER"))
# 指定输出渠道
# 控制台输出
ch = logging.StreamHandler()
ch.setLevel(LOG_LEVEL_CONSILE)
ch.setLevel(conf.get("LOG_LEVEL_CONSILE"))
ch.setFormatter(formatter)
# INFO日志输出
handler = logging.FileHandler(filename=log_file, encoding='utf-8')
handler.setLevel('DEBUG')
handler.setLevel('INFO')
handler.setFormatter(formatter)
# 错误日志输出
error_handle = logging.FileHandler(filename=log_file, encoding='utf-8')

View File

@ -2,6 +2,7 @@
# @Author : caiweichao
# @explain : 建立数据库连接进行查询
import pymysql
from pymysql.cursors import DictCursor
from Commons.logs import Log
from Commons.read_ymal import ReadYaml
@ -45,30 +46,26 @@ class Mysql_Util:
# 查询单条数据并且返回 可以通过sql查询指定的值 也可以通过索引去选择指定的值
def fetch_one(self, sql, name=None):
# 修改返回值为数组键值对
# cursor=self.db.cursors.DictCursor()
cursor = self.db.cursor()
try:
# 按照sql进行查询
cursor.execute(sql)
self.cursor.execute(sql)
if name is None:
# 返回一条数据 还有 all size自己控制
sql_data = cursor.fetchone()
sql_data = self.cursor.fetchone()
return sql_data
elif name is not None:
sql_data = cursor.fetchone()
sql_data = self.cursor.fetchone()
return sql_data[name]
except pymysql.err.ProgrammingError as e:
Log.error("请检查sql是否正确 sql={}".format(sql))
raise e
def fetch_all(self, sql): # 查询多条数据并且返回
# 修改返回值为数组键值对 cursor=pymysql.cursors.DictCursor
cursor = self.db.cursor()
try:
# 按照sql进行查询
cursor.execute(sql)
self.cursor.execute(sql)
# 返回一条数据 还有 all size自己控制
sql_data = cursor.fetchall()
sql_data = self.cursor.fetchall()
except pymysql.err.ProgrammingError as e:
Log.error("请检查sql是否正确 sql={}".format(sql))
raise e
@ -98,8 +95,7 @@ class Mysql_Util:
if __name__ == '__main__':
sql = "select ID from tem_platform_uat.ip_district where P_ID = 10801;"
with Mysql_Util(mysql_name='Mysql_test') as db:
sql = "select ID from tem_platform.ip_district where P_ID = 10801;"
with Mysql_Util(mysql_name='Mysql_ota') as db:
value = db.fetch_all(sql)
for x in value:
print(x[0])
print(value)

View File

@ -21,19 +21,27 @@ class ReadExcel:
# 读取指定sheet页的数据放入testcase对象
def get_testcase(self, sheetname: str) -> list[dict]:
sheet = self.workbook[sheetname]
caseDatas = list(sheet.rows)
caseTitle = [title.value for title in caseDatas[0]]
all_caseDatas = list(sheet.rows)
# 测试用例表头集合
caseTitle: list = [title.value for title in all_caseDatas[0]]
# 将第一行之后的数据解析后和第一行组装为测试用例
testCases = [dict(zip(caseTitle, [case.value for case in caseData])) for caseData in caseDatas[1:]]
testCases = [dict(zip(caseTitle, [caseData.value for caseData in caseDatas])) for caseDatas in
all_caseDatas[1:]]
return testCases
# 获取cases的标题
@staticmethod
def getCaseTitle(testCases: list[dict]):
caseTitle = [caseTitle.get("case_title") for caseTitle in testCases]
def getCaseValues(testCases: list[dict], key="case_title") -> list:
"""
通过指定的key获取用例集合中的内容默认返回用例的标题
:param testCases: 用例集合list[dict]
:param key: 用例数据中对应的键
:return: 键对应的值
"""
caseTitle = [caseTitle.get(key) for caseTitle in testCases]
return caseTitle
if __name__ == '__main__':
cases = ReadExcel().get_testcase(sheetname='OederControl')
print(ReadExcel.getCaseTitle(cases))
print(ReadExcel.getCaseValues(cases, 'case_id'))

View File

@ -21,6 +21,14 @@ def check_yaml(func):
class ReadYaml:
__obj = None
@staticmethod
def __new__(cls, *args, **kwargs):
if not cls.__obj:
cls.__obj = super().__new__(cls)
return cls.__obj
# 方法初始化的时候读取yaml文件
def __init__(self, file_url=None):
try:
@ -55,5 +63,5 @@ class ReadYaml:
if __name__ == '__main__':
user:dict = ReadYaml(file_url=contants_file.FACTORING_CODE).get_every_config("error_code")
print(user.get("0"))
x = ReadYaml().get_every_config('Config')
print(x.get("ALL_TIMEOUT"))

View File

@ -6,34 +6,42 @@
import allure
import requests
from Commons.json_util import JsonUtil
from Commons.logs import Log
class Requset:
"""
请求的基础类
:param method: 请求方式
:param url: 请求的url
:param data: 请求的参数
:param cookies: 请求中带的cookie
:param headers: 请求头
"""
class HttpRequset:
@allure.step("发起请求")
def __init__(self, method: str, url: str, data=None, cookies=None, headers=None):
"""
请求的基础类
:param method: 请求方式
:param url: 请求的url
:param data: 请求的参数
:param cookies: 请求中带的cookie
:param headers: 请求头
"""
try:
# Log.info(f'开始发起请求: 请求方式{method} 请求url={url}cookies={cookies}请求头header={headers} ,\n请求参数={data}')
if method.upper() == "GET":
self._res = requests.get(url=url, params=data, headers=headers, cookies=cookies)
self._res = requests.session().get(url=url, params=data, headers=headers, cookies=cookies)
elif method.upper() == "POST":
self._res = requests.post(url=url, json=data, headers=headers, cookies=cookies)
self._res = requests.session().post(url=url, json=data, headers=headers, cookies=cookies)
else:
Log.error(f"请求类未添加对应请求方式{method}")
with allure.step(f"响应结果{self._res.json()}"):
pass
with allure.step("请求日志"):
Log.info("请求信息:")
Log.info(f"request_url:{self._res.request.url}")
Log.info(f"request_headers:{self._res.request.headers}")
Log.info(f"request_body:{self._res.request.body}")
Log.info(f"request_cookies:{self._res.cookies}")
Log.info("响应信息:")
Log.info(f"response_headers:{self._res.headers}")
Log.info(f"response_body:{self._res.text}")
except Exception:
Log.error(Exception)
raise Exception("请求错误请检查参数")
raise Exception("请求异常请检查")
# 获取请求的cookies
def get_cookies(self):
@ -51,30 +59,10 @@ class Requset:
def get_response_time(self):
return self._res.elapsed.total_seconds()
# 获取响应后打印相关信息
def print_log(self, case_title=None):
"""
获取响应后打印相关信息
:param case_title: 调试用例的title
:return: None
"""
if case_title:
Log.info(f"------------------------用例: {case_title}------------------------")
Log.info("请求信息:")
Log.info(f"request_url:{self._res.request.url}")
Log.info(f"request_headers:{self._res.request.headers}")
Log.info(f"request_body:{self._res.request.body}")
Log.info(f"request_cookies:{self._res.cookies}")
Log.info("响应信息:")
Log.info(f"response_headers:{self._res.headers}")
Log.info(f"response_body:{self._res.text}")
def assert_response(self, rule):
response_json = self.get_json()
return JsonUtil.jsonToOneValue(rule=rule, json=response_json)
if __name__ == '__main__':
rest = Requset(method="post",
url="http://api.lemonban.com/futureloan/member/register",
data={"mobile_phone": 13248231300,
"pwd": 100000000},
headers={"Content-Type": "application/json",
"X-Lemonban-Media-Type": "lemonban.v1"})
rest.print_log()
pass

39
Commons/request_dubbo.py Normal file
View File

@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
# @Author : caiweichao
# @explain : 请求dubbo的工具类 (未实现)
from pyhessian import protocol
from pyhessian.client import HessianProxy
from Commons.logs import Log
# class DubboRequest:
# def req_dubbo(self, url, interface, method, param_obj, **kwargs):
# """
# :param url: url地址
# :param interface: 接口名称,因为这里可能还有别的服务要测,接口名不一样,这里定义成变量
# :param method: 调用哪个方法
# :param param_obj: 入参的对象
# :param kwargs: 这个用关键字参数,因为每个接口的参数都不一样,不固定,所以这里用关键字参数
# """
# # 这个是用来构造二进制的入参的,也就是把入参序列化
# res_param = protocol.object_factory(param_obj, **kwargs)
# try:
# # 这个res是生成一个请求对象
# req_obj = HessianProxy(url + interface)
# # getattr是python的内置方法获取对象的方法咱们从构造的请求对象里面获取到方法
# # 然后调用,把前面生成的 序列化好的参数传进去,然后获取到返回的数据
# res = getattr(req_obj, method)(res_param)
# return res
# except Exception as e:
# Log.error(f"请求出现异常,异常信息是:\n{e}")
#
#
# if __name__ == '__main__':
# url = 'http://192.168.1.225:21884/'
# interface = 'com.tem.platform.api.PartnerService'
# method = 'findById'
# param_obj = 'com.tem.platform.api.param.Param'
# params = {"id": 192}
# value = DubboRequest().req_dubbo(url, interface, method, param_obj, **params)
# print(value)

View File

@ -3,8 +3,9 @@
# @explain : 钉钉机器人通知
from Commons.read_ymal import ReadYaml
from Commons.request import Requset
from Commons.request import HttpRequset
from ConfigFile.contants_file import DING_CONFIG
from Commons.get_allure_data import CaseCount
class DingRobot:
@ -12,47 +13,62 @@ class DingRobot:
def __init__(self, robot_name):
try:
self.res_url = ReadYaml(file_url=DING_CONFIG).get_every_config(key=robot_name).get('url')
self.header = {"Content-Type": "application/json", "Charset": "UTF-8"}
except KeyError:
raise KeyError(f'{robot_name}错误请与配置文件核对')
except Exception:
raise Exception('未知异常请检查')
# 对指定的机器人发起请求
def res_dingding(self, error_msg=None, msg=None, is_at=None,is_scuess=False):
def res_dingding(self, error_msg=None, msg=None, is_at=None, is_scuess=False):
# 构建请求数据
header = {"Content-Type": "application/json", "Charset": "UTF-8"}
if is_scuess:
msg_ding = {"msgtype": "text",
"text": {"content": f'【线上监控】:{msg}'},
}
msg_ding = {
"msgtype": "text",
"text": {"content": f'【线上监控】:{msg}'}
}
else:
if is_at is None:
msg_ding = {"msgtype": "text",
"text": {"content": f'【线上报警】:\n{error_msg}\n{msg}'},
}
msg_ding = {
"msgtype": "text",
"text": {"content": f'【线上报警】:\n{error_msg}\n{msg}'},
}
else:
msg_ding = {"msgtype": "text",
"text": {"content": f'【线上报警】:\n{error_msg}\n{msg}'},
"at": {
"atMobiles": [f"{is_at}"]
}
}
Requset(method='post', url=self.res_url, data=msg_ding, headers=header)
msg_ding = {
"msgtype": "text",
"text": {"content": f'【线上报警】:\n{error_msg}\n{msg}'},
"at": {
"atMobiles": [f"{is_at}"]
}
}
HttpRequset(method='post', url=self.res_url, data=msg_ding, headers=self.header)
def res_allure_report(self, job_name, report_url,username="nbyy",pwd="ZTnbyy123456!"):
header = {"Content-Type": "application/json", "Charset": "UTF-8"}
def res_allure_report(self, name, report_url, username="nbyy", pwd="ZTnbyy123456!", ):
caseCount = CaseCount()
test_detail = {
"msgtype": "text",
"text": {"content": f"测试结果明细:\n"
f"本次运行case总数: {caseCount.totalCount()}\n"
f"用例通过率: {caseCount.passRate()}%\n"
f"通过case:{caseCount.passCount()}\n"
f"失败case:{caseCount.failedCount()}\n"
f"运行异常case:{caseCount.brokenCount()}\n"
f"跳过执行case:{caseCount.skippedCount()}\n"},
}
msg_ding = {
"msgtype": "link",
"link": {
"text": f"点击查看:\n 登录账号密码:{username}/{pwd}",
"title": f"{job_name}自动化日常巡检完成】",
"text": f"点击查看详细报告:登录账号密码:{username}/{pwd}",
"title": f"{name}自动化日常巡检完成】",
"picUrl": "",
"messageUrl": report_url
}
}
Requset(method='post', url=self.res_url, data=msg_ding, headers=header)
HttpRequset(method='post', url=self.res_url, data=test_detail, headers=self.header)
HttpRequset(method='post', url=self.res_url, data=msg_ding, headers=self.header)
if __name__ == '__main__':

View File

@ -59,7 +59,7 @@ class ReturnTime:
return now
@staticmethod
def time_cal(mode, num=0, time_delta=None):
def time_cal(mode, num, time_delta=None):
"""
:return: 返回当前时间 + n天
"%Y-%m-%d %H:%M"
@ -78,5 +78,4 @@ class ReturnTime:
if __name__ == '__main__':
print(ReturnTime.get_hour())
print(ReturnTime.time_cal(mode="%Y-%m-%d", num=0))

View File

@ -3,14 +3,61 @@ global:
# SIT:测试环境UAT:预上线环境PRO线上环境
#数据库链接参数-生产
Mysql_PRO:
Mysql_oms:
host:
port:
user:
password:
Mysql_ota:
host:
port:
user:
password:
# 数据库链接参数-测试
Mysql_test:
host:
port:
user:
password:
Account:
username: ""
password: ""
Config:
LOG_TIME: 5
# 最低日志输出级别
LOG_LEVEL: "DEBUG"
# 最低日志输出级别-控制台
LOG_LEVEL_CONSILE: "DEBUG"
# 日志输出格式
FORMATTER: '%(asctime)s-%(filename)s-%(levelname)s-%(message)s'
# 元素等待超时时间
WAIT_ELEMENT: 15
# 页面轮询元素间隔
POLL_ELEMENT: 0.8
# 全局等待时间
ALL_TIMEOUT: 30
# web端url
PC_URL: ""
# h5端url
H5_URL: ""
# admin端url
ADMIN_URL: ""
# 测试报告发送的对象
TESTREPROT_OBJECT: ""
# jenkins的job地址
JOB_URL: ""
# 公司jenkins地址
JENKINS_URL: ""
# jenkins账号
JENKINS_ACCOUNT: ""
# jenkins密码
JENKINS_PWD: ""
Api_test_data:

View File

@ -1,27 +0,0 @@
# -*- coding: utf-8 -*-
# @Author : caiweichao
# @explain : 自动化测试用到的常量
# -----------日志常量管理-------------------
# 日志存储时间
LOG_TIME = 7
# 最低日志输出级别
LOG_LEVEL = "DEBUG"
# 日志输出格式
FORMATTER = '%(asctime)s-%(filename)s-%(levelname)s-%(message)s'
# 元素等待超时时间
WAIT_ELEMENT = 15
# 页面轮询元素间隔
POLL_ELEMENT = 0.5
# 全局等待时间
ALL_TIMEOUT = 30
# web端url
PC_URL = ""
# h5端url
H5_URL = ""
# admin端url
ADMIN_URL = ""
# jenkins配置
JENKINS_URL = ""
J_USERNAME = ""
J_PWD = ""

View File

@ -0,0 +1,2 @@
robot_name:
url:

View File

@ -1,26 +1,36 @@
# -*- coding: utf-8 -*-
# @Author : caiweichao
# @explain : 通过jenkins发送allure测试报告
import os
import sys
import jenkins
from Commons.read_ymal import ReadYaml
from Commons.res_dingding import DingRobot
from ConfigFile import contants_test
def send_report(name, job_name):
# jenkins登录地址
jenkins_url = "http://192.168.1.227:8080"
def send_report():
conf = ReadYaml().get_every_config("Config")
name = conf.get("TESTREPROT_OBJECT")
job_url = conf.get('JOB_URL')
jenkins_url = conf.get("JENKINS_URL")
# 实例化jenkins对象
jenkins_server = jenkins.Jenkins(url=contants_test.JENKINS_URL,
username=contants_test.J_USERNAME,
password=contants_test.J_PWD)
jenkins_server = jenkins.Jenkins(
url=jenkins_url,
username=conf.get("JENKINS_ACCOUNT"),
password=conf.get("JENKINS_PWD")
)
# 获取job最后一次的构建内容
job_last_bulid = jenkins_server.get_info(job_name)["lastBuild"]["url"]
job_last_bulid = jenkins_server.get_info(job_url)["lastBuild"]["url"]
# 测试报告地址
report_url = job_last_bulid + "allure/"
# 发送报告
DingRobot(robot_name="oper_dingding_robot").res_allure_report(job_name=name, report_url=report_url)
DingRobot(robot_name="oper_dingding_robot").res_allure_report(name=name, report_url=report_url)
if __name__ == '__main__':
send_report(name='内部运营组', job_name='job/oper_test/', )
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
send_report()

42
requirements.txt Normal file
View File

@ -0,0 +1,42 @@
allure-pytest==2.8.29
allure-python-commons==2.8.29
apipkg==1.5
attrs==21.2.0
certifi==2021.5.30
chardet==4.0.0
charset-normalizer==2.0.4
colorama==0.4.4
configparser==5.0.2
crayons==0.4.0
et-xmlfile==1.1.0
execnet==1.9.0
idna==3.2
iniconfig==1.1.1
jenkins==1.0.2
jmespath==0.10.0
jsonpath==0.82
jsonschema==4.4.0
multi-key-dict==2.0.3
openpyxl==3.0.7
packaging==21.3
pbr==5.8.1
pluggy==0.13.1
py==1.10.0
PyMySQL==0.9.3
pyparsing==2.4.7
pyrsistent==0.18.1
pytest==6.2.4
pytest-assume==2.4.3
pytest-forked==1.3.0
pytest-ordering==0.6
pytest-repeat==0.9.1
pytest-rerunfailures==10.1
pytest-xdist==2.3.0
python-jenkins==1.7.0
PyYAML==5.3.1
requests==2.26.0
selenium==3.141.0
six==1.16.0
toml==0.10.2
urllib3==1.26.6
webdriver-manager==3.2.2