fix: 修复了一些已知问题;

Description:

Log:
This commit is contained in:
mikigo 2023-11-10 15:21:10 +08:00
parent f2c4122c4c
commit ed2fc05764
9 changed files with 100 additions and 61 deletions

View File

@ -1,2 +1,2 @@
[current]
tag = 2.3.1
tag = 2.3.2-dev

View File

@ -337,17 +337,22 @@ def pytest_collection_modifyitems(session):
try:
csv_name, _id = findall(r"test_(.*?)_(\d+)", item.name)[0]
_case_id = findall(r"test_.*?_(\d+)", item.fspath.purebasename)[0]
_case_name, _case_id = findall(r"test_(.*?)_(\d+)", item.fspath.purebasename)[0]
if _id != _case_id:
raise ValueError
if _case_name != csv_name:
raise FileNotFoundError
except IndexError:
skip_text = f"\n用例函数名称缺少用例id:[{item.nodeid}]"
skip_text = f"用例函数名称缺少用例id:[{item.nodeid}]"
logger.error(skip_text)
add_mark(item, ConfStr.SKIP.value, (skip_text,), {})
except ValueError:
skip_text = f"\n用例py文件的id与用例函数的id不一致:[{item.nodeid}]"
skip_text = f"用例py文件的id与用例函数的id不一致:[{item.nodeid}]"
logger.error(skip_text)
add_mark(item, ConfStr.SKIP.value, (skip_text,), {})
except FileNotFoundError:
logger.error(f"用例py文件的名称与用例函数的名称不一致:[{item.nodeid}]")
session.items.remove(item)
else:
csv_path = csv_path_dict.get(csv_name)
if not csv_path:

View File

@ -1,5 +1,15 @@
# 版本更新记录
## 2.3.2unreleased
**New**
- 由于 `PMS` 用例管理系统存在缺陷,框架移除从 `CSV` 反向同步标签到 `PMS` 功能;
**Fix**
- 优化数据回填逻辑,修复同一个用例 `py` 包含多个用例,数据回填时,中间的失败结果被后续用例更新为的问题;
## 2.3.12023/11/8
**New**

View File

