pytest_api/tools/officemail_control.py

261 lines
9.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# !/usr/bin/env python3
# coding: utf-8
import smtplib # 加载smtplib模块
from email.mime.text import MIMEText
from email.utils import formataddr
from email.mime.multipart import MIMEMultipart
import os
import json
import yaml.scanner
from email.mime.application import MIMEApplication
import datetime
class ConfigHandler:
# 项目路径
root_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
config_path = os.path.join(root_path, 'config', 'conf.yaml')
report_path = os.path.join(root_path, 'report')
class AllureFileClean:
"""allure 报告数据清洗,提取业务需要得数据"""
def __init__(self):
pass
@classmethod
def _get_al_files(cls) -> list:
""" 获取所有 test-case 中的 json 文件 """
filename = []
# 获取所有文件下的子文件名称
for root, dirs, files in os.walk(ConfigHandler.report_path + '/html/data/test-cases'):
for filePath in files:
path = os.path.join(root, filePath)
filename.append(path)
return filename
def get_test_cases(self):
""" 获取所有 allure 报告中执行用例的情况"""
# 将所有数据都收集到files中
files = []
for i in self._get_al_files():
with open(i, 'r', encoding='utf-8') as fp:
date = json.load(fp)
files.append(date)
return files
def get_failed_case(self):
""" 获取到所有失败的用例标题和用例代码路径"""
error_cases = []
for i in self.get_test_cases():
if i['status'] == 'failed' or i['status'] == 'broken':
error_cases.append((i['name'], i['fullName']))
return error_cases
def get_failed_cases_detail(self):
""" 返回所有失败的测试用例相关内容 """
date = self.get_failed_case()
# 判断有失败用例,则返回内容
if len(date) >= 1:
values = "失败用例:\n"
values += " **********************************\n"
for i in date:
values += " " + i[0] + ":" + i[1] + "\n"
return values
else:
# 如果没有失败用例则返回False
return ""
@classmethod
def get_case_count(cls):
""" 统计用例数量 """
file_name = ConfigHandler.report_path + '/html/history/history-trend.json'
with open(file_name, 'r', encoding='utf-8') as fp:
date = json.load(fp)[0]['data']
return date
class GetYamlData:
def __init__(self, file_dir):
self.fileDir = file_dir
def get_yaml_data(self) -> dict:
"""
获取 yaml 中的数据
:param: fileDir:
:return:
"""
# 判断文件是否存在
if os.path.exists(self.fileDir):
data = open(self.fileDir, 'r', encoding='utf-8')
try:
res = yaml.load(data, Loader=yaml.FullLoader)
return res
except UnicodeDecodeError:
raise ValueError(f"yaml文件编码错误文件路径:{self.fileDir}")
else:
raise FileNotFoundError("文件路径不存在")
def write_yaml_data(self, key: str, value) -> int:
"""
更改 yaml 文件中的值
:param key: 字典的key
:param value: 写入的值
:return:
"""
with open(self.fileDir, 'r', encoding='utf-8') as f:
# 创建了一个空列表,里面没有元素
lines = []
for line in f.readlines():
if line != '\n':
lines.append(line)
f.close()
with open(self.fileDir, 'w', encoding='utf-8') as f:
flag = 0
for line in lines:
left_str = line.split(":")[0]
if key == left_str and '#' not in line:
newline = "{0}: {1}".format(left_str, value)
line = newline
f.write('%s\n' % line)
flag = 1
else:
f.write('%s' % line)
f.close()
return flag
class CaseCount:
def __init__(self):
self.AllureData = AllureFileClean()
def pass_count(self):
"""用例成功数"""
return self.AllureData.get_case_count()['passed']
def failed_count(self):
"""用例失败数"""
return self.AllureData.get_case_count()['failed']
def broken_count(self):
"""用例异常数"""
return self.AllureData.get_case_count()['broken']
def skipped_count(self):
"""用例跳过数"""
return self.AllureData.get_case_count()['skipped']
def total_count(self):
"""用例总数"""
return self.AllureData.get_case_count()['total']
def pass_rate(self):
"""用例成功率"""
# 四舍五入保留2位小数
try:
pass_rate = round((self.pass_count() + self.skipped_count()) / self.total_count() * 100, 2)
return pass_rate
except ZeroDivisionError:
return 0.00
class Config:
def __getattr__(self, attr):
return os.environ[attr]
class SendMail(object):
def __init__(self):
self.sys_sender = Config().__getattr__("send_user") # 系统账户
self.sys_pwd = Config().__getattr__("send_pwd") # 系统账户密码
self.test_name = Config().__getattr__("GITLAB_USER_NAME")
self.ProjectName = GetYamlData(ConfigHandler.config_path).get_yaml_data()['ProjectName']
self.allure_data = CaseCount()
self.PASS = self.allure_data.pass_count()
self.FAILED = self.allure_data.failed_count()
self.BROKEN = self.allure_data.broken_count()
self.SKIP = self.allure_data.skipped_count()
self.TOTAL = self.allure_data.total_count()
self.RATE = self.allure_data.pass_rate()
self.ALL_CASE = self.PASS + self.FAILED + self.BROKEN
try:
self.host = Config().__getattr__("DAV_ENVIRONMENT_SLUG")
except:
self.host = Config().__getattr__("CI_ENVIRONMENT_SLUG")
self.job_id = str(Config().__getattr__("GL_JOB_ID"))
self.sender_list = Config().__getattr__("sender_list")
self.email_host = Config().__getattr__('email_host')
self.email_port = int(Config().__getattr__("email_port"))
def contents(self):
current_time = (datetime.datetime.now() + datetime.timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
content = """
各位同事, 大家好:
自动化用例执行完成,执行结果如下:
测试人员 : {tester_name}
运行环境 : {host}
运行的成功率: {rate} %
用例运行总数: {all_case}
通过用例个数: {total}
失败用例个数: {failed}
异常用例个数: {exception_case}
跳过用例个数: {skip_case}
用例运行时间:{run_time}
**********************************
报告地址:'https://davinci-rnd.pages.davincimotor.com/-/testing/davinci_dm_api/-/jobs/{job_id}/artifacts/report/pytest_html/result.html'
系统自动发送请勿回复,详细情况可报告查看,非相关负责人员可忽略此消息。谢谢。
PS名词解释
异常用例异常是代码还没执行到assert断言就报错了
失败用例失败是代码执行assert断言失败与预期不符
跳过用例:不在此环境执行这个接口通常用于生产环境
""".format(tester_name=self.test_name, host=str(self.host), rate=self.RATE, all_case=self.ALL_CASE,
total=self.PASS, failed=self.FAILED, exception_case=self.BROKEN, skip_case=self.SKIP,
run_time=current_time, job_id=self.job_id)
return content
def send(self):
"""
发送邮件
:return: bool
"""
try:
# 创建一个带附件的实例
msg = MIMEMultipart()
# 发件人格式
msg['From'] = formataddr(["", self.sys_sender])
# 收件人格式
sender = self.sender_list
msg['To'] = formataddr(["", sender])
# 邮件主题 title
msg['Subject'] = str(self.ProjectName) + "(全量回归)"
# 邮件正文内容
msg.attach(MIMEText(SendMail().contents(), 'plain', 'utf-8'))
# SMTP服务器
server = smtplib.SMTP(self.email_host, int(self.email_port), timeout=10)
server.ehlo()
# TLS加密
server.starttls()
# 登录账户
server.login(self.sys_sender, self.sys_pwd)
# 发送邮件
server.sendmail(self.sys_sender, sender.split(','), msg.as_string())
# 退出账户
server.quit()
return True
except Exception as e:
raise e
if __name__ == '__main__':
SendMail().send()