fix: 1、新增自动同步用例脚本ID到CSV文件功能;2、新增自动将PMS上用例标签同步到CSV文件的功能;3、优化了一些配置参数和命令行参数的入参方式;4、更新了本次新增功能的文档内容;

Description:

Log:
This commit is contained in:
mikigo 2023-09-08 16:56:50 +08:00
parent fbac127c72
commit 28e12e363f
17 changed files with 1710 additions and 887 deletions

View File

@ -63,12 +63,13 @@ from src.pms.suite import Suite
from src.pms.send2pms import Send2Pms
from src.recording_screen import recording_screen
FLAG_FEEL = '=' * 10
FLAG_FEEL = "=" * 10
LN = "\n"
class LabelType(Enum):
"""用例级别对应报告级别"""
L1 = allure.severity_level.BLOCKER
L2 = allure.severity_level.CRITICAL
L3 = allure.severity_level.NORMAL
@ -90,9 +91,7 @@ def write_json(session):
def auto_send(session):
"""auto send"""
return bool(
session.config.option.send_pms and session.config.option.trigger
)
return bool(session.config.option.send_pms and session.config.option.trigger)
def async_send(session):
@ -113,62 +112,43 @@ def finish_send(session):
def pytest_addoption(parser):
"""pytest_cmdline_main"""
parser.addoption(
"--clean", action="store", default="no", help="是否清理环境&杀进程"
)
parser.addoption("--clean", action="store", default="no", help="是否清理环境&杀进程")
parser.addoption(
"--log_level", action="store", default=GlobalConfig.LOG_LEVEL, help="终端日志输出级别"
)
parser.addoption(
"--noskip", action="store", default="", help="skip-xxx标签不生效"
)
parser.addoption(
"--ifixed", action="store", default="", help="fixed-xxx标签不生效"
)
parser.addoption(
"--max_fail", action="store", default="", help="最大失败次数"
)
parser.addoption("--noskip", action="store", default="", help="skip-xxx标签不生效")
parser.addoption("--ifixed", action="store", default="", help="fixed-xxx标签不生效")
parser.addoption("--max_fail", action="store", default="", help="最大失败次数")
parser.addoption(
"--record_failed_case", action="store", default="", help="失败录屏从第几次失败开始录制视频"
)
parser.addoption("--send_pms", action="store", default="", help="用例数据回填")
parser.addoption("--task_id", action="store", default="", help="测试单id")
parser.addoption("--trigger", action="store", default="", help="数据回填的触发者")
parser.addoption("--suite_id", action="store", default="", help="pms的测试套件ID")
parser.addoption("--pms_user", action="store", default="", help="登录pms的账号")
parser.addoption("--pms_password", action="store", default="", help="登录pms的密码")
parser.addoption("--top", action="store", default="", help="过程中记录top命令中的值")
parser.addoption(
"--asan", action="store", default="", help="执行安全测试用例"
"--duringfail",
action="store_true",
dest="duringfail",
default=False,
help="出现错误时立即显示",
)
parser.addoption("--repeat", action="store", default=1, type=int, help="用例重复执行的次数")
parser.addoption("--export_csv_file", action="store", default="", help="导出csv文件")
parser.addoption("--line", action="store", default="", help="业务线(CI)")
parser.addoption("--app_name", action="store", default="", help="执行的应用名称")
parser.addoption(
"--autostart", action="store", default="", help="重启类场景开启letmego执行方案"
)
parser.addoption(
"--send_pms", action="store", default="", help="用例数据回填"
)
parser.addoption(
"--task_id", action="store", default="", help="测试单id"
)
parser.addoption(
"--trigger", action="store", default="", help="数据回填的触发者"
)
parser.addoption(
"--suite_id", action="store", default="", help="pms的测试套件ID"
)
parser.addoption(
"--pms_user", action="store", default="", help="登录pms的账号"
)
parser.addoption(
"--pms_password", action="store", default="", help="登录pms的密码"
)
parser.addoption(
"--top", action="store", default="", help="过程中记录top命令中的值"
)
parser.addoption(
"--duringfail", action="store_true", dest="duringfail", default=False, help="出现错误时立即显示"
)
parser.addoption(
'--repeat', action='store', default=1, type=int, help="用例重复执行的次数"
)
parser.addoption(
'--exportcsv', action='store', default="", help="导出测试用例文件"
)
parser.addoption(
'--line', action='store', default="", help="业务线(CI)"
)
parser.addoption(
'--autostart', action='store', default="", help="用例执行程序注册到开机自启服务"
"--pyid2csv",
action="store_true",
dest="pyid2csv",
default=False,
help="将用例py文件的case id同步到对应的csv文件中",
)
@ -204,13 +184,13 @@ def pytest_sessionstart(session):
"""pytest_sessionstart"""
# 批量执行之前修改主题
if (
CmdCtl.run_cmd(
"gsettings get com.deepin.dde.appearance gtk-theme",
interrupt=False,
out_debug_flag=False,
command_log=False,
).strip("'")
!= GlobalConfig.SYS_THEME
CmdCtl.run_cmd(
"gsettings get com.deepin.dde.appearance gtk-theme",
interrupt=False,
out_debug_flag=False,
command_log=False,
).strip("'")
!= GlobalConfig.SYS_THEME
):
CmdCtl.run_cmd(
f"gsettings set com.deepin.dde.appearance gtk-theme {GlobalConfig.SYS_THEME}",
@ -218,7 +198,11 @@ def pytest_sessionstart(session):
out_debug_flag=False,
command_log=False,
)
_display = GlobalConfig.DisplayServer.wayland if GlobalConfig.IS_WAYLAND else GlobalConfig.DisplayServer.x11
_display = (
GlobalConfig.DisplayServer.wayland
if GlobalConfig.IS_WAYLAND
else GlobalConfig.DisplayServer.x11
)
logger.info(f"当前系统显示协议为 {_display.title()}.")
# 设置任务栏方向
popen("gsettings set com.deepin.dde.dock position bottom")
@ -231,13 +215,16 @@ def pytest_sessionstart(session):
suite_id = session.config.option.suite_id
if write_json(session):
session.case_res_path = Send2Pms.case_res_path(task_id or suite_id)
session.data_send_result_csv = Send2Pms.data_send_result_csv(task_id or suite_id)
session.data_send_result_csv = Send2Pms.data_send_result_csv(
task_id or suite_id
)
if user and password and async_send(session):
session.all_thread_task = []
session.t_executor = ThreadPoolExecutor()
if not session.config.option.collectonly and session.config.option.top:
def record_top():
top_log_path = f"{GlobalConfig.REPORT_PATH}/logs"
if not exists(top_log_path):
@ -265,7 +252,7 @@ def pytest_generate_tests(metafunc):
return f"{i + 1}-{number}"
metafunc.parametrize(
'__pytest_repeat_step_number',
"__pytest_repeat_step_number",
range(repeat),
indirect=True,
ids=ids,
@ -274,15 +261,57 @@ def pytest_generate_tests(metafunc):
def pytest_collection_modifyitems(session):
"""pytest_collection_modifyitems"""
no_youqu_mark = {}
csv_path_dict = {}
for root, _, files in walk(GlobalConfig.APPS_PATH):
if "NOYOUQUMARK" in files and not no_youqu_mark.get(root):
no_youqu_mark[root] = True
continue
for file in files:
if file.endswith(".csv") and file != "case_list.csv":
csv_path_dict[splitext(file)[0]] = f"{root}/{file}"
walk_dir = (
f"{GlobalConfig.APPS_PATH}/{session.config.option.app_name}"
if session.config.option.app_name
else GlobalConfig.APPS_PATH
)
csv_path_dict, no_youqu_mark = walk_apps(walk_dir)
if session.config.option.collectonly and session.config.option.pyid2csv:
for item in session.items:
_case_id = findall(r"test_.*?_(\d+)", item.fspath.purebasename)
case_id = _case_id[0] if _case_id else "No match found for case id"
_csv_name = findall(r"test_(.*?)_\d+", item.fspath.purebasename)
csv_name = _csv_name[0] if _csv_name else None
csv_path = csv_path_dict.get(csv_name)
if not csv_path_dict or not csv_path_dict.get(csv_name):
_dir_name = item.fspath.dirname
if str(_dir_name).endswith("case"):
dir_name = _dir_name.rstrip("/case")
else:
dir_name = _dir_name.replace("/case/", "/tag/")
if not exists(dir_name):
makedirs(dir_name)
csv_path = f"{dir_name}/{csv_name}.csv"
with open(csv_path, "w+", encoding="utf-8") as f:
f.write(",".join([i.value for i in FixedCsvTitle]) + LN)
csv_path_dict, no_youqu_mark = walk_apps(walk_dir)
with open(csv_path, "r", encoding="utf-8") as f:
csv_txt_list = f.readlines()
try:
csv_head = csv_txt_list[0]
comma_num = csv_head.count(",")
except IndexError:
with open(csv_path, "w+", encoding="utf-8") as f:
f.write(",".join([i.value for i in FixedCsvTitle]) + LN)
comma_num = len(FixedCsvTitle) - 1
csv_taglines = [txt.strip().split(",") for txt in csv_txt_list[1:]]
if not csv_taglines:
with open(csv_path, "a+", encoding="utf-8") as f:
f.write(f"{case_id}{comma_num * ','}" + LN)
else:
for i in csv_taglines:
if i[0] == case_id or int(i[0]) == int(case_id):
break
else:
with open(csv_path, "a+", encoding="utf-8") as f:
f.write(f"{case_id}{comma_num * ','}" + LN)
if not csv_path_dict:
return
@ -331,8 +360,15 @@ def pytest_collection_modifyitems(session):
try:
csv_name, _id = findall(r"test_(.*?)_(\d+)", item.name)[0]
_case_id = findall(r"test_.*?_(\d+)", item.fspath.purebasename)[0]
if _id != _case_id:
raise ValueError
except IndexError:
skip_text = f"{item.nodeid} 用例名称缺少用例id, 跳过执行"
skip_text = f"\n用例名称缺少用例id,跳过处理:[{item.nodeid}]"
logger.error(skip_text)
add_mark(item, ConfStr.SKIP.value, (skip_text,), {})
except ValueError:
skip_text = f"\n用例py文件的id和用例函数的id不一致,跳过处理:[{item.nodeid}]"
logger.error(skip_text)
add_mark(item, ConfStr.SKIP.value, (skip_text,), {})
else:
@ -377,10 +413,15 @@ def pytest_collection_modifyitems(session):
tags = containers.get(csv_path).get(_id)
if tags:
try:
if containers[csv_path][ConfStr.REMOVED_INDEX.value] is not None \
and tags[containers[csv_path][ConfStr.REMOVED_INDEX.value]] \
.strip('"').startswith(
f"{ConfStr.REMOVED.value}-"):
if containers[csv_path][
ConfStr.REMOVED_INDEX.value
] is not None and tags[
containers[csv_path][ConfStr.REMOVED_INDEX.value]
].strip(
'"'
).startswith(
f"{ConfStr.REMOVED.value}-"
):
session.items.remove(item)
continue
except IndexError as exc:
@ -395,17 +436,26 @@ def pytest_collection_modifyitems(session):
# 先处理“跳过原因”列
if index == containers[csv_path][ConfStr.SKIP_INDEX.value]:
# 标签是以 “skip-” 开头, noskip 用于解除所有的skip
if not session.config.option.noskip \
and tag.startswith(f"{ConfStr.SKIP.value}-"):
if not session.config.option.noskip and tag.startswith(
f"{ConfStr.SKIP.value}-"
):
# 标签以 “fixed-” 开头, ifixed表示ignore fixed, 用于忽略所有的fixed
# 1. 不给ifixed参数时只要标记了fixed的用例即使标记了skip-,也会执行;
# 2. 给ifixed 参数时(--ifixed yes)fixed不生效仅通过skip跳过用例
try:
if (
not session.config.option.ifixed
and containers[csv_path][ConfStr.FIXED_INDEX.value] is not None
and tags[containers[csv_path][ConfStr.FIXED_INDEX.value]].strip(
'"').startswith(f"{ConfStr.FIXED.value}-")
not session.config.option.ifixed
and containers[csv_path][
ConfStr.FIXED_INDEX.value
]
is not None
and tags[
containers[csv_path][
ConfStr.FIXED_INDEX.value
]
]
.strip('"')
.startswith(f"{ConfStr.FIXED.value}-")
):
continue
except IndexError:
@ -432,7 +482,10 @@ def pytest_collection_modifyitems(session):
)
else: # 非跳过列
# 处理 pms id
if containers[csv_path][ConfStr.PMS_ID_INDEX.value] == index:
if (
containers[csv_path][ConfStr.PMS_ID_INDEX.value]
== index
):
if suite_runs_ids:
if tag not in suit_id_deque:
session.items.remove(item)
@ -453,7 +506,9 @@ def pytest_collection_modifyitems(session):
add_mark(item, tag, (mark_title,), {})
else: # tag为空
# 处理 pmd id 为空的情况
if (task_id or suite_id) and containers[csv_path][ConfStr.PMS_ID_INDEX.value] == index:
if (task_id or suite_id) and containers[csv_path][
ConfStr.PMS_ID_INDEX.value
] == index:
session.items.remove(item)
continue
else:
@ -480,6 +535,19 @@ def pytest_collection_modifyitems(session):
print() # 处理日志换行
def walk_apps(walk_dir):
no_youqu_mark = {}
csv_path_dict = {}
for root, _, files in walk(walk_dir):
if "NOYOUQUMARK" in files and not no_youqu_mark.get(root):
no_youqu_mark[root] = True
continue
for file in files:
if file.endswith(".csv") and file != "case_list.csv":
csv_path_dict[splitext(file)[0]] = f"{root}/{file}"
return csv_path_dict, no_youqu_mark
def pytest_collection_finish(session):
"""pytest_collection_finish"""
session.item_count = len(session.items)
@ -487,14 +555,17 @@ def pytest_collection_finish(session):
if session.config.option.reruns and not session.config.option.collectonly:
print(f"失败重跑次数:\t{session.config.option.reruns}")
if session.config.option.max_fail and not session.config.option.collectonly:
session.config.option.maxfail = int(float(session.config.option.max_fail) * session.item_count)
session.config.option.maxfail = int(
float(session.config.option.max_fail) * session.item_count
)
print(f"最大失败次数:\t{session.config.option.maxfail}")
session.sessiontimeout = 0
if session.config.option.timeout and not session.config.option.collectonly:
_min, sec = divmod(int(session.config.option.timeout), 60)
hour, _min = divmod(_min, 60)
print(
f"用例超时时间:\t{session.config.option.timeout}s ({hour}{'小时' if hour else ''}{_min}{'' if _min else ''}{sec}秒)")
f"用例超时时间:\t{session.config.option.timeout}s ({hour}{'小时' if hour else ''}{_min}{'' if _min else ''}{sec}秒)"
)
# sessiontimeout
_n = 0
items_timeout = 0
@ -508,7 +579,9 @@ def pytest_collection_finish(session):
item_timeout = 0
items_timeout += item_timeout
break
session.sessiontimeout = ((session.item_count - _n) * session.config.option.timeout) + items_timeout
session.sessiontimeout = (
(session.item_count - _n) * session.config.option.timeout
) + items_timeout
_min, sec = divmod(int(session.sessiontimeout), 60)
hour, _min = divmod(_min, 60)
print(
@ -516,12 +589,12 @@ def pytest_collection_finish(session):
)
# 生成 case_list.csv
if session.config.option.collectonly:
if session.config.option.collectonly and session.config.option.export_csv_file:
execute = []
execute.append("用例名称," + GlobalConfig.CSV_HEARD + LN)
execute.append("用例名称," + GlobalConfig.EXPORT_CSV_HEARD + LN)
for item in session.items:
node_id = item.nodeid.split("[")[0]
header = GlobalConfig.CSV_HEARD.split(",")
header = GlobalConfig.EXPORT_CSV_HEARD.split(",")
case_info = ["" for _ in header]
case_info.insert(0, node_id)
for mark in item.own_markers:
@ -537,7 +610,11 @@ def pytest_collection_finish(session):
execute2.sort(key=execute.index)
if not exists(GlobalConfig.REPORT_PATH):
makedirs(GlobalConfig.REPORT_PATH)
with open(f"{GlobalConfig.REPORT_PATH}/{GlobalConfig.CSV_FILE}", "w", encoding="utf-8") as _f:
with open(
f"{GlobalConfig.REPORT_PATH}/{session.config.option.export_csv_file}",
"w+",
encoding="utf-8",
) as _f:
_f.writelines(execute2)
@ -574,9 +651,11 @@ def pytest_runtest_setup(item):
pass
if item.config.option.pms_user and item.config.option.pms_password:
def send2pms(case_res_path, data_send_result_csv):
Send2Pms(
user=item.config.option.pms_user, password=item.config.option.pms_password
user=item.config.option.pms_user,
password=item.config.option.pms_password,
).send2pms(case_res_path, data_send_result_csv)
if async_send(item.session):
@ -620,7 +699,9 @@ def pytest_runtest_makereport(item, call):
allure.dynamic.severity(LabelType.L3.value)
elif mark.args[0] == FixedCsvTitle.pms_case_id.value:
# if mark.name:
testcase_url = f"https://pms.uniontech.com/testcase-view-{mark.name}.html"
testcase_url = (
f"https://pms.uniontech.com/testcase-view-{mark.name}.html"
)
allure.dynamic.testcase(testcase_url)
logger.info(testcase_url)
else:
@ -683,7 +764,9 @@ def pytest_runtest_makereport(item, call):
# 非图像识别错误
pass
try:
template = f"{splitext(item.record['image_path'])[0]}_ocr_.png"
template = (
f"{splitext(item.record['image_path'])[0]}_ocr_.png"
)
CmdCtl.run_cmd(f"cp {item.record['ocr']} {template}")
allure.attach.file(
template,
@ -712,14 +795,12 @@ def pytest_report_teststatus(report, config):
if report.when in ("setup", "teardown"):
if report.failed:
short, verbose = config.hook.pytest_emoji_error(
config=config,
head_line=report.head_line
config=config, head_line=report.head_line
)
return "error", short, verbose
if report.skipped:
short, verbose = config.hook.pytest_emoji_skipped(
config=config,
head_line=report.head_line
config=config, head_line=report.head_line
)
return "skipped", short, verbose
# 在用例执行阶段处理 passed skipped failed
@ -727,18 +808,15 @@ def pytest_report_teststatus(report, config):
short = verbose = ""
if report.passed:
short, verbose = config.hook.pytest_emoji_passed(
config=config,
head_line=report.head_line
config=config, head_line=report.head_line
)
elif report.skipped:
short, verbose = config.hook.pytest_emoji_skipped(
config=config,
head_line=report.head_line
config=config, head_line=report.head_line
)
elif report.failed:
short, verbose = config.hook.pytest_emoji_failed(
config=config,
head_line=report.head_line
config=config, head_line=report.head_line
)
return report.outcome, short, verbose
return None
@ -764,19 +842,20 @@ def pytest_sessionfinish(session):
default_result["result"] = "fail"
item_name = item.nodeid.split("[")[0]
if not execute.get(item_name) or (
item.outcome != ConfStr.PASSED.value
and execute.get(item_name).get("result") == "pass"
item.outcome != ConfStr.PASSED.value
and execute.get(item_name).get("result") == "pass"
):
execute[item_name] = default_result
except AttributeError:
pass
if execute:
with open(
f"{GlobalConfig.ROOT_DIR}/ci_result.json", "w", encoding="utf-8"
f"{GlobalConfig.ROOT_DIR}/ci_result.json", "w", encoding="utf-8"
) as _f:
_f.write(dumps(execute, indent=2, ensure_ascii=False))
if session.config.option.pms_user and session.config.option.pms_password:
def send2pms(case_res_path, data_send_result_csv):
Send2Pms(
user=session.config.option.pms_user,
@ -805,7 +884,7 @@ def pytest_sessionfinish(session):
f"echo '{GlobalConfig.PASSWORD}' | sudo -S rm -rf {GlobalConfig.TMPDIR}",
interrupt=False,
out_debug_flag=False,
command_log=False
command_log=False,
)
@ -815,7 +894,7 @@ def pytest_emoji_passed(config, head_line):
# 笑脸
return (
f"{datetime.now()} {head_line} || 😃 】\n",
f"{datetime.now()} {head_line} || PASSED 😃 】\n"
f"{datetime.now()} {head_line} || PASSED 😃 】\n",
)
@ -825,7 +904,7 @@ def pytest_emoji_failed(config, head_line):
# 哭笑不得
return (
f"{datetime.now()} {head_line} || 😰 】\n",
f"{datetime.now()} {head_line} || FAILED 😰 】\n"
f"{datetime.now()} {head_line} || FAILED 😰 】\n",
)
@ -835,7 +914,7 @@ def pytest_emoji_skipped(config, head_line):
# 翻白眼儿
return (
f"{datetime.now()} {head_line} || 🙄 】\n",
f"{datetime.now()} {head_line} || SKIPPED 🙄 】\n"
f"{datetime.now()} {head_line} || SKIPPED 🙄 】\n",
)
@ -845,7 +924,7 @@ def pytest_emoji_error(config, head_line):
# 哭哭
return (
f"{datetime.now()} {head_line} || 😡 】\n",
f"{datetime.now()} {head_line} || ERROR 😡 】\n"
f"{datetime.now()} {head_line} || ERROR 😡 】\n",
)

View File

@ -29,9 +29,9 @@ AT 开发规范是根据自动化测试运行两年多来,遇到问题解决
举例:
```python
# test_music_001.py
# test_music_679537.py
def test_music_001():
def test_music_679537():
"""用例标题"""
pass
```
@ -45,9 +45,9 @@ AT 开发规范是根据自动化测试运行两年多来,遇到问题解决
如果你将上例写成了这样:
```python
# test_music_001.py
# test_music_679537.py
def test_movie_001():
def test_movie_679537():
"""用例标题"""
pass
```
@ -191,7 +191,7 @@ class TitleWidget(BaseWidget):
class TestMusic(BaseCase):
"""音乐用例"""
def test_music_001(self):
def test_music_679537(self):
"""音乐启动"""
```
@ -209,13 +209,13 @@ class TestMusic(BaseCase):
class TestMusic(BaseCase):
"""音乐用例"""
def test_music_001_1(self):
def test_music_679537_1(self):
"""任务栏启动音乐"""
def test_music_001_2(self):
def test_music_679537_2(self):
"""启动器启动音乐"""
def test_music_001_3(self):
def test_music_679537_3(self):
"""桌面启动音乐"""
```
@ -223,7 +223,7 @@ class TestMusic(BaseCase):
- 用例函数以 test 开头,遵循蛇形命名规范,中间为用例的模块名称,后面加用例 ID最后加测试点序号`test_${module}_${case_id}[_${index}]`
比如:`test_music_001_1`index 从 1 开始。
比如:`test_music_679537_1`index 从 1 开始。
- 函数功能说明里面写用例标题,直接复制 PMS 上用例标题即可,注意用三对双引号,不要用其他注释,更不要用井号注释写用例标题;
@ -241,7 +241,7 @@ class TestMusic(BaseCase):
class TestMusic(BaseCase):
"""音乐用例"""
def test_music_182(self):
def test_music_679537(self):
"""演唱者-平铺视图下进入演唱者详情页"""
# 1
@ -258,7 +258,7 @@ class TestMusic(BaseCase):
class TestMusic(BaseCase):
"""音乐用例"""
def test_music_182(self):
def test_music_679537(self):
"""演唱者-平铺视图下进入演唱者详情页"""
music = DeepinMusicWidget()
music.click_singer_btn_in_music_by_ui()

View File

@ -9,6 +9,7 @@ new
fix
- 对 docs 里面细化了远程执行章节的描述;
- 多 docs 里面优化了标签化管理章节的描述;
## 2.1.52023/8/31
@ -369,7 +370,7 @@ new
```python
@pytest.mark.count(2)
def test_music_001():
def test_music_679537():
pass
```
@ -382,7 +383,7 @@ new
- image_utils 增加函数 save_temporary_picture支持指定屏幕区域截图并返回图片存放的本地路径后续使用 assert_image_exist 进行断言
- ```Python
def test_music_001(self):
def test_music_679537(self):
pic_path = DeepinMusicWidget.save_temporary_picture(x, y, width, height)
...... # 中间操作
self.assert_image_exit(pic_path)
@ -391,7 +392,7 @@ new
- button_center 新增 btn_size 获取控件左上角坐标及长宽,用于动态的截取元素的图片,可用于定位断言
- ```python
def test_music_001(self):
def test_music_679537(self):
pic_path = DeepinMusicWidget.save_temporary_picture(*DeepinMusicWidget().ui.btn_size("所有音乐按钮"))
...... # 中间操作
self.assert_image_exit(pic_path)

View File

@ -109,9 +109,7 @@ youqu
基础框架工程目录建议放在 `~` 目录下,放在其他目录也可以运行,但是在自动化用例执行过程中可能需要做环境清理,放在其他目录存在代码被删除的风险。
如果你的机器上不同目录下存在多个 YouQu 工程,那么在运行之前,请先执行 env.sh 校正相关环境。
### 2. 执行器
如果你的机器上不同目录下存在多个 `YouQu` 工程,那么在运行之前,请先执行 `env.sh` 校正相关环境。
在项目根目录下有一个 `manage.py` ,它是一个执行器入口,提供了本地执行、远程执行等的功能。
@ -121,252 +119,17 @@ youqu
youqu manage.py run -a deepin-music
```
#### 2.1. 本地执行
### 2. 本地执行
```shell
youqu manage.py run
```
##### 2.1.1. 配置文件
通过配置文件配置参数
在配置文件 `setting/globalconfig.ini` 里面支持配置对执行的一些参数进行配置,常用的如:
```ini
;=============================== CASE CONFIG ===================================
[case]
;执行的应用名称
;为空表示执行 apps/ 目录下所有应用的用例
APP_NAME =
;执行包含关键词的用例
KEYWORDS =
;执行包含用例标签的用例
TAGS =
;-----------------------------------------------
;1.KEYWORDS 和 TAGS 都为空表示执行 APP_NAME 的所有用例
;2.KEYWORDS 和 TAGS 都支持逻辑组合,即 and/or/not 的表达式
;e.g. TAGS = L1 or smoke
;-----------------------------------------------
;本地文件测试套,将要执行的用例写入指定的 csv 文件
;默认为空从基础框架根目录开始e.g. CASE_FILE = case_list.txt
;如果这里有值APP_NAME KEYWORDS TAGS 的配置均不生效
CASE_FILE =
;=============================== RUNNER CONFIG ===================================
[runner]
;最大失败用例数量的占比
;比如:总执行用例数为 100, 若 MAX_FAIL = 0.5,则失败用例数达到 50 就会终止测试。
MAX_FAIL = 1
;单条用例的超时时间,如果一条用例的执行时间超时,这条用例会被停止,后续用例继续执行。
;单位为秒
;这是一个全局统一配置,如果某条用例需要单独配置超时时间,可以在用例中这样写:
;@pytest.mark.timeout(500)
;def test_xxx_001():
; ...
;会话超时(所有用例执行的超时时间)是根据全局超时配置和用例单独超时配置自动计算的。
CASE_TIME_OUT = 200
;失败用例重跑次数
;注意RERUN = 1 表示重跑 1 次,即第一次用例执行失败会自动重跑 1 次,总共执行 2 次;
;如果第 2 次执行成功,结果成功,失败亦为失败。
RERUN = 1
;失败录屏从第几次失败开始录制视频。
;比如 RECORD_FAILED_CASE = 1 ,表示用例第 1 次执行失败之后开始录屏RERUN >= RECORD_FAILED_CASE。
;1.关闭录屏RECORD_FAILED_CASE > RERUN
;2.每条用例都录屏RECORD_FAILED_CASE = 0
RECORD_FAILED_CASE = 1
;yes 每条用例执行之后进行环境清理
CLEAN_ALL = yes
;检查测试机分辨率, 比如1920x1080
;no: 表示不做分辨率校验
RESOLUTION = 1920x1080
;不跳过用例csv文件里面标记了 skip-xxx的用例不跳过
NOSKIP = no
;ignore fixed
;no只要标记了fixed的用例即使标记了skip-,也会执行;
;yesfixed不生效仅通过skip跳过用例
IFIXED = no
;要安装deb包的路径
;e.g : ~/Downloads/ 安装下载目录下的deb包如果是远程执行会自动拷贝到远程并安装。
DEB_PATH =
;DEBUG 模式执行用例,只收集不执行用例,也不做设备分辨率的检查。
DEBUG = no
;记录top命令查询的系统资源占用情况TOP = 3 表示记录前3个进程。
TOP =
;指定用例执行次数
REPEAT =
;默认在所有测试完成之后输出报错信息.
;yes, 测试过程中立即显示报错
DURING_FAIL = no
;注册自启服务
AUTOSTART = no
;=============================== REPORT CONFIG ===================================
[report]
;测试报告的title
REPORT_TITLE = YouQu Report
;测试报告的name
REPORT_NAME = YouQu Report
;测试报告的默认语言
;en:English
;ru:Русский
;zh:中文
;de:Deutsch
;nl:Nederlands
;he:Hebrew
;br:Brazil
;pl:Polski
;ja:日本語
;es:Español
;kr:한국어
;fr:Français
;az:Azərbaycanca
REPORT_LANGUAGE = zh
;用例执行完后生成的测试报告格式
;目前支持 allure, xml, json (支持同时生成)
REPORT_FORMAT = allure, xml, json
;指定报告生成的路径(相对项目根目录下)
ALLURE_REPORT_PATH = report/
XML_REPORT_PATH = report/
JSON_REPORT_PATH = report/
;=============================== GLOBAL CONFIG ===================================
[globalconfig]
;测试机的密码
PASSWORD = 1
;图像识别重试次数
IMAGE_MATCH_NUMBER = 1
;图像识别重试每次间隔等待时间
IMAGE_MATCH_WAIT_TIME = 1
;图像识别匹配度
IMAGE_RATE = 0.9
;截取当前屏幕实时图像保存路径,用于图像识别坐标
SCREEN_CACHE = /tmp/screen.png
;截取屏幕上指定区域图片,保存临时图片的路径
TMPDIR = /tmp/tmpdir
;系统主题
SYS_THEME = deepin
;OCR服务端地址不可随意修改
OCR_SERVER_HOST = youqu-dev.uniontech.com
;OpenCV服务端地址
OPENCV_SERVER_HOST = youqu-dev.uniontech.com
;=============================== PMS CONFIG ===================================
;PMS相关配置包含以下几个方面
;1.PMS测试套执行
;2.自动从PMS爬取数据并同步本地CSV文件
;3.PMS数据回填
[pms]
;PMS的用户名,如: ut001234
PMS_USER =
;PMS的密码
PMS_PASSWORD =
;PMS测试套的ID
;在PMS上查看用例“套件”链接: https://pms.uniontech.com/testsuite-view-495.html
;测试套ID为: 495
SUITE_ID =
;数据回填必须关联PMS测试单
;在PMS上查看测试单链接: https://pms.uniontech.com/testtask-cases-20747.html
;测试单ID为: 20747
TASK_ID =
;将测试结果数据回填到PMS
;为空: 表示不回填,不会在每条用例执行完之后生成json结果文件;
;async: 表示逐条异步回填,后面一条执行开始时通过子线程对前一条用例的执行结果进行回填,如此实现时间效率最大化;
;finish: 表示所有用例执行完成之后逐个回填(PMS不支持并发);
SEND_PMS =
;数据回填的触发者
;auto: 框架自动回填,配合SEND_PMS配置使用,你可以选择在不同的阶段进行数据回填;
;hand: 手动回填,每条用例仍然会生成json文件,但框架不会进行数据回填,需要你可以在你想要发送的时间点手动触发回填;
TRIGGER = auto
;PMS回填的重试次数
;如果接口请求失败,会进行重试
SEND_PMS_RETRY_NUMBER = 2
[csv_link_pms_lib]
;caselib: 用例库
;testcase: 产品库用例
CASE_FROM = caselib
[csv_link_pms_id]
;同步PMS数据到本地CSV文件必须要配置的配置项
;key是本地CSV文件的文件名称;
;value是对应PMS上的模块ID;
;比如要同步音乐的数据, 首先需要将配置 APP_NAME = deepin-music
;CSV文件名称为music.csv其在PMS上的用例为: https://pms.uniontech.com/caselib-browse-81.html
;因此应该配置为: music = 81
;这样才能将PMS与本地CSV文件建立联系。
;如果你的应用分了很多模块,只需要将对应的信息依次配置好就行了。
music =
[export_csv]
;导出的csv文件名称默认 case_list.csv
CSV_FILE = case_list.csv
;exportcsv 命令导出 case_list.csv 文件时配置的字段名,用例名称默认存在第一列,无需添加
CSV_HEARD = 用例级别,用例类型,测试级别,是否跳过
[log_cli]
;日志相关配置(不打印构造函数和魔法函数的功能说明)
;批量执行时,终端输出的日志级别 DEBUG/INFO/ERROR
LOG_LEVEL = DEBUG
# ============= 自动输出日志的配置 ================
;支持类名以 xxx 开头的,自动将函数说明打印为日志, 多个参数以逗号隔开
CLASS_NAME_STARTSWITH = Assert
;支持类名以 xxx 结尾的,自动将函数说明打印为日志,多个参数以逗号隔开
CLASS_NAME_ENDSWITH = Widget
;支持类名包含 xxx 的,自动将函数说明打印为日志,多个参数以逗号隔开
CLASS_NAME_CONTAIN = ShortCut
# ==============================================
```
配置完成之后,直接在命令行执行 `manage.py` 就好了。
```shell
youqu manage.py run
```
##### 2.1.2. 命令行参数
#### 2.1. 命令行参数
通过命令行参数配置参数
以下为 `youqu manage.py run` 提供的一些参数选项:
以下为 `youqu manage.py run` 提供的一些常用的参数选项:
```coffeescript
-h, --help show this help message and exit
@ -426,7 +189,236 @@ youqu manage.py run --app deepin-music --keywords "xxx" --tags "xxx"
--app 入参还支持 `autotest_xxx``apps/autotest_xxx` 两种写法,方便在输入命令的过程中使用补全,下面的远程执行功能同样支持。
#### 2.2. 远程执行
#### 2.2. 配置文件
通过配置文件配置参数
在配置文件 `setting/globalconfig.ini` 里面支持配置对执行的一些参数进行配置,常用的如:
```ini
;=============================== RUN CONFIG ===================================
[run]
;执行的应用名称
;为空表示执行 apps/ 目录下所有应用的用例
APP_NAME =
;执行包含关键词的用例
KEYWORDS =
;执行包含用例标签的用例
;-----------------------------------------------------------
;1.KEYWORDS 和 TAGS 都为空表示执行 APP_NAME 的所有用例
;2.KEYWORDS 和 TAGS 都支持逻辑组合,即 and/or/not 的表达式
;比如TAGS = L1 or smoke ,表示执行标签带有 L1 或 somke 标签的用例;
;这两个参数也可以同时使用,可以组合出任意的用例集合,只有想不到没有办不到。
;-----------------------------------------------------------
TAGS =
;本地文件测试套,将要执行的用例写入指定的 csv 文件
;默认为空从基础框架根目录开始e.g. CASE_FILE = case_list.txt
;如果这里有值APP_NAME KEYWORDS TAGS 的配置均不生效
CASE_FILE =
;最大失败用例数量的占比
;比如:总执行用例数为 100, 若 MAX_FAIL = 0.5,则失败用例数达到 50 就会终止测试。
MAX_FAIL = 1
;单条用例的超时时间,如果一条用例的执行时间超时,这条用例会被停止,后续用例继续执行。
;单位为秒
;这是一个全局统一配置,如果某条用例需要单独配置超时时间,可以在用例中这样写:
;@pytest.mark.timeout(500)
;def test_xxx_001():
; ...
;会话超时(所有用例执行的超时时间)是根据全局超时配置和用例单独超时配置自动计算的。
CASE_TIME_OUT = 200
;失败用例重跑次数
;注意RERUN = 1 表示重跑 1 次,即第一次用例执行失败会自动重跑 1 次,总共执行 2 次;
;如果第 2 次执行成功,结果成功,失败亦为失败。
RERUN = 1
;失败录屏从第几次失败开始录制视频。
;比如 RECORD_FAILED_CASE = 1 ,表示用例第 1 次执行失败之后开始录屏RERUN >= RECORD_FAILED_CASE。
;1.关闭录屏RECORD_FAILED_CASE > RERUN
;2.每条用例都录屏RECORD_FAILED_CASE = 0
RECORD_FAILED_CASE = 1
;yes 每条用例执行之后进行环境清理
CLEAN_ALL = yes
;检查测试机分辨率, 比如1920x1080
;no: 表示不做分辨率校验
RESOLUTION = 1920x1080
;不跳过用例csv文件里面标记了 skip-xxx的用例不跳过
NOSKIP = no
;ignore fixed
;no只要标记了fixed的用例即使标记了skip-,也会执行;
;yesfixed不生效仅通过skip跳过用例
IFIXED = no
;要安装deb包的路径
;e.g : ~/Downloads/ 安装下载目录下的deb包如果是远程执行会自动拷贝到远程并安装。
DEB_PATH =
;DEBUG 模式执行用例,只收集不执行用例,也不做设备分辨率的检查。
DEBUG = no
;记录top命令查询的系统资源占用情况TOP = 3 表示记录前3个进程。
TOP =
;指定用例执行次数
REPEAT =
;默认在所有测试完成之后输出报错信息.
;yes, 测试过程中立即显示报错
DURING_FAIL = no
;注册自启服务
AUTOSTART = no
;测试机的密码
PASSWORD = 1
;图像识别重试次数
IMAGE_MATCH_NUMBER = 1
;图像识别重试每次间隔等待时间
IMAGE_MATCH_WAIT_TIME = 1
;图像识别匹配度
IMAGE_RATE = 0.9
;截取当前屏幕实时图像保存路径,用于图像识别坐标
SCREEN_CACHE = /tmp/screen.png
;截取屏幕上指定区域图片,保存临时图片的路径
TMPDIR = /tmp/tmpdir
;系统主题
SYS_THEME = deepin
;OCR服务端地址不可随意修改
OCR_SERVER_HOST = youqu-dev.uniontech.com
;OpenCV服务端地址
OPENCV_SERVER_HOST = youqu-dev.uniontech.com
;=============================== REPORT CONFIG ===================================
[report]
;测试报告的title
REPORT_TITLE = YouQu Report
;测试报告的name
REPORT_NAME = YouQu Report
;测试报告的默认语言
;en:English
;ru:Русский
;zh:中文
;de:Deutsch
;nl:Nederlands
;he:Hebrew
;br:Brazil
;pl:Polski
;ja:日本語
;es:Español
;kr:한국어
;fr:Français
;az:Azərbaycanca
REPORT_LANGUAGE = zh
;用例执行完后生成的测试报告格式
;目前支持 allure, xml, json (支持同时生成)
REPORT_FORMAT = allure, xml, json
;指定报告生成的路径(相对项目根目录下)
ALLURE_REPORT_PATH = report/
XML_REPORT_PATH = report/
JSON_REPORT_PATH = report/
;=============================== PMS CONFIG ===================================
;PMS相关配置包含以下几个方面
;1.PMS测试套执行
;2.自动从PMS爬取数据并同步本地CSV文件
;3.PMS数据回填
[pmsctl]
;PMS的用户名,如: ut001234
PMS_USER =
;PMS的密码
PMS_PASSWORD =
;PMS测试套的ID
;在PMS上查看用例“套件”链接: https://pms.uniontech.com/testsuite-view-495.html
;测试套ID为: 495
SUITE_ID =
;数据回填必须关联PMS测试单
;在PMS上查看测试单链接: https://pms.uniontech.com/testtask-cases-20747.html
;测试单ID为: 20747
TASK_ID =
;将测试结果数据回填到PMS
;为空: 表示不回填,不会在每条用例执行完之后生成json结果文件;
;async: 表示逐条异步回填,后面一条执行开始时通过子线程对前一条用例的执行结果进行回填,如此实现时间效率最大化;
;finish: 表示所有用例执行完成之后逐个回填(PMS不支持并发);
SEND_PMS =
;数据回填的触发者
;auto: 框架自动回填,配合SEND_PMS配置使用,你可以选择在不同的阶段进行数据回填;
;hand: 手动回填,每条用例仍然会生成json文件,但框架不会进行数据回填,需要你可以在你想要发送的时间点手动触发回填;
TRIGGER = auto
;PMS回填的重试次数
;如果接口请求失败,会进行重试
SEND_PMS_RETRY_NUMBER = 2
;caselib: 用例库
;testcase: 产品库用例
CASE_FROM = caselib
[pmsctl-pms_link_csv]
;同步PMS数据到本地CSV文件必须要配置的配置项
;key是本地CSV文件的文件名称;
;value是对应PMS上的模块ID;
;比如要同步音乐的数据, 首先需要将配置 APP_NAME = deepin-music
;CSV文件名称为music.csv其在PMS上的音乐用例库的URL为: https://pms.uniontech.com/caselib-browse-81.html
;因此应该配置为: music = 81
;这样才能将PMS与本地CSV文件建立联系。
;如果你的应用分了很多模块,只需要将对应的信息依次配置好就行了。
music =
[csvctl]
;将py文件的case id同步到csv文件
;yes, 开启同步
PY_ID_TO_CSV = no
;导出 case_list.csv 文件时配置的字段名,用例名称默认存在第一列,无需添加
EXPORT_CSV_HEARD = 用例级别,用例类型,测试级别,是否跳过
[log_cli]
;日志相关配置(不打印构造函数和魔法函数的功能说明)
;批量执行时,终端输出的日志级别 DEBUG/INFO/ERROR
LOG_LEVEL = DEBUG
# ============= 自动输出日志的配置 ================
;支持类名以 xxx 开头的,自动将函数说明打印为日志, 多个参数以逗号隔开
CLASS_NAME_STARTSWITH = Assert
;支持类名以 xxx 结尾的,自动将函数说明打印为日志,多个参数以逗号隔开
CLASS_NAME_ENDSWITH = Widget
;支持类名包含 xxx 的,自动将函数说明打印为日志,多个参数以逗号隔开
CLASS_NAME_CONTAIN = ShortCut
# ==============================================
```
配置完成之后,直接在命令行执行 `manage.py` 就好了。
### 3. 远程执行
远程执行就是用本地作为服务端控制远程机器执行,远程机器执行的用例相同;
@ -436,7 +428,7 @@ youqu manage.py run --app deepin-music --keywords "xxx" --tags "xxx"
youqu manage.py remote
```
##### 2.2.1. 远程多机器分布式异步执行
#### 3.1. 远程多机器分布式异步执行
![](https://pic.imgdb.cn/item/64f6d3c0661c6c8e549f8ca5.png)
@ -523,7 +515,7 @@ sudo systemctl enable ssh
配置文件其他相关配置项详细说明,请查看配置文件中的注释内容。
##### 2.2.2. 远程多机器分布式异步负载均衡执行
#### 3.2. 远程多机器分布式异步负载均衡执行
多机器分布式异步负载均衡执行也是用本地作为服务端控制远程机器执行,但远程机器执行的用例不同,而是所有远程机器执行的用例之和,为你想要执行的用例集;
@ -533,92 +525,15 @@ sudo systemctl enable ssh
![](https://pic.imgdb.cn/item/64f6d694661c6c8e54a1025b.png)
使用方法和前面一样,只是需要增加一个参数:
使用方法和前面一样,只是需要增加一个参数 `--parallel`
```shell
youqu manage.py remote -a deepin-music -c uos@10.8.13.33/uos@10.8.13.34 -k "xxx" -t "xxx" --parallel no
```
#### 2.3. PMS 数据回填
## 四、脚手架创建工程
测试单关联的用例,自动化测试对应的去跑这些关联的用例,并且将执行的结果回填的测试用例的状态里面。
PMS 数据回填主要有三种方式:
1异步回填
在用例执行的过程中,采用异步的方式去进行数据回填,直白的讲就是,第二条用例开始跑的时候,通过子线程去做第一条用例的数据回填,如此循环,直到所有用例执行结束;
这种方案的时间效率最高的,因为理论上用例的执行时间是大于数据回填的接口请求时间的,也就是说,当用例执行完之后,数据回填也完成了。
使用方法,在 `globalconfig.ini` 里面配置以下参数:(以下涉及到的参数配置都是在配置文件里面进行配置)
```ini
PMS_USER = PMS账号
PMS_PASSWORD = PMS密码
SEND_PMS = async
TASK_ID = 测试单ID
TRIGGER = auto
APP_NAME = 这个参数可填可不填,但是填了可以提高用例的执行速度,因为在用例收集阶段可以指定到具体的应用库。(下同)
```
2用例执行完之后回填
等所有用例执行完之后,再逐个进行回填的接口请求,此方案时间效率比较低。
使用方法:
```ini
PMS_USER = PMS账号
PMS_PASSWORD = PMS密码
SEND_PMS = finish
TASK_ID = 测试单ID
TRIGGER = auto
APP_NAME =
```
3手动回填
所有用例执行完之后不做回填的接口请求,后续手动将结果进行回填请求。
用例执行时配置:
```ini
PMS_USER = PMS账号
PMS_PASSWORD = PMS密码
SEND_PMS = finish
TASK_ID = 测试单ID
TRIGGER = hand
APP_NAME =
```
后续手动回填方法:
```shell
youqu manage.py pms --send2task yes
```
##### 可能遇到的问题
由同学可能会发现,怎么回填一次之后,后面想再次回填就不生效了;
这是因为为了应对前面提到的多种数据回填的方式,在 `report` 目录下会由 `pms_xxx` 开头的目录,记录了用例的执行结果和回填情况,如果这条用例之前已经回填过了,后续就不会再此触发回填了;
如果你想重新做回填,你可以把 `report/pms_xxx` 目录删掉,这样就可以重新做数据回填了;
#### 2.4. 导出 CSV 文件
框架提供导出指定标签用例的功能:
```shell
youqu manage.py exportcsv -a deepin-album -t CICD
```
表示导出 `deepin-album` 的用例中标记了 `CICD` 标签的用例,导出 `CSV` 文件的字段格式已经适配了 `CICD` 的要求。
#### 2.5. 脚手架新建 APP 工程
新建一个 APP 工程:
创建一个 APP 工程:
```shell
youqu manage.py startapp autotest_deepin_some
@ -656,37 +571,8 @@ apps
`autotest_deepin_some` 是你的工程名称,比如:`autotest_deepin_music`
在此基础上,你可以快速的开始你的 AT 项目。
在此基础上,你可以快速的开始你的 AT 项目,更重要的是确保创建工程的规范性
#### 2.6. 用例标签自动同步
---------------------------
用于自动同步 `PMS` 用例标签数据至本地 `CSV` 文件,主要通过以下几个配置来控制:
```ini
APP_NAME = # 指定要同步的应用名称
PMS_USER = # PMS的用户名
PMS_PASSWORD = # PMS的密码
```
`[csv_link_pms_id]` 节点下指定 `csv` 文件名与 `PMS` 用例模块的对应关系,比如:
```ini
[csv_link_pms_id]
;同步PMS数据到本地CSV文件必须要配置的配置项
;key是本地CSV文件的文件名称;
;value是对应PMS上的模块ID;
;比如要同步音乐的数据, 首先需要将配置 APP_NAME = deepin-music
;CSV文件名称为music.csv其在PMS上的用例为: https://pms.uniontech.com/testcase-browse-53.html
;因此应该配置为: music = 53
;这样才能将PMS与本地CSV文件建立联系。
;如果你的应用分了很多模块,只需要将对应的信息依次配置好就行了。
music = 53
```
将以上信息配置好之后,在命令行执行:
```shell
youqu manage.py pms --pms2csv yes
```
每次执行时原 `csv` 文件会自动备份在 `report/csv_back` 目录下,因此你不用担心脚本执行导致你的数据丢失。
更多内容请查看【框架功能介绍】

File diff suppressed because it is too large Load Diff

114
manage.py
View File

@ -78,6 +78,11 @@ class Manage:
client_password=None,
parallel=None,
autostart=None,
pyid2csv=None,
export_csv_file=None,
pms2csv=None,
pms_link_csv=None,
send2task=None,
):
self.default_app = app
self.default_keywords = keywords
@ -116,6 +121,11 @@ class Manage:
self.default_client_password = client_password
self.default_parallel = parallel
self.default_autostart = autostart
self.default_pyid2csv = pyid2csv
self.default_export_csv_file = export_csv_file
self.default_pms2csv = pms2csv
self.default_pms_link_csv = pms_link_csv
self.default_send2task = send2task
say(GlobalConfig.PROJECT_NAME)
version_font = "slick"
@ -135,8 +145,8 @@ class Manage:
subparsers = parser.add_subparsers(help="子命令")
sub_parser_remote = subparsers.add_parser(SubCmd.remote.value)
sub_parser_run = subparsers.add_parser(SubCmd.run.value)
sub_parser_pms = subparsers.add_parser(SubCmd.pms.value)
sub_parser_export_csv = subparsers.add_parser(SubCmd.exportcsv.value)
sub_parser_pms = subparsers.add_parser(SubCmd.pmsctl.value)
sub_parser_csv = subparsers.add_parser(SubCmd.csvctl.value)
help_tip = (
f"\033[0;32mmanage.py\033[0m 支持 \033[0;32m{[i.value for i in SubCmd]}\033[0m 命令, "
@ -152,15 +162,19 @@ class Manage:
elif self.cmd_args[0] == SubCmd.run.value:
_local_kwargs, _ = self.local_runner(parser, sub_parser_run)
LocalRunner(**_local_kwargs).local_run()
elif self.cmd_args[0] == SubCmd.pms.value:
elif self.cmd_args[0] == SubCmd.pmsctl.value:
self.pms_control(parser, sub_parser_pms)
elif self.cmd_args[0] == SubCmd.exportcsv.value:
self.export_csv(parser, sub_parser_export_csv)
elif self.cmd_args[0] == SubCmd.csvctl.value:
self.csv_control(parser, sub_parser_csv)
elif self.cmd_args[0] == SubCmd.startapp.value:
start_config_log = f"{SubCmd.startapp.value} 后面直接加工程名称,工程名称以 'autotest_' 开头"
try:
if self.cmd_args[1] in ("-h", "--help"):
print(start_config_log)
sys.exit(0)
self.start_app(self.cmd_args[1])
except IndexError:
print(f"参数异常 {SubCmd.startapp.value} 后面需要跟参数")
logger.error(f"参数异常: {start_config_log}")
elif self.cmd_args[0] in ["-h", "--help"]:
print(help_tip)
else:
@ -185,7 +199,7 @@ class Manage:
help="搭建测试环境,如果为yes不管send_code是否为yes都会发送代码到测试机."
)
sub_parser_remote.add_argument(
"-p", "--client_password", default="", help="测试机密码(全局)"
"-cp", "--client_password", default="", help="测试机密码(全局)"
)
sub_parser_remote.add_argument(
"-y", "--parallel", default="",
@ -274,10 +288,10 @@ class Manage:
"--deb_path", default="", help="需要安装deb包的本地路径"
)
sub_parser_run.add_argument(
"--pms_user", default="", help="pms 用户名"
"-u", "--pms_user", default="", help="pms 用户名"
)
sub_parser_run.add_argument(
"--pms_password", default="", help="pms 密码"
"-p", "--pms_password", default="", help="pms 密码"
)
sub_parser_run.add_argument(
"--suite_id", default="", help="pms 测试套ID"
@ -309,7 +323,7 @@ class Manage:
"--line", default="", help="执行的业务线写入json文件"
)
sub_parser_run.add_argument(
"--autostart", default="", help="用例执行程序注册到开机自启服务"
"--autostart", default="", help="重启类场景开启letmego执行方案"
)
args = parser.parse_args()
local_kwargs = {
@ -359,7 +373,22 @@ class Manage:
def pms_control(self, parser=None, sub_parser_pms=None):
"""pms相关功能命令行参数"""
sub_parser_pms.add_argument(
"--pms2csv", choices=["yes", ""], default="", help="爬取数据到csv"
"-a", "--app", default="", help="应用名称deepin-music"
)
sub_parser_pms.add_argument(
"-u", "--pms_user", default="", help="pms 用户名"
)
sub_parser_pms.add_argument(
"-p", "--pms_password", default="", help="pms 密码"
)
sub_parser_pms.add_argument(
"-pls", "--pms_link_csv", default="",
help="pms 和 csv 的映射关系比如music:81/album:82多个配置使用'/'分隔"
)
sub_parser_pms.add_argument(
"-p2c", "--pms2csv", action='store_const', const=True, default=False,
help="从PMS爬取用例标签到csv文件"
)
sub_parser_pms.add_argument(
"--send2task",
@ -374,16 +403,29 @@ class Manage:
help="触发者"
)
args = parser.parse_args()
csv = args.pms2csv
send_to_task = args.send2task
task_id = args.task_id if args.task_id else GlobalConfig.TASK_ID
trigger = args.trigger if args.trigger else GlobalConfig.TRIGGER
if csv:
Pms2Csv().write_new_csv()
elif send_to_task and task_id and trigger == "hand":
pms_kwargs = {
Args.app_name.value: args.app or self.default_app,
Args.pms_user.value: args.pms_user or self.default_pms_user,
Args.pms_password.value: args.pms_password or self.default_pms_password,
Args.pms2csv.value: args.pms2csv or self.default_pms2csv,
Args.pms_link_csv.value: args.pms_link_csv or self.default_pms_link_csv,
Args.send2task.value: args.send2task or self.default_send2task,
Args.task_id.value: args.task_id or GlobalConfig.TASK_ID,
Args.trigger.value: args.trigger or GlobalConfig.TRIGGER,
}
if pms_kwargs.get(Args.pms2csv.value):
Pms2Csv(
app_name=pms_kwargs.get(Args.app_name.value),
user=pms_kwargs.get(Args.pms_user.value) or GlobalConfig.PMS_USER,
password=pms_kwargs.get(Args.pms_password.value) or GlobalConfig.PMS_PASSWORD,
pms_link_csv=pms_kwargs.get(Args.pms_link_csv.value),
).write_new_csv()
elif pms_kwargs.get(Args.send2task.value) \
and pms_kwargs.get(Args.task_id.value) \
and pms_kwargs.get(Args.trigger.value) == "hand":
Send2Pms().send2pms(
Send2Pms.case_res_path(task_id),
Send2Pms.data_send_result_csv(task_id)
Send2Pms.case_res_path(pms_kwargs.get(Args.task_id.value)),
Send2Pms.data_send_result_csv(pms_kwargs.get(Args.trigger.value))
)
@staticmethod
@ -394,26 +436,40 @@ class Manage:
start.copy_template_to_apps()
start.rewrite()
def export_csv(self, parser=None, sub_parser_export_csv=None):
"""导出 csv"""
sub_parser_export_csv.add_argument(
def csv_control(self, parser=None, sub_parser_csv=None):
"""csv相关功能命令参数"""
sub_parser_csv.add_argument(
"-a", "--app", default="", help="应用名称deepin-music"
)
sub_parser_export_csv.add_argument(
sub_parser_csv.add_argument(
"-k", "--keywords", default="", help="用例的关键词"
)
sub_parser_export_csv.add_argument(
sub_parser_csv.add_argument(
"-t", "--tags", default="", help="用例的标签"
)
sub_parser_csv.add_argument(
"-p2c", "--pyid2csv", action='store_const', const=True, default=False,
help="将用例py文件的case id同步到对应的csv文件中"
)
sub_parser_csv.add_argument(
"-ec", "--export_csv_file", default="",help="导出csv文件名称比如case_list.csv"
)
args = parser.parse_args()
export_kwargs = {
csv_kwargs = {
Args.app_name.value: args.app or self.default_app,
Args.keywords.value: args.keywords or self.default_keywords,
Args.tags.value: args.tags or self.default_tags,
"exportcsv": True
Args.pyid2csv.value: args.pyid2csv or self.default_pyid2csv,
Args.export_csv_file.value: args.export_csv_file or self.default_export_csv_file,
"collection_only": True
}
LocalRunner(**export_kwargs).local_run()
if csv_kwargs.get(Args.pyid2csv.value) or GlobalConfig.PY_ID_TO_CSV:
from src.csvctl import CsvControl
CsvControl(csv_kwargs.get(Args.app_name.value)).delete_mark_in_csv_if_not_exists_py()
if csv_kwargs.get(Args.pyid2csv.value) or csv_kwargs.get(Args.export_csv_file.value):
LocalRunner(**csv_kwargs).local_run()
else:
logger.error("需要传递一些参数,您可以使用 -h 或 --help 查看支持的参数")
if __name__ == "__main__":
try:

View File

@ -1,5 +1,5 @@
;=============================== CASE CONFIG ===================================
[case]
;=============================== RUN CONFIG ===================================
[run]
;执行的应用名称
;为空表示执行 apps/ 目录下所有应用的用例
APP_NAME =
@ -8,20 +8,19 @@ APP_NAME =
KEYWORDS =
;执行包含用例标签的用例
TAGS =
;-----------------------------------------------
;-----------------------------------------------------------
;1.KEYWORDS 和 TAGS 都为空表示执行 APP_NAME 的所有用例
;2.KEYWORDS 和 TAGS 都支持逻辑组合,即 and/or/not 的表达式
;e.g. TAGS = L1 or smoke
;-----------------------------------------------
;比如TAGS = L1 or smoke ,表示执行标签带有 L1 或 somke 标签的用例;
;这两个参数也可以同时使用,可以组合出任意的用例集合,只有想不到没有办不到。
;-----------------------------------------------------------
TAGS =
;本地文件测试套,将要执行的用例写入指定的 csv 文件
;默认为空从基础框架根目录开始e.g. CASE_FILE = case_list.txt
;如果这里有值APP_NAME KEYWORDS TAGS 的配置均不生效
CASE_FILE =
;=============================== RUNNER CONFIG ===================================
[runner]
;最大失败用例数量的占比
;比如:总执行用例数为 100, 若 MAX_FAIL = 0.5,则失败用例数达到 50 就会终止测试。
MAX_FAIL = 1
@ -81,6 +80,33 @@ DURING_FAIL = no
;注册自启服务
AUTOSTART = no
;测试机的密码
PASSWORD = 1
;图像识别重试次数
IMAGE_MATCH_NUMBER = 1
;图像识别重试每次间隔等待时间
IMAGE_MATCH_WAIT_TIME = 1
;图像识别匹配度
IMAGE_RATE = 0.9
;截取当前屏幕实时图像保存路径,用于图像识别坐标
SCREEN_CACHE = /tmp/screen.png
;截取屏幕上指定区域图片,保存临时图片的路径
TMPDIR = /tmp/tmpdir
;系统主题
SYS_THEME = deepin
;OCR服务端地址不可随意修改
OCR_SERVER_HOST = youqu-dev.uniontech.com
;OpenCV服务端地址
OPENCV_SERVER_HOST = youqu-dev.uniontech.com
;=============================== REPORT CONFIG ===================================
[report]
;测试报告的title
@ -114,41 +140,12 @@ ALLURE_REPORT_PATH = report/
XML_REPORT_PATH = report/
JSON_REPORT_PATH = report/
;=============================== GLOBAL CONFIG ===================================
[globalconfig]
;测试机的密码
PASSWORD = 1
;图像识别重试次数
IMAGE_MATCH_NUMBER = 1
;图像识别重试每次间隔等待时间
IMAGE_MATCH_WAIT_TIME = 1
;图像识别匹配度
IMAGE_RATE = 0.9
;截取当前屏幕实时图像保存路径,用于图像识别坐标
SCREEN_CACHE = /tmp/screen.png
;截取屏幕上指定区域图片,保存临时图片的路径
TMPDIR = /tmp/tmpdir
;系统主题
SYS_THEME = deepin
;OCR服务端地址不可随意修改
OCR_SERVER_HOST = youqu-dev.uniontech.com
;OpenCV服务端地址
OPENCV_SERVER_HOST = youqu-dev.uniontech.com
;=============================== PMS CONFIG ===================================
;PMS相关配置包含以下几个方面
;1.PMS测试套执行
;2.自动从PMS爬取数据并同步本地CSV文件
;3.PMS数据回填
[pms]
[pmsctl]
;PMS的用户名,如: ut001234
PMS_USER =
@ -180,28 +177,29 @@ TRIGGER = auto
;如果接口请求失败,会进行重试
SEND_PMS_RETRY_NUMBER = 2
[csv_link_pms_lib]
;caselib: 用例库
;testcase: 产品库用例
CASE_FROM = caselib
[csv_link_pms_id]
[pmsctl-pms_link_csv]
;同步PMS数据到本地CSV文件必须要配置的配置项
;key是本地CSV文件的文件名称;
;value是对应PMS上的模块ID;
;比如要同步音乐的数据, 首先需要将配置 APP_NAME = deepin-music
;CSV文件名称为music.csv其在PMS上的用例为: https://pms.uniontech.com/caselib-browse-81.html
;CSV文件名称为music.csv其在PMS上的音乐用例库的URL为: https://pms.uniontech.com/caselib-browse-81.html
;因此应该配置为: music = 81
;这样才能将PMS与本地CSV文件建立联系。
;如果你的应用分了很多模块,只需要将对应的信息依次配置好就行了。
music =
[export_csv]
;导出的csv文件名称默认 case_list.csv
CSV_FILE = case_list.csv
[csvctl]
;将py文件的case id同步到csv文件
;yes, 开启同步
PY_ID_TO_CSV = no
;导出 case_list.csv 文件时配置的字段名,用例名称默认存在第一列,无需添加
EXPORT_CSV_HEARD = 用例级别,用例类型,测试级别,是否跳过
;exportcsv 命令导出 case_list.csv 文件时配置的字段名,用例名称默认存在第一列,无需添加
CSV_HEARD = 用例级别,用例类型,测试级别,是否跳过
[log_cli]
;日志相关配置(不打印构造函数和魔法函数的功能说明)

View File

@ -68,28 +68,38 @@ class _GlobalConfig:
# ====================== GLOBAL CONFIG INI ======================
# Get config file object
GLOBAL_CONFIG_FILE_PATH = join(SETTING_PATH, "globalconfig.ini")
# [case]
case_cfg = GetCfg(GLOBAL_CONFIG_FILE_PATH, "case")
APP_NAME = case_cfg.get("APP_NAME", default="")
KEYWORDS = case_cfg.get("KEYWORDS", default="")
TAGS = case_cfg.get("TAGS", default="")
CASE_FILE = case_cfg.get("CASE_FILE", default="")
# [runner]
runner_cfg = GetCfg(GLOBAL_CONFIG_FILE_PATH, "runner")
RERUN = runner_cfg.get("RERUN", default=1)
RECORD_FAILED_CASE = runner_cfg.get("RECORD_FAILED_CASE", default=1)
MAX_FAIL = runner_cfg.get("MAX_FAIL", default=1)
CASE_TIME_OUT = runner_cfg.get("CASE_TIME_OUT", default=200)
CLEAN_ALL = runner_cfg.get("CLEAN_ALL", default="yes")
RESOLUTION = runner_cfg.get("RESOLUTION", default="1920x1080")
NOSKIP = runner_cfg.get_bool("NOSKIP", default=False)
IFIXED = runner_cfg.get_bool("IFIXED", default=False)
DURING_FAIL = runner_cfg.get_bool("DURING_FAIL", default=False)
AUTOSTART = runner_cfg.get_bool("AUTOSTART", default=False)
TOP = runner_cfg.get("TOP", default="")
REPEAT = runner_cfg.get("REPEAT", default="")
DEB_PATH = runner_cfg.get("DEB_PATH", default="~/Downloads/")
DEBUG = runner_cfg.get_bool("DEBUG", default=False)
# [run]
run_cfg = GetCfg(GLOBAL_CONFIG_FILE_PATH, "run")
APP_NAME = run_cfg.get("APP_NAME", default="")
KEYWORDS = run_cfg.get("KEYWORDS", default="")
TAGS = run_cfg.get("TAGS", default="")
CASE_FILE = run_cfg.get("CASE_FILE", default="")
RERUN = run_cfg.get("RERUN", default=1)
RECORD_FAILED_CASE = run_cfg.get("RECORD_FAILED_CASE", default=1)
MAX_FAIL = run_cfg.get("MAX_FAIL", default=1)
CASE_TIME_OUT = run_cfg.get("CASE_TIME_OUT", default=200)
CLEAN_ALL = run_cfg.get("CLEAN_ALL", default="yes")
RESOLUTION = run_cfg.get("RESOLUTION", default="1920x1080")
NOSKIP = run_cfg.get_bool("NOSKIP", default=False)
IFIXED = run_cfg.get_bool("IFIXED", default=False)
DURING_FAIL = run_cfg.get_bool("DURING_FAIL", default=False)
AUTOSTART = run_cfg.get_bool("AUTOSTART", default=False)
TOP = run_cfg.get("TOP", default="")
REPEAT = run_cfg.get("REPEAT", default="")
DEB_PATH = run_cfg.get("DEB_PATH", default="~/Downloads/")
DEBUG = run_cfg.get_bool("DEBUG", default=False)
PASSWORD = run_cfg.get("PASSWORD", default="1")
if not PASSWORD:
raise ValueError("测试机密码不能未空")
IMAGE_MATCH_NUMBER = run_cfg.get("IMAGE_MATCH_NUMBER", default=1)
IMAGE_MATCH_WAIT_TIME = run_cfg.get("IMAGE_MATCH_WAIT_TIME", default=1)
IMAGE_RATE = run_cfg.get("IMAGE_RATE", default=0.9)
SCREEN_CACHE = run_cfg.get("SCREEN_CACHE", default="/tmp/screen.png")
TMPDIR = run_cfg.get("TMPDIR", default="/tmp/tmpdir")
SYS_THEME = run_cfg.get("SYS_THEME", default="deepin")
OCR_SERVER_HOST = run_cfg.get("OCR_SERVER_HOST", default="localhost")
OPENCV_SERVER_HOST = run_cfg.get("OPENCV_SERVER_HOST", default="localhost")
# [report]
report_cfg = GetCfg(GLOBAL_CONFIG_FILE_PATH, "report")
REPORT_TITLE = report_cfg.get("REPORT_TITLE", default="YouQu Report")
@ -105,22 +115,9 @@ class _GlobalConfig:
JSON_REPORT_PATH = join(
ROOT_DIR, report_cfg.get("JSON_REPORT_PATH", default="report/")
)
# [globalconfig]
global_cfg = GetCfg(GLOBAL_CONFIG_FILE_PATH, "globalconfig")
PASSWORD = global_cfg.get("PASSWORD", default="1")
if not PASSWORD:
raise ValueError("测试机密码不能未空")
IMAGE_MATCH_NUMBER = global_cfg.get("IMAGE_MATCH_NUMBER", default=1)
IMAGE_MATCH_WAIT_TIME = global_cfg.get("IMAGE_MATCH_WAIT_TIME", default=1)
IMAGE_RATE = global_cfg.get("IMAGE_RATE", default=0.9)
SCREEN_CACHE = global_cfg.get("SCREEN_CACHE", default="/tmp/screen.png")
TMPDIR = global_cfg.get("TMPDIR", default="/tmp/tmpdir")
SYS_THEME = global_cfg.get("SYS_THEME", default="deepin")
OCR_SERVER_HOST = global_cfg.get("OCR_SERVER_HOST", default="localhost")
OPENCV_SERVER_HOST = global_cfg.get("OPENCV_SERVER_HOST", default="localhost")
# [pms]
pms_cfg = GetCfg(GLOBAL_CONFIG_FILE_PATH, "pms")
# [pmsctl]
pms_cfg = GetCfg(GLOBAL_CONFIG_FILE_PATH, "pmsctl")
PMS_USER = pms_cfg.get("PMS_USER", default="")
PMS_PASSWORD = pms_cfg.get("PMS_PASSWORD", default="")
SUITE_ID = pms_cfg.get("SUITE_ID", default="")
@ -132,6 +129,26 @@ class _GlobalConfig:
if TRIGGER not in ("auto", "hand"):
raise ValueError
SEND_PMS_RETRY_NUMBER = pms_cfg.get("SEND_PMS_RETRY_NUMBER", default=2)
CASE_FROM = pms_cfg.get("CASE_FROM", default="caselib")
# [csvctl]
csv_cfg = GetCfg(GLOBAL_CONFIG_FILE_PATH, "csvctl")
PY_ID_TO_CSV = csv_cfg.get_bool("PY_ID_TO_CSV", default=False)
EXPORT_CSV_HEARD = csv_cfg.get("EXPORT_CSV_HEARD", default="用例级别,用例类型,测试级别,是否跳过").replace(" ", "")
# [log_cli]
log_cli = GetCfg(GLOBAL_CONFIG_FILE_PATH, "log_cli")
LOG_LEVEL = log_cli.get("LOG_LEVEL", default="INFO")
CLASS_NAME_STARTSWITH = tuple(
log_cli.get("CLASS_NAME_STARTSWITH", default="Assert").replace(" ", "").split(",")
)
CLASS_NAME_ENDSWITH = tuple(
log_cli.get("CLASS_NAME_ENDSWITH", default="Widget").replace(" ", "").split(",")
)
CLASS_NAME_CONTAIN = tuple(
log_cli.get("CLASS_NAME_CONTAIN", default="ShortCut").replace(" ", "").split(",")
)
# ====================== 动态获取变量 ======================
# username
USERNAME = getuser()
@ -183,22 +200,6 @@ class _GlobalConfig:
top_cmd = "top -b -d 3 -w 512"
# [export_csv]
export_csv = GetCfg(GLOBAL_CONFIG_FILE_PATH, "export_csv")
CSV_FILE = export_csv.get("CSV_FILE", default="case_list.csv")
CSV_HEARD = export_csv.get("CSV_HEARD", default="用例级别,用例类型,测试级别,是否跳过").replace(" ", "")
# [log_cli]
log_cli = GetCfg(GLOBAL_CONFIG_FILE_PATH, "log_cli")
LOG_LEVEL = log_cli.get("LOG_LEVEL", default="INFO")
CLASS_NAME_STARTSWITH = tuple(
log_cli.get("CLASS_NAME_STARTSWITH", default="Assert").replace(" ", "").split(",")
)
CLASS_NAME_ENDSWITH = tuple(
log_cli.get("CLASS_NAME_ENDSWITH", default="Widget").replace(" ", "").split(",")
)
CLASS_NAME_CONTAIN = tuple(
log_cli.get("CLASS_NAME_CONTAIN", default="ShortCut").replace(" ", "").split(",")
)
GITHUB_URL = "https://github.com/linuxdeepin/deepin-autotest-framework"
DOCS_URL = "https://linuxdeepin.github.io/deepin-autotest-framework"
PyPI_URL = "https://pypi.org/project/youqu"
@ -228,11 +229,17 @@ class ConfStr(Enum):
@unique
class FixedCsvTitle(Enum):
case_id = "脚本ID"
pms_case_id = "PMS用例ID"
case_level = "用例级别"
case_type = "用例类型"
device_type = "设备类型"
case_from = "用例来源"
online_obj = "上线对象"
test_level = "测试级别"
skip_reason = "跳过原因"
fixed = "确认修复"
removed = "废弃用例"
pms_case_id = "PMS用例ID"
case_level = "用例级别"
@unique

14
setting/pmsmark.ini Normal file
View File

@ -0,0 +1,14 @@
[pms-mark-to-csv-mark]
;压力
reliability = STR
;功能
feature = FUNC
;安全
security = SEC
;兼容性
compatibility = CTS
;PERF
performance = PERF
;接口
interface = API

View File

@ -1 +1 @@
脚本ID,PMS用例ID,用例级别,跳过原因,确认修复,废弃用例
${FIXEDCSVTITLE}

View File

@ -65,7 +65,12 @@
## basic_frame_tag = dep_cfg.get("autotest-basic-frame")
## if _GlobalConfig.current_tag != basic_frame_tag:
## logger.error("应用库与基础框架对应版本不一致!")
## logger.error(
## "应用库与基础框架对应版本不一致!"
## f"YouQu版本为:{_GlobalConfig.current_tag},应用库版本为:{basic_frame_tag}"
## "如需如果您确认当前使用的YouQu版本正确请检查您的应用工程根目录下control文件"
## "记录的依赖的YouQu版本是否一致~"
## )
##Config = _Config()

93
src/csvctl.py Normal file
View File

@ -0,0 +1,93 @@
#!/usr/bin/env python3
# _*_ coding:utf-8 _*_
# SPDX-FileCopyrightText: 2023 UnionTech Software Technology Co., Ltd.
# SPDX-License-Identifier: GPL-2.0-only
import os
import re
from setting.globalconfig import GlobalConfig
from src.rtk._base import transform_app_name
class CsvControl:
"""csv control"""
def __init__(self, app_name=None):
self.walk_dir = (
f"{GlobalConfig.APPS_PATH}/autotest_{transform_app_name(app_name).replace('-', '_')}"
if app_name
else GlobalConfig.APPS_PATH
)
print(1)
def scan_csv_and_py(self):
"""scan csv and case py"""
csv_path_dict = {}
py_path_dict = {}
for root, _, files in os.walk(self.walk_dir):
py_files = []
for file in files:
if file.endswith(".csv") and file != "case_list.csv":
csv_path_dict[os.path.splitext(file)[0]] = f"{root}/{file}"
if file.startswith("test_") and file.endswith(".py"):
case_name = []
_case_name = re.findall(
r"test_(.*?)_(\d+)_\d+.py|test_(.*?)_(\d+).py", file
)
if _case_name:
_case_name = _case_name[0]
if isinstance(_case_name, tuple):
for i in _case_name:
if i:
case_name.append(i)
py_files.append([f"{root}/{file}", case_name[-1]])
py_path_dict[case_name[0]] = py_files
if not (csv_path_dict and py_path_dict):
return None
return csv_path_dict, py_path_dict
def delete_mark_in_csv_if_not_exists_py(self):
"""delete mark in csv if not exists case py"""
res = self.scan_csv_and_py()
if res is None:
return
csv_path_dict, py_path_dict = res
for csv_name in csv_path_dict:
for case_name in py_path_dict:
if csv_name == case_name:
csv_path = csv_path_dict.get(csv_name)
with open(csv_path, "r", encoding="utf-8") as f:
csv_txt_list = f.readlines()
taglines = [txt.strip().split(",") for txt in csv_txt_list[1:]]
new_taglines = []
py_case_paths = py_path_dict.get(case_name)
for tag in taglines:
try:
csv_case_id = f"{int(tag[0]):0>3}"
except ValueError as e:
raise ValueError(f"文件:{csv_path} 里面似乎格式有点问题,出现了一个报错:{e}")
for py_case in py_case_paths:
py_case_id = py_case[-1]
if csv_case_id == py_case_id:
new_taglines.append(tag)
break
else:
print(f"{tag} will remove from {csv_path}")
if new_taglines != taglines:
bak_path = f"{GlobalConfig.REPORT_PATH}/pyid2csv_back"
if not os.path.exists(bak_path):
os.makedirs(bak_path)
os.system(
f"cp {csv_path} {bak_path}/{GlobalConfig.TIME_STRING}_{csv_name}.csv"
)
new_csv_list = [csv_txt_list[0]] + [
",".join(i) + "\n" for i in new_taglines
]
with open(csv_path, "w+", encoding="utf-8") as f:
f.writelines(new_csv_list)
if __name__ == "__main__":
CsvControl().delete_mark_in_csv_if_not_exists_py()

View File

@ -8,23 +8,17 @@
# pylint: disable=C0301,C0115,R0903,C0103,C0201,R1710,R0914,W1514,R0914,R1702
import json
import os
import re
from configparser import ConfigParser
from os.path import splitext
from time import strftime
from pprint import pprint
from setting.globalconfig import FixedCsvTitle
from setting.globalconfig import GetCfg
from setting.globalconfig import GlobalConfig
from src import logger
from src import logger
from src.pms._base import MAX_CASE_NUMBER
from src.pms._base import _Base
from src.pms._base import _unicode_to_cn
class CsvTitle:
case_id = "PMS用例ID"
case_level = "用例级别"
from src.rtk._base import transform_app_name
class Pms2Csv(_Base):
@ -32,33 +26,51 @@ class Pms2Csv(_Base):
__author__ = "huangmingqiang@uniontech.com"
def __init__(self):
super().__init__()
self.APP_NAME = GlobalConfig.APP_NAME
self.project_dir = f"autotest_{self.APP_NAME.replace('-', '_')}"
if not os.path.exists(f"{GlobalConfig.APPS_PATH}/{self.project_dir}"):
logger.error(f"{self.project_dir} 似乎不存在 !")
raise ValueError
config_error_log = "请检查您传递的 '命令行参数' 或 setting/globalconfig.ini 里的配置项"
self.PMS_USER = GlobalConfig.PMS_USER
self.PMS_PASSWORD = GlobalConfig.PMS_PASSWORD
def __init__(self, app_name=None, user=None, password=None, pms_link_csv=None):
super().__init__(user=user, password=password)
self.walk_dir = (
f"{GlobalConfig.APPS_PATH}/{transform_app_name(app_name)}"
if app_name
else GlobalConfig.APPS_PATH
)
conf = ConfigParser()
conf.read(GlobalConfig.GLOBAL_CONFIG_FILE_PATH)
self.csv_names = conf.options("csv_link_pms_id")
self.csv_link_cfg = GetCfg(
GlobalConfig.GLOBAL_CONFIG_FILE_PATH, "csv_link_pms_id"
ini_csv_names = conf.options("pmsctl-pms_link_csv")
ini_csv_pms_map = GetCfg(
GlobalConfig.GLOBAL_CONFIG_FILE_PATH, "pmsctl-pms_link_csv"
)
csv_link_lib_cfg = GetCfg(
GlobalConfig.GLOBAL_CONFIG_FILE_PATH, "csv_link_pms_lib"
)
self.CASE_FROM = csv_link_lib_cfg.get("CASE_FROM", default="caselib")
cli_csv_pms_map = {}
cli_csv_names = []
if pms_link_csv:
_cli_csv_names = pms_link_csv.split("/")
for i in _cli_csv_names:
pls = i.split(":")
if len(pls) != 2:
raise ValueError("--pms_link_csv 参数的值可能有问题")
csv_name, pms_product_id = pls
cli_csv_names.append(csv_name.strip())
cli_csv_pms_map[csv_name.strip()] = pms_product_id.strip()
def get_data(self, app_case_id):
self.csv_names = cli_csv_names or ini_csv_names
self.csv_link_cfg = cli_csv_pms_map or ini_csv_pms_map
if not self.csv_names:
raise ValueError(self.config_error_log)
self.pms_mark = GetCfg(
f"{GlobalConfig.SETTING_PATH}/pmsmark.ini", "pms-mark-to-csv-mark"
)
def get_data_from_pms(self, app_case_id):
"""获取pms上数据"""
if not app_case_id:
raise ValueError(self.config_error_log)
case_url = (
f"https://pms.uniontech.com/{self.CASE_FROM}-browse-"
f'{app_case_id}-{"-" if self.CASE_FROM == "testcase" else ""}all-0-id_desc-0-{MAX_CASE_NUMBER}.json'
f"https://pms.uniontech.com/{GlobalConfig.CASE_FROM}-browse-"
f'{app_case_id}-{"-" if GlobalConfig.CASE_FROM == "testcase" else ""}all-0-id_desc-0-{MAX_CASE_NUMBER}.json'
)
res = self.rx.open_url(case_url)
res_str = _unicode_to_cn(res)
@ -66,149 +78,142 @@ class Pms2Csv(_Base):
try:
res_dict = json.loads(res_str)
except json.decoder.JSONDecodeError:
logger.error(f"爬取pms数据失败, 请检查模块 id 是否为: {app_case_id}")
logger.error(f"获取pms数据失败, {self.config_error_log}")
return
cases = res_dict.get("data").get("cases")
res_data = {}
for i in cases:
case_id = cases.get(i).get("id")
case_level = cases.get(i).get("pri")
case_title = cases.get(i).get("title")
# 从用例标题中取出自动化用例id [001]
at_case_id = re.findall(r"\[(\d{3})\]", case_title)
# 如果id存在并且之前没有出现过
# 如果出现相同的id只取第一个
if at_case_id and at_case_id[0] not in res_data.keys():
# 组装成一个字典
res_data[at_case_id[0]] = {
"case_id": case_id, # 用例在PMS上ID
"case_level": case_level, # 用例级别
"case_title": case_title, # 用例标题
case = cases.get(i)
case_id = case.get("id")
case_level = case.get("pri")
case_type = self.pms_mark.get(case.get("type"))
if case_type:
res_data[case_id] = {
"case_level": f"L{case_level}",
"case_type": case_type,
}
if not res_data:
logger.error("未从pms获取到数据, 请检查配置")
logger.error(f"未从pms获取到数据, {self.config_error_log}")
raise ValueError
return res_data
def read_csv(self):
"""读取本地csv文件数据"""
csv_path_dict = {}
# 默认的csv文件备份路径
csv_bak_path = f"{GlobalConfig.REPORT_PATH}/csv_back"
csv_bak_path = f"{GlobalConfig.REPORT_PATH}/pms2csv_back"
if not os.path.exists(csv_bak_path):
os.makedirs(csv_bak_path)
for root, _, files in os.walk(f"{GlobalConfig.APPS_PATH}/{self.project_dir}"):
for root, _, files in os.walk(self.walk_dir):
for file in files:
# 必须是标签csv文件排除一些ddt的csv文件
if file.endswith(".csv") and splitext(file)[0] in self.csv_names:
csv_path_dict[splitext(file)[0]] = f"{root}/{file}"
# 备份csv文件
os.system(
f"cp {root}/{file} {csv_bak_path}/{strftime('%Y%m%d%H%M%S')}_{file}"
f"cp {root}/{file} {csv_bak_path}/{GlobalConfig.TIME_STRING}_{file}"
)
if not csv_path_dict:
logger.error(f"{self.APP_NAME} 目录下未找到csv文件")
raise ValueError
raise ValueError(f"{self.walk_dir} 目录下未找到csv文件")
pms_id_index = None
level_index = None
res_tags = {}
csv_title_dict = {}
csv_heads_dict = {}
for csv_name in csv_path_dict:
with open(csv_path_dict.get(csv_name), "r") as f:
with open(csv_path_dict.get(csv_name), "r", encoding="utf-8") as f:
txt_list = f.readlines()
csv_titles = txt_list[0].strip().split(",")
for index, title in enumerate(csv_titles):
# 找到在表头中对应的索引
if title.strip() == CsvTitle.case_id:
pms_id_index = index - 1
elif title.strip() == CsvTitle.case_level:
level_index = index - 1
csv_heads = txt_list[0].strip().split(",")
csv_head_index_map = {}
for index, title in enumerate(csv_heads):
for i in FixedCsvTitle:
if i.value == title.strip():
csv_head_index_map[i.name] = {
"head_name": i.value,
"head_index": index,
}
# 读取到所有的标签
taglines = [txt.strip().split(",") for txt in txt_list[1:]]
id_tags_dict = {f"{int(i[0]):0>3}": i[1:] for i in taglines if i[0]}
id_tags_dict = {i[0]: i for i in taglines if i[0]}
res_tags[csv_name] = id_tags_dict
# csv文件的表头
csv_title_dict[csv_name] = csv_titles
# 将这些数据无情的返回, 其实可以进一步将这些返回数据整合一下, 但是累了, 就这样吧
return pms_id_index, level_index, res_tags, csv_path_dict, csv_title_dict
csv_heads_dict[csv_name] = csv_head_index_map
return res_tags, csv_heads_dict, csv_path_dict
def compare_pms_to_csv(self):
"""对比pms上数据和本地csv文件数据"""
# 接收csv文件里面的值
(
pms_id_index,
level_index,
_res_tags,
csv_path_dict,
csv_title_dict,
) = self.read_csv()
# 将csv文件里面的数据和pms上爬取的数据进行对比
new_csv_tags_map = {}
for csv_name in _res_tags:
# 每个csv文件处理一次
pms_tags_dict = self.get_data(self.csv_link_cfg.get(csv_name))
# 如果pms上没有爬取到继续处理下一个csv文件
(res_tags, csv_heads_dict, csv_path_dict) = self.read_csv()
new_csv_file_tags = {}
for csv_name in res_tags:
product_id = self.csv_link_cfg.get(csv_name)
pms_tags_dict = self.get_data_from_pms(product_id)
if pms_tags_dict is None:
continue
csv_tags_dict = _res_tags.get(csv_name)
new_csv_tags = {}
for csv_tag_id in csv_tags_dict:
csv_tags = csv_tags_dict.get(csv_tag_id)
# 拿着csv里面的id去和pms上的id匹配
for pms_tag_id in pms_tags_dict:
if pms_tag_id == csv_tag_id:
pms_tags = pms_tags_dict.get(pms_tag_id)
case_id = pms_tags.get("case_id")
csv_tags_dict = res_tags.get(csv_name)
csv_head_dict = csv_heads_dict.get(csv_name)
pms_case_id_index = case_level_index = case_type_index = None
pms_case_id_name = csv_head_dict.get(FixedCsvTitle.pms_case_id.name)
if pms_case_id_name:
pms_case_id_index = pms_case_id_name.get("head_index")
case_level_name = csv_head_dict.get(FixedCsvTitle.case_level.name)
if case_level_name:
case_level_index = case_level_name.get("head_index")
case_type_name = csv_head_dict.get(FixedCsvTitle.case_type.name)
if case_type_name:
case_type_index = case_type_name.get("head_index")
new_csv_tags = []
new_csv_tags.append(
[i.get("head_name") for i in list(csv_head_dict.values())]
)
for csv_case_id in csv_tags_dict:
for pms_case_id in pms_tags_dict:
if pms_case_id == csv_case_id:
pms_tags = pms_tags_dict.get(pms_case_id)
case_level = pms_tags.get("case_level")
case_level = f"L{case_level}"
# 循环处理每个字段
for target_index, value, title_name in [
[pms_id_index, case_id, CsvTitle.case_id],
[level_index, case_level, CsvTitle.case_level],
]:
# 如果没有索引说明原来csv文件中没有这一列直接添加到最后一列
if target_index is None:
csv_tags.append(value)
if CsvTitle.case_id not in csv_title_dict.get(csv_name):
csv_title_dict[csv_name].append(title_name)
else:
# 如果有索引,直接修改原来的数据
csv_tags[target_index] = value
case_type = pms_tags.get("case_type")
flag = False
if (
pms_case_id_index
and csv_tags_dict[csv_case_id][pms_case_id_index]
!= pms_case_id
):
csv_tags_dict[csv_case_id][pms_case_id_index] = pms_case_id
flag = True
if (
case_level_index
and csv_tags_dict[csv_case_id][case_level_index]
!= case_level
):
csv_tags_dict[csv_case_id][case_level_index] = case_level
flag = True
if (
case_type_index
and csv_tags_dict[csv_case_id][case_type_index] != case_type
):
csv_tags_dict[csv_case_id][case_type_index] = case_type
flag = True
new_tags = csv_tags_dict[csv_case_id]
if flag:
logger.info(
f"pms case id: {pms_case_id}, new tags:{new_tags}"
)
new_csv_tags.append(new_tags)
break
else:
# 如果此AT id没有找到对应的那需要将csv此行补位空字符串
for target_index in [pms_id_index, level_index]:
# 如果没有索引说明原来csv文件中没有这一列
# 添加一个空字符串到最后一列
if target_index is None:
csv_tags.append("")
new_csv_tags[csv_tag_id] = csv_tags
new_csv_tags_map[csv_name] = new_csv_tags
new_csv_tags.append(csv_tags_dict[csv_case_id])
return new_csv_tags_map, csv_path_dict, csv_title_dict
new_csv_file_tags[csv_path_dict.get(csv_name)] = new_csv_tags
return new_csv_file_tags
def write_new_csv(self):
"""写新的csv文件"""
new_csv_tags_map, csv_path_dict, csv_title_dict = self.compare_pms_to_csv()
# 将 new_csv_tags_map 里面的数据写成一个新的csv文件
for csv_name in new_csv_tags_map:
csv_path = csv_path_dict.get(csv_name)
new_csv_tags = new_csv_tags_map.get(csv_name)
new_csv_tags_list = []
# 先把表头放进去
new_csv_tags_list.append(",".join(csv_title_dict.get(csv_name)) + "\n")
# 组装成一行行的数据
for _id in new_csv_tags:
new_csv_list = new_csv_tags.get(_id)
new_csv_list.insert(0, _id)
new_csv_tags_list.append(",".join(new_csv_list) + "\n")
with open(csv_path, "w+", encoding="utf-8") as f:
f.writelines(new_csv_tags_list)
pprint(new_csv_tags_list, indent=4)
logger.info("同步完成")
new_csv_file_tags = self.compare_pms_to_csv()
for csv_file in new_csv_file_tags:
new_csv_tags = new_csv_file_tags.get(csv_file)
with open(csv_file, "w+", encoding="utf-8") as f:
for tags in new_csv_tags:
f.write(",".join(tags) + "\n")
logger.info(f"同步完成: {csv_file}")
if __name__ == "__main__":

View File

@ -17,9 +17,9 @@ class SubCmd(Enum):
"""SubCmd"""
run = "run"
remote = "remote"
pms = "pms"
pmsctl = "pmsctl"
startapp = "startapp"
exportcsv = "exportcsv"
csvctl = "csvctl"
@unique
@ -62,9 +62,14 @@ class Args(Enum):
client_password = "client_password"
parallel = "parallel"
autostart = "autostart"
pyid2csv = "pyid2csv"
export_csv_file = "export_csv_file"
pms2csv = "pms2csv"
pms_link_csv = "pms_link_csv"
send2task = "send2task"
def transform_app_name(real_app_name):
def transform_app_name(real_app_name):
"""转换 app_name"""
if not real_app_name:
return None

View File

@ -14,9 +14,9 @@ from os import listdir
from os import makedirs
from os import system
from os.path import exists
from os.path import expanduser
from os.path import isfile
from os.path import join
from os.path import expanduser
from time import sleep
from tkinter import Tk
@ -81,8 +81,10 @@ class LocalRunner:
project_name=None,
build_location=None,
line=None,
exportcsv=None,
collection_only=None,
autostart=None,
pyid2csv=None,
export_csv_file=None,
**kwargs,
):
logger("INFO")
@ -124,9 +126,11 @@ class LocalRunner:
self.project_name = project_name
self.build_location = build_location
self.line = line
self.exportcsv = exportcsv
self.collection_only = collection_only
self.pyid2csv = pyid2csv or GlobalConfig.PY_ID_TO_CSV
self.export_csv_file = export_csv_file or GlobalConfig.EXPORT_CSV_FILE
if not self.default.get(Args.debug.value) and not self.exportcsv:
if not self.default.get(Args.debug.value) and not self.collection_only:
screen = Tk()
x = screen.winfo_screenwidth()
y = screen.winfo_screenheight()
@ -231,8 +235,13 @@ class LocalRunner:
cmd.extend(["-k", f"'{default.get(Args.keywords.value)}'"])
if default.get(Args.tags.value):
cmd.extend(["-m", f"'{default.get(Args.tags.value)}'"])
if self.exportcsv:
if app_dir and app_dir != GlobalConfig.APPS_PATH:
cmd.extend(["--app_name", app_dir])
if self.pyid2csv:
cmd.extend(["--pyid2csv", "--verbosity=-1"])
if self.export_csv_file:
cmd.extend(["--export_csv_file", self.export_csv_file])
if self.collection_only:
cmd.append("--co")
return cmd
@ -305,7 +314,6 @@ class LocalRunner:
self.make_dir(
join(GlobalConfig.REPORT_PATH, GlobalConfig.ReportFormat.JSON)
)
return cmd
def change_working_dir(self):
@ -324,7 +332,7 @@ class LocalRunner:
print(f"WorkSpace: \n{case_path}")
chdir(case_path)
return working_dir
raise EnvironmentError(f"apps目录下未找到指定的{app_name}")
raise EnvironmentError(f"apps目录下未找到指定的 {app_name}")
return GlobalConfig.APPS_PATH
def local_run(self):
@ -353,7 +361,7 @@ class LocalRunner:
return
pytest.main([i.strip("'") for i in run_test_cmd_list[1:]])
if self.exportcsv:
if self.collection_only:
return
if self.project_name and self.build_location and self.line:
self.write_json(
@ -393,7 +401,9 @@ class LocalRunner:
if exists(letmego.conf.setting.RUNNING_MAN_FILE):
letmego.unregister_autostart_service()
letmego.clean_running_man()
letmego.clean_running_man(
copy_to=f"{GlobalConfig.REPORT_PATH}/_running_man_{GlobalConfig.TIME_STRING}.log"
)
@staticmethod
def get_result():
@ -406,7 +416,7 @@ class LocalRunner:
res = Counter([results_dict.get(i).get("result") for i in results_dict])
total = sum(res.values())
skiped = res.get("skip", 0)
total = total - skiped # 剔除skip的用例
total = total - skiped
passed = res.get("pass", 0)
failed = total - passed
pass_rate = f"{round((passed / total) * 100, 1)}%" if passed else "0%"

View File

@ -96,7 +96,7 @@ class RemoteRunner:
if not cli_client_dict and not self.ini_client_dict:
raise ValueError(
"未获取到测试机信息,请检查 setting/remote.ini 中 CLIENT LIST 是否配置,"
"或通过命令行 python3 manage.py remote -c user@ip:password 传入。"
"或通过命令行 remote -c user@ip:password 传入。"
)
self.default = {

View File

@ -12,6 +12,7 @@ from time import strftime
from configparser import ConfigParser
from setting.globalconfig import GlobalConfig
from setting.globalconfig import FixedCsvTitle
class StartApp:
@ -53,7 +54,7 @@ class StartApp:
shutil.move(f"{root}/{file}", f"{root}/{new_file}")
file = new_file
if ".py" in file:
if ".py" in file or ".csv" in file:
with open(f"{root}/{file}", "r") as f:
codes = f.readlines()
new_codes = []
@ -78,6 +79,10 @@ class StartApp:
code = re.sub(r"\${DATE}", strftime("%Y/%m/%d"), code)
if "${TIME}" in code:
code = re.sub(r"\${TIME}", strftime("%H:%M:%S"), code)
if "${FIXEDCSVTITLE}" in code:
code = re.sub(
r"\${FIXEDCSVTITLE}", ",".join([i.value for i in FixedCsvTitle]), code
)
new_codes.append(code)
with open(f"{root}/{file}", "w") as f:
f.writelines([i for i in new_codes])