@ -83,13 +83,13 @@ youqu manage.py pmsctl -p2c -u ut00xxxx -p you_password -plc music:81
每次操作会将 `CSV` 文件先备份到 `report/pms2csv_back` 目录下;
### 3. 从CSV文件同步标签到PMS
### ~~3. 从CSV文件同步标签到PMS~~
用于将 `CSV` 文件中的标签同步到 `PMS` 用例库用例中;
~~用于将 `CSV` 文件中的标签同步到 `PMS` 用例库用例中;~~
**【使用方法一】**
~~**【使用方法一】**~~
配置文件方式,通过以下几个配置来控制:
~~配置文件方式,通过以下几个配置来控制:~~
```ini
APP_NAME =
@ -103,15 +103,15 @@ PMS_PASSWORD =
CSV_NAME_TO_PMS =
```
配置好后,在命令行执行:
~~配置好后,在命令行执行:~~
```shell
youqu manage.py pmsctl -c2p
```
**【使用方法二】**
~~**【使用方法二】**~~
完全通过命令行参数传入:
~~完全通过命令行参数传入:~~
```shell
youqu manage.py pmsctl -c2p -a apps/autotest_com_deepin_lianliankan/ -c lianliankan -u my_user -p my_pwd

View File

@ -395,14 +395,6 @@ class Manage:
"-p2c", "--pms2csv", action='store_const', const=True, default=False,
help="从PMS爬取用例标签到csv文件"
)
sub_parser_pms.add_argument(
"-c2p", "--csv2pms", action='store_const', const=True, default=False,
help="将csv文件里面的标签同步到PMS"
)
sub_parser_pms.add_argument(
"-c", "--csv_name", default="",
help="将csv文件里面的标签同步到PMS时csv文件的名称不加后缀"
)
sub_parser_pms.add_argument(
"--send2task",
choices=["yes", ""],
@ -421,8 +413,6 @@ class Manage:
Args.pms_user.value: args.pms_user or self.default_pms_user,
Args.pms_password.value: args.pms_password or self.default_pms_password,
Args.pms2csv.value: args.pms2csv or self.default_pms2csv,
Args.csv2pms.value: args.csv2pms or self.default_csv2pms,
Args.csv_name.value: args.csv_name or self.default_csv_name,
Args.pms_link_csv.value: args.pms_link_csv or self.default_pms_link_csv,
Args.send2task.value: args.send2task or self.default_send2task,
Args.task_id.value: args.task_id or GlobalConfig.TASK_ID,
@ -435,13 +425,6 @@ class Manage:
password=pms_kwargs.get(Args.pms_password.value) or GlobalConfig.PMS_PASSWORD,
pms_link_csv=pms_kwargs.get(Args.pms_link_csv.value),
).write_new_csv()
elif pms_kwargs.get(Args.csv2pms.value):
Csv2Pms(
app_name=pms_kwargs.get(Args.app_name.value),
user=pms_kwargs.get(Args.pms_user.value) or GlobalConfig.PMS_USER,
password=pms_kwargs.get(Args.pms_password.value) or GlobalConfig.PMS_PASSWORD,
csv_name=pms_kwargs.get(Args.csv_name.value),
).post_to_pms()
elif (
pms_kwargs.get(Args.send2task.value)
and pms_kwargs.get(Args.task_id.value)

View File

@ -6,8 +6,9 @@
# SPDX-License-Identifier: GPL-2.0-only
# pylint: disable=C0114
import json
from os.path import exists
import os
from os import makedirs
from os.path import exists
from re import findall, sub
from setting.globalconfig import GlobalConfig
@ -99,9 +100,31 @@ def write_case_result(item, report):
case_result_tpl["from_case_id"] = from_case_id
case_result_tpl["task_id"] = taskid or suiteid
case_result_tpl["at_case_id"] = at_case_id
case_result_tpl["item"] = item.name
case_result_tpl["result"] = "pass" if report.outcome == "passed" else "fail"
case_res_path = item.session.case_res_path
if not exists(case_res_path):
makedirs(case_res_path)
with open(f"{case_res_path}/{item.name}.json", "w+", encoding="utf-8") as _f:
json_file_name = f"{os.path.splitext(item.fspath.basename)[0]}.json"
abs_json_file_path = f"{case_res_path}/{json_file_name}"
if exists(abs_json_file_path):
with open(abs_json_file_path, "r", encoding="utf-8") as _f:
case_res_from_json = json.load(_f)
if item.execution_count >= 2:
if (
case_res_from_json.get("result") == "fail"
and case_result_tpl["result"] == "pass"
and case_res_from_json.get("item") == case_result_tpl["item"]
):
case_result_tpl["result"] = "cover-pass"
with open(abs_json_file_path, "w+", encoding="utf-8") as _f:
_f.write(json.dumps(case_result_tpl, indent=2, ensure_ascii=False))
else:
if case_res_from_json.get("result") in ("pass", "cover-pass") and case_result_tpl["result"] == "fail":
with open(abs_json_file_path, "w+", encoding="utf-8") as _f:
_f.write(json.dumps(case_result_tpl, indent=2, ensure_ascii=False))
else:
with open(abs_json_file_path, "w+", encoding="utf-8") as _f:
_f.write(json.dumps(case_result_tpl, indent=2, ensure_ascii=False))

View File

@ -60,31 +60,46 @@ class Send2Pms(_Base):
for case_name_json in os.listdir(case_res_path):
if not case_name_json.endswith(".json"):
continue
# 读取本地json文件中的数据
with open(f"{case_res_path}/{case_name_json}", "r") as f:
with open(f"{case_res_path}/{case_name_json}", "r", encoding="utf-8") as f:
data = json.load(f)
if not exists(data_send_result_csv):
with open(data_send_result_csv, "w+") as f:
with open(data_send_result_csv, "w+", encoding="utf-8") as f:
pass
with open(data_send_result_csv, "r") as f:
with open(data_send_result_csv, "r", encoding="utf-8") as f:
reqeusted = f.read()
case_name = case_name_json.split(".")[0]
case_result = data.get("result")
if case_name not in reqeusted:
self.push(data_send_result_csv, data, case_name, case_result)
else:
if f"{case_name},fail" in reqeusted and case_result == "cover-pass":
fix_requested = re.sub(rf"{case_name},fail,request_.*?\n", "", reqeusted)
with open(data_send_result_csv, "w+", encoding="utf-8") as f:
f.write(fix_requested)
self.push(data_send_result_csv, data, case_name, case_result)
elif f"{case_name},pass" in reqeusted and case_result == "fail":
fix_requested = re.sub(rf"{case_name},pass,request_.*?\n", "", reqeusted)
with open(data_send_result_csv, "w+", encoding="utf-8") as f:
f.write(fix_requested)
self.push(data_send_result_csv, data, case_name, case_result)
# 如果这条用例已经回填过数据
if case_name in reqeusted:
continue
with open(data_send_result_csv, "a+") as f:
def push(self, data_send_result_csv, data, case_name, case_result):
if data["result"] == "cover-pass":
data["result"] = "pass"
data.pop("item")
with open(data_send_result_csv, "a+", encoding="utf-8") as f:
for _ in range(int(GlobalConfig.SEND_PMS_RETRY_NUMBER)):
# 请求
status_code = self.post_to_pms(**data)
if status_code == 200:
logger.info(f"{runs_id_cmd_log(data)} 数据回填成功 😃")
f.write(f"{case_name},request_ok\n")
logger.info(f"{runs_id_cmd_log(data)} 数据回填成功 ✔")
if case_result == "cover-pass" and f"{case_name},pass" in f.read():
break
f.write(f"{case_name},{data['result']},request_ok\n")
break
else:
logger.info(f"{runs_id_cmd_log(data)} 数据回填失败 😡")
f.write(f"{case_name},request_fail\n")
logger.info(f"{runs_id_cmd_log(data)} 数据回填失败 ")
f.write(f"{case_name},{data['result']},request_fail\n")
@staticmethod
def case_res_path(taskid):

View File

@ -67,10 +67,8 @@ class Args(Enum):
pyid2csv = "pyid2csv"
export_csv_file = "export_csv_file"
pms2csv = "pms2csv"
csv2pms = "csv2pms"
pms_link_csv = "pms_link_csv"
send2task = "send2task"
csv_name = "csv_name"
def transform_app_name(app_name):

View File

@ -158,6 +158,7 @@ class RemoteRunner:
".idea",
".git",
"docs",
"site",
"README.md",
"README.zh_CN.md",
"RELEASE.md",
@ -173,6 +174,10 @@ class RemoteRunner:
f"{self.rsync % (password,)} {exclude} {GlobalConfig.ROOT_DIR}/* "
f"{user}@{_ip}:~/{self.server_project_path}/ {self.empty}"
)
system(
f"{self.rsync % (password,)} {exclude} {GlobalConfig.ROOT_DIR}/.env "
f"{user}@{_ip}:~/{self.server_project_path}/ {self.empty}"
)
logger.info(f"代码发送成功 - < {user}@{_ip} >")
def build_client_env(self, user, _ip, password):