完善各类通知,企业微信、钉钉、邮件

This commit is contained in:
chenyongzhiaaron 2023-08-08 18:10:58 +08:00
parent ffe6fb2553
commit 744b26c5d8
60 changed files with 11059 additions and 888 deletions

View File

@ -42,32 +42,42 @@
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent">{
&quot;keyToString&quot;: {
&quot;RunOnceActivity.OpenProjectViewOnStart&quot;: &quot;true&quot;,
&quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;,
&quot;WebServerToolWindowFactoryState&quot;: &quot;false&quot;,
&quot;git-widget-placeholder&quot;: &quot;master&quot;,
&quot;last_opened_file_path&quot;: &quot;D:/app/apitest&quot;,
&quot;node.js.detected.package.eslint&quot;: &quot;true&quot;,
&quot;node.js.detected.package.tslint&quot;: &quot;true&quot;,
&quot;node.js.selected.package.eslint&quot;: &quot;(autodetect)&quot;,
&quot;node.js.selected.package.tslint&quot;: &quot;(autodetect)&quot;,
&quot;nodejs_package_manager_path&quot;: &quot;npm&quot;,
&quot;settings.editor.selected.configurable&quot;: &quot;com.jetbrains.python.configuration.PyActiveSdkModuleConfigurable&quot;,
&quot;vue.rearranger.settings.migration&quot;: &quot;true&quot;
<component name="PropertiesComponent"><![CDATA[{
"keyToString": {
"RunOnceActivity.OpenProjectViewOnStart": "true",
"RunOnceActivity.ShowReadmeOnStart": "true",
"WebServerToolWindowFactoryState": "false",
"git-widget-placeholder": "master",
"last_opened_file_path": "D:/app/apitest/image",
"node.js.detected.package.eslint": "true",
"node.js.detected.package.tslint": "true",
"node.js.selected.package.eslint": "(autodetect)",
"node.js.selected.package.tslint": "(autodetect)",
"nodejs_package_manager_path": "npm",
"settings.editor.selected.configurable": "com.jetbrains.python.configuration.PyActiveSdkModuleConfigurable",
"vue.rearranger.settings.migration": "true"
},
"keyToStringList": {
"DatabaseDriversLRU": [
"mysql"
]
}
}</component>
}]]></component>
<component name="RecentsManager">
<key name="CopyFile.RECENT_KEYS">
<recent name="D:\app\apitest\image" />
<recent name="D:\app\apitest" />
</key>
<key name="MoveFile.RECENT_KEYS">
<recent name="D:\app\apitest" />
<recent name="D:\app\apitest\encryption_rules" />
<recent name="D:\app\apitest\common\validation" />
<recent name="D:\app\apitest\common\log_utils" />
<recent name="D:\app\apitest\common\parsing" />
</key>
</component>
<component name="RunManager" selected="Python 测试.Python 测试 (test_executor.py 内)">
<configuration name="assert_dict" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<component name="RunManager" selected="Python.run">
<configuration name="dingding" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="api_project" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
@ -75,12 +85,12 @@
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/common/data_extraction" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/common/notiction" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/common/data_extraction/assert_dict.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/common/notiction/dingding.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
@ -89,7 +99,7 @@
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="dependent_parameter" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<configuration name="email_client (1)" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="api_project" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
@ -97,12 +107,12 @@
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/common/data_extraction" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/common/notiction" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/common/data_extraction/dependent_parameter.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/common/notiction/email_client.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
@ -111,7 +121,7 @@
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="extractor" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<configuration name="email_client" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="api_project" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
@ -119,12 +129,12 @@
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/common/validation" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/common/notiction" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/common/validation/extractor.py" />
<option name="SCRIPT_NAME" value="D:\app\apitest\common\notiction\email_client.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
@ -133,7 +143,7 @@
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="validator" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<configuration name="resultPush" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="api_project" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
@ -141,12 +151,12 @@
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/common/validation" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/unittestreportnew/core" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/common/validation/validator.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/unittestreportnew/core/resultPush.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
@ -155,28 +165,35 @@
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<configuration name="Python 测试 (test_executor.py 内)" type="tests" factoryName="Autodetect" temporary="true" nameIsGenerated="true">
<configuration name="run" type="PythonConfigurationType" factoryName="Python" temporary="true" nameIsGenerated="true">
<module name="api_project" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/test_script" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="_new_additionalArguments" value="&quot;&quot;" />
<option name="_new_target" value="&quot;$PROJECT_DIR$/test_script/test_executor.py&quot;" />
<option name="_new_targetType" value="&quot;PATH&quot;" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/run.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
<recent_temporary>
<list>
<item itemvalue="Python 测试.Python 测试 (test_executor.py 内)" />
<item itemvalue="Python.assert_dict" />
<item itemvalue="Python.validator" />
<item itemvalue="Python.extractor" />
<item itemvalue="Python.dependent_parameter" />
<item itemvalue="Python.run" />
<item itemvalue="Python.dingding" />
<item itemvalue="Python.email_client (1)" />
<item itemvalue="Python.email_client" />
<item itemvalue="Python.resultPush" />
</list>
</recent_temporary>
</component>
@ -209,7 +226,8 @@
<workItem from="1690855882311" duration="1981000" />
<workItem from="1690873614157" duration="4828000" />
<workItem from="1690937027137" duration="37912000" />
<workItem from="1691034024300" duration="1062000" />
<workItem from="1691034024300" duration="10612000" />
<workItem from="1691370512935" duration="50653000" />
</task>
<task id="LOCAL-00001" summary="优化代码">
<option name="closed" value="true" />
@ -333,17 +351,6 @@
<MESSAGE value="增加处理函数调用链变量以及修复动态函数中传参失效的问题" />
<option name="LAST_COMMIT_MESSAGE" value="增加处理函数调用链变量以及修复动态函数中传参失效的问题" />
</component>
<component name="XDebuggerManager">
<breakpoint-manager>
<breakpoints>
<line-breakpoint enabled="true" suspend="THREAD" type="python-line">
<url>file://$PROJECT_DIR$/common/validation/extractor.py</url>
<line>10</line>
<option name="timeStamp" value="12" />
</line-breakpoint>
</breakpoints>
</breakpoint-manager>
</component>
<component name="com.github.evgenys91.machinet.common.dslhistory.DslHistoryState">
<option name="historyDtoById">
<map>
@ -362,23 +369,30 @@
</option>
</component>
<component name="com.intellij.coverage.CoverageDataManagerImpl">
<SUITE FILE_PATH="coverage/apitest$load_modules_from_folder.coverage" NAME="load_modules_from_folder 覆盖结果" MODIFIED="1690882059703" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/validation" />
<SUITE FILE_PATH="coverage/apitest$Unittest__test_executor_py__.coverage" NAME="Unittest (test_executor.py 内) 覆盖结果" MODIFIED="1690850721335" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test_script" />
<SUITE FILE_PATH="coverage/apitest$http_client.coverage" NAME="http_client 覆盖结果" MODIFIED="1690850772526" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/http_client" />
<SUITE FILE_PATH="coverage/apitest$load_modules_from_folder.coverage" NAME="load_modules_from_folder 覆盖结果" MODIFIED="1691398471049" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/validation" />
<SUITE FILE_PATH="coverage/apitest$run.coverage" NAME="run 覆盖结果" MODIFIED="1691489293524" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$" />
<SUITE FILE_PATH="coverage/apitest$Unittest__test_api_py__.coverage" NAME="Unittest (test_api.py 内) 覆盖结果" MODIFIED="1689907531802" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test_script" />
<SUITE FILE_PATH="coverage/apitest$extractor.coverage" NAME="extractor 覆盖结果" MODIFIED="1691031544934" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/validation" />
<SUITE FILE_PATH="coverage/apitest$dependent_parameter.coverage" NAME="dependent_parameter 覆盖结果" MODIFIED="1691028535712" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/data_extraction" />
<SUITE FILE_PATH="coverage/apitest$dingding.coverage" NAME="dingding 覆盖结果" MODIFIED="1691483158998" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/notiction" />
<SUITE FILE_PATH="coverage/apitest$assert_dict.coverage" NAME="assert_dict 覆盖结果" MODIFIED="1691034548959" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/data_extraction" />
<SUITE FILE_PATH="coverage/apitest$dependent_parameter.coverage" NAME="dependent_parameter 覆盖结果" MODIFIED="1691398478714" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/data_extraction" />
<SUITE FILE_PATH="coverage/apitest$re_chain.coverage" NAME="re_chain 覆盖结果" MODIFIED="1690962595251" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/debug" />
<SUITE FILE_PATH="coverage/apitest$validator.coverage" NAME="validator 覆盖结果" MODIFIED="1691032283545" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/validation" />
<SUITE FILE_PATH="coverage/apitest$loaders.coverage" NAME="loaders 覆盖结果" MODIFIED="1690942567666" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/validation" />
<SUITE FILE_PATH="coverage/apitest$.coverage" NAME=" 覆盖结果" MODIFIED="1691035043685" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test_script" />
<SUITE FILE_PATH="coverage/apitest$validator.coverage" NAME="validator 覆盖结果" MODIFIED="1691395818973" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/validation" />
<SUITE FILE_PATH="coverage/apitest$exceptions.coverage" NAME="exceptions 覆盖结果" MODIFIED="1691395709236" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/utils" />
<SUITE FILE_PATH="coverage/apitest$requestRecord__1_.coverage" NAME="requestRecord (1) 覆盖结果" MODIFIED="1690531988570" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/debug" />
<SUITE FILE_PATH="coverage/apitest$__init__.coverage" NAME="__init__ 覆盖结果" MODIFIED="1691028135541" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/debug" />
<SUITE FILE_PATH="coverage/apitest$method_chain.coverage" NAME="method_chain 覆盖结果" MODIFIED="1690967090148" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/debug" />
<SUITE FILE_PATH="coverage/apitest$action.coverage" NAME="action 覆盖结果" MODIFIED="1689907783681" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common" />
<SUITE FILE_PATH="coverage/apitest$mysql_client.coverage" NAME="mysql_client 覆盖结果" MODIFIED="1691399388558" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/database" />
<SUITE FILE_PATH="coverage/apitest$Unittest__test_executor_py__.coverage" NAME="Unittest (test_executor.py 内) 覆盖结果" MODIFIED="1690850721335" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test_script" />
<SUITE FILE_PATH="coverage/apitest$http_client.coverage" NAME="http_client 覆盖结果" MODIFIED="1690850772526" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/http_client" />
<SUITE FILE_PATH="coverage/apitest$email.coverage" NAME="email_client 覆盖结果" MODIFIED="1691465734077" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/notiction" />
<SUITE FILE_PATH="coverage/apitest$extractor.coverage" NAME="extractor 覆盖结果" MODIFIED="1691031544934" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/validation" />
<SUITE FILE_PATH="coverage/apitest$email_client__1_.coverage" NAME="email_client (1) 覆盖结果" MODIFIED="1691481071975" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/notiction" />
<SUITE FILE_PATH="coverage/apitest$loaders.coverage" NAME="loaders 覆盖结果" MODIFIED="1690942567666" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/validation" />
<SUITE FILE_PATH="coverage/apitest$.coverage" NAME=" 覆盖结果" MODIFIED="1691371445339" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/test_script" />
<SUITE FILE_PATH="coverage/apitest$resultPush.coverage" NAME="resultPush 覆盖结果" MODIFIED="1691465454364" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/unittestreportnew/core" />
<SUITE FILE_PATH="coverage/apitest$method_chain.coverage" NAME="method_chain 覆盖结果" MODIFIED="1690967090148" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/debug" />
<SUITE FILE_PATH="coverage/apitest$encryption_str.coverage" NAME="encryption_str 覆盖结果" MODIFIED="1690796164466" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/crypto" />
<SUITE FILE_PATH="coverage/apitest$mysql_client.coverage" NAME="mysql_client 覆盖结果" MODIFIED="1690797332029" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/database" />
<SUITE FILE_PATH="coverage/apitest$data_extractor.coverage" NAME="data_extractor 覆盖结果" MODIFIED="1691398486016" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/common/data_extraction" />
<SUITE FILE_PATH="coverage/apitest$get_set.coverage" NAME="get_set 覆盖结果" MODIFIED="1689930576115" SOURCE_PROVIDER="com.intellij.coverage.DefaultCoverageFileProvider" RUNNER="coverage.py" COVERAGE_BY_TEST_ENABLED="true" COVERAGE_TRACING_ENABLED="false" WORKING_DIRECTORY="$PROJECT_DIR$/debug" />
</component>
</project>

File diff suppressed because one or more lines are too long

Binary file not shown.

View File

@ -7,13 +7,11 @@
@time: 2023/6/16 15:43
@desc:
"""
from common.crypto import logger
from common.crypto.encryption_rsa import Rsa
# from common.crypto.encryption_aes import DoAES
from common.utils.exceptions import EncryptionError
from encryption_rules import rules
@logger.log_decorator()
class EncryptData:
"""
数据加密入口
@ -32,7 +30,7 @@ class EncryptData:
try:
headers = encrypt_func(headers)
except Exception as e:
logger.error(f"{headers_crypto} 加密失败:{e}")
EncryptionError(headers_crypto, e)
if request_data_crypto:
encrypt_func = encryption_methods.get(request_data_crypto)
@ -40,6 +38,6 @@ class EncryptData:
try:
request_data = encrypt_func(request_data)
except Exception as e:
logger.error(f"{request_data_crypto} 加密失败:{e}")
EncryptionError(request_data_crypto, e)
return headers, request_data

View File

@ -7,6 +7,3 @@
@time: 2023/3/14 14:21
@desc:
"""
from common.log_utils.mylogger import MyLogger
logger = MyLogger()

View File

@ -13,7 +13,7 @@ sys.path.append("../")
sys.path.append("./common")
from jsonpath_ng import parse
from common.utils.environments import Environments
from common.data_extraction import logger
from common.utils.exceptions import logger, InvalidParameterFormatError, ParameterExtractionError
REPLACE_DICT = {
"null": None,
@ -42,7 +42,7 @@ class DataExtractor(Environments):
response = response
if not isinstance(response, (dict, str, list)):
logger.error(f"| 被提取对象非字典、非字符串、非列表不执行jsonpath提取,被提取对象: {response}")
InvalidParameterFormatError(response, "| 被提取对象非字典、非字符串、非列表不执行jsonpath提取")
return {}
if regex and keys:
self.substitute_regex(response, regex, keys)
@ -133,7 +133,7 @@ class DataExtractor(Environments):
self.update_environments(key, result[0]) if len(result) == 1 else self.update_environments(key,
result)
except Exception as e:
logger.error(f"| jsonpath表达式错误'{expression}': {e}")
ParameterExtractionError(expression, e)
if __name__ == '__main__':

View File

@ -9,8 +9,11 @@
"""
import json
from common.data_extraction import logger
from common.data_extraction.data_extractor import DataExtractor
from common.utils.exceptions import ParameterExtractionError, ResponseJsonConversionError
# from common.utils.exceptions import logger
class DependentParameter(DataExtractor):
@ -19,7 +22,6 @@ class DependentParameter(DataExtractor):
def __init__(self):
super().__init__()
@logger.log_decorator()
def replace_dependent_parameter(self, json_string):
"""
替换字符串中的关联参数并返回转化后的字典格式
@ -45,7 +47,9 @@ class DependentParameter(DataExtractor):
args_string = self.ARGS_MATCHER.search(first_method_call_match.group())
args_list = args_string.group(1).split(',') if args_string else []
else:
raise ValueError(f"函数写法错误:无法匹配函数调用格式,字符串为:{strings}")
raise ParameterExtractionError(key, "在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常")
# raise ValueError(f"函数写法错误:无法匹配函数调用格式,字符串为:{strings}")
remaining_method_names = self.METHOD_NAME_MATCHER.findall(strings)
return first_fun, first_method_call, remaining_method_names, args_list
@ -61,7 +65,8 @@ class DependentParameter(DataExtractor):
obj = execute_method_chain(obj, remaining_methods, args=args)
json_string = json_string.replace(function_pattern, str(obj))
else:
logger.error(f"函数key:{key},在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常\n")
ParameterExtractionError(key, "在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常")
# logger.error(f"函数key:{key},在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常\n")
break
else:
key = self.PARAMETER_MATCHER.search(json_string)
@ -75,14 +80,16 @@ class DependentParameter(DataExtractor):
obj = self.get_environments(k)[index] if isinstance(index, int) else self.get_environments(k)
json_string = json_string.replace(key.group(), str(obj))
else:
logger.error(f"字符串key:{key},字符串在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常\n")
ParameterExtractionError(key, "在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常")
# logger.error(f"字符串key:{key},字符串在关联参数表中查询不到,请检查关联参数字段提取及填写是否正常\n")
break
json_string = json_string.replace("True", "true").replace("False", "false")
if self.BRACE_MATCHER.search(json_string) and not self.FUNCTION_CHAIN_MATCHER.search(json_string):
try:
json_string = json.loads(json_string)
except json.JSONDecodeError as e:
logger.error(f"JSONDecodeError:{json_string}:{e}")
ResponseJsonConversionError(json_string,e)
# logger.error(f"JSONDecodeError:{json_string}:{e}")
return json_string

View File

@ -7,6 +7,3 @@
@time: 2023/3/13 14:48
@desc:
"""
from common.log_utils.mylogger import MyLogger
logger = MyLogger()

View File

@ -14,130 +14,127 @@ import pymysql
from dbutils.pooled_db import PooledDB
from pymysql.cursors import DictCursor
from common.database import logger
from common.utils.decorators import singleton
from common.utils.exceptions import DatabaseExceptionError, InvalidParameterFormatError
# @singleton
@singleton
class MysqlClient:
def __init__(self, db_config):
"""
初始化连接配置
Args:
db_config: 数据库连接配置字典
"""
if not db_config:
return
self.result = {}
try:
self.db_base = db_config if isinstance(db_config, dict) else json.loads(db_config)
print(self.db_base)
self.pool = PooledDB(creator=pymysql, maxconnections=10, **self.db_base)
self.conn = self.pool.connection()
self.cursor = self.conn.cursor(DictCursor)
except Exception as e:
logger.error(f"| 数据库链接失败: {e}")
# raise
def __init__(self, db_config):
"""
初始化连接配置
Args:
db_config: 数据库连接配置字典
"""
if not db_config:
return
self.result = {}
try:
self.db_base = db_config if isinstance(db_config, dict) else json.loads(db_config)
self.pool = PooledDB(creator=pymysql, maxconnections=10, **self.db_base)
self.conn = self.pool.connection()
self.cursor = self.conn.cursor(DictCursor)
except Exception as e:
DatabaseExceptionError(self.db_base, e)
# raise
# @logger.log_decorator()
def execute_sql(self, sql):
"""
执行 SQL 语句
def execute_sql(self, sql):
"""
执行 SQL 语句
Args:
sql: SQL 语句字典
{
"delete": {
"sql_name": "DELETE FROM table_name WHERE condition"
},
"update": {
"sql_name": "UPDATE table_name SET column1=value1 WHERE condition"
},
"insert": {
"sql_name": "INSERT INTO table_name (column1, column2) VALUES (value1, value2)"
},
"select": {
"sql_name": "SELECT * FROM table_name WHERE condition"
}
}
Args:
sql: SQL 语句字典
{
"delete": {
"sql_name": "DELETE FROM table_name WHERE condition"
},
"update": {
"sql_name": "UPDATE table_name SET column1=value1 WHERE condition"
},
"insert": {
"sql_name": "INSERT INTO table_name (column1, column2) VALUES (value1, value2)"
},
"select": {
"sql_name": "SELECT * FROM table_name WHERE condition"
}
}
Returns:
执行结果字典
{
"sql_name": [result1, result2, ...]
}
"""
if not sql:
return
try:
for method, sql_data in sql.items():
execute_method = getattr(self, f"_execute_{method}", None)
if not execute_method:
logger.error("| sql字典集编写格式不符合规范")
raise ValueError("| Invalid SQL method")
logger.info(f"| 执行 sql 语句集: {sql_data}")
execute_method(sql_data)
Returns:
执行结果字典
{
"sql_name": [result1, result2, ...]
}
"""
if not sql:
return
try:
for method, sql_data in sql.items():
execute_method = getattr(self, f"_execute_{method}", None)
if not execute_method:
InvalidParameterFormatError(sql, "sql字典集编写格式不符合规范")
raise ValueError("| Invalid SQL method")
execute_method(sql_data)
self.cursor.close()
self.conn.close()
return self.result
self.cursor.close()
self.conn.close()
except Exception as e:
DatabaseExceptionError(sql, e)
raise
return self.result
def _execute_write(self, sql_data):
"""
执行通用的写入操作INSERTUPDATEDELETE
"""
for sql_name, sql_ in sql_data.items():
try:
self.cursor.execute(str(sql_))
except Exception as err:
DatabaseExceptionError(sql_, err)
raise err
self.cursor.connection.commit()
except Exception as e:
logger.error(f"| 数据库操作异常: {e}")
raise
def _execute_select(self, sql_data):
"""
执行 SELECT 语句
def _execute_write(self, sql_data):
"""
执行通用的写入操作INSERTUPDATEDELETE
"""
for sql_name, sql_ in sql_data.items():
try:
self.cursor.execute(str(sql_))
except Exception as err:
logger.error(f"| 执行 SQL 异常: {sql_}")
raise err
self.cursor.connection.commit()
Args:
cursor: 数据库游标
sql_data: SQL 语句数据字典
{
"sql_name": "SELECT * FROM table_name WHERE condition"
}
result: 字典结果
def _execute_select(self, sql_data):
"""
执行 SELECT 语句
Raises:
Exception: 执行异常
"""
for sql_name, sql_ in sql_data.items():
try:
self.cursor.execute(sql_)
self.result[sql_name] = self.cursor.fetchall()
Args:
cursor: 数据库游标
sql_data: SQL 语句数据字典
{
"sql_name": "SELECT * FROM table_name WHERE condition"
}
result: 字典结果
Raises:
Exception: 执行异常
"""
for sql_name, sql_ in sql_data.items():
try:
self.cursor.execute(sql_)
self.result[sql_name] = self.cursor.fetchall()
except Exception as err:
logger.error(f"| 查询异常 sql: {sql_}")
raise err
except Exception as err:
DatabaseExceptionError(sql_, err)
raise err
if __name__ == '__main__':
sql_2 = {
"select":
{
# "select_one": "select username,password as pwd from lea.user where username ='luoshunwen003';"
}
}
database_2 = {
"host": "localhost",
"port": 3306,
"database": "lea",
"user": "root",
"password": "admin"
}
res = MysqlClient(database_2).execute_sql(sql_2)
print("数据执行结果", res)
sql_2 = {
"select":
{
# "select_one": "select username,password as pwd from lea.user where username ='luoshunwen003';"
}
}
database_2 = {
"host": "localhost",
"port": 3306,
"database": "lea",
"user": "root",
"password": "admin"
}
res = MysqlClient(database_2).execute_sql(sql_2)
print("数据执行结果", res)
# t = DataExtractor()
# t.substitute_data(res, jp_dict={"total": "$.select_sale[0].total", "total_1": "$..total"})

View File

@ -7,6 +7,8 @@ import sys
import requests
import urllib3
from common.utils.exceptions import ResponseJsonConversionError
sys.path.append("../")
sys.path.append("./common")
@ -15,7 +17,7 @@ from common.file_handling.file_utils import FileUtils
from common.utils.decorators import request_retry_on_exception
class Pyt(LoadModulesFromFolder):
class HttpClient(LoadModulesFromFolder):
session = requests.Session()
def __init__(self):
@ -76,6 +78,7 @@ class Pyt(LoadModulesFromFolder):
try:
self.response_json = self.response.json()
except Exception as e:
ResponseJsonConversionError(self.response.text, str(e))
self.response_json = None
return self.response
@ -89,5 +92,5 @@ if __name__ == '__main__':
'data': {},
'files': ['test.txt']
}
pyt = Pyt()
pyt = HttpClient()
pyt.http_client(hst, url, method, **kwargs)

View File

@ -11,13 +11,11 @@ from time import perf_counter
from loguru import logger
from common.utils.decorators import singleton
from config import Config
LOG_DIR = Config.log_path
@singleton
class MyLogger:
"""
根据时间文件大小切割日志

75
common/notice/dingding.py Normal file
View File

@ -0,0 +1,75 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: dingding.py
@time: 2023/8/8 10:59
@desc:
"""
import base64
import hashlib
import hmac
import time
import urllib.parse
import requests
from config import Config
class DingTalk:
"""顶顶通知"""
def __init__(self, title, notice_content, except_info):
""""""
self.url = Config.dingtalk_notice.get("url")
self.notice_content = notice_content
self.title = title
self.except_info = except_info
self.secret = Config.dingtalk_notice.get("secret")
def sign(self):
"""加签"""
timestamp = str(round(time.time() * 1000))
secret_enc = self.secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, self.secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return {"sign": sign, "timestamp": timestamp}
def content(self):
"""markdown 内容"""
if Config.dingtalk_notice.get("except_info"):
self.notice_content += '\n ### 未通过用例详情:\n'
self.notice_content += self.except_info
data = {
"msgtype": "markdown",
"markdown": {
"title": '{}({})'.format(self.title, Config.dingtalk_notice.get("key")),
"text": self.notice_content
},
"at": {
"atMobiles": Config.dingtalk_notice.get("atMobiles"),
"isAtAll": Config.dingtalk_notice.get("isatall")
}
}
return data
def send_info(self):
"""发送钉钉消息"""
notice_content = self.content()
if self.secret:
sign = self.sign()
else:
sign = None
try:
requests.post(url=self.url, json=notice_content, params=sign)
except Exception as e:
print("发送钉钉异常", e)
if __name__ == '__main__':
texts = "#### 杭州天气 @150XXXXXXXX \n > 9度西北风1级空气良89相对温度73%\n > ![screenshot](https://img.alicdn.com/tfs/TB1NwmBEL9TBuNjy1zbXXXpepXa-2400-1218.png)\n > ###### 10点20分发布 [天气](https://www.dingtalk.com) \n"
print(DingTalk("杭州天气", texts).send_info())

View File

@ -0,0 +1,74 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: email.py
@time: 2023/8/8 10:58
@desc:
"""
import smtplib
from email.header import Header # 将各类信息定义成对象,比如标题等。
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart # 定义带有附件的邮件对象
from email.mime.text import MIMEText
from config import Config
class SendEmail:
"""Send mail"""
def __init__(self):
"""
:param host: smtp server address
:param port: smtp server report
:param user: Email account number
:param password: SMTP service authorization code of mailbox
"""
# 邮箱服务器地址
self.host = Config.mail_data.get("host")
# 用户名
self.user = Config.mail_data.get("user")
# 密码(部分邮箱为授权码)
self.password = Config.mail_data.get("password")
# 邮件发送方邮箱地址
self.sender = Config.mail_data.get("sender")
# 25 为 SMTP 端口号
self.port = Config.mail_data.get("port")
# 定义接收放的邮箱(可以是多个)
self.receivers = Config.mail_data.get("receivers")
def content(self, content=None, file_path=None):
"""内容"""
msg = MIMEMultipart() # 如果一份邮件含有附件,则必须定义 multipart/mixed 类型
msg['Form'] = Header(self.sender)
msg['Subject'] = Header('接口自动化测试报告', 'utf-8') # 主题
msg['To'] = ','.join(self.receivers)
# 编辑邮件
# ---邮件正文内容---
message = MIMEText(content, 'html', 'utf-8')
msg.attach(message)
# 测试用例与测试报告一起
if file_path:
for ph in file_path:
filename = ph.split('\\')[-1]
with open(ph, 'rb') as f:
ctx = f.read()
part = MIMEApplication(ctx)
part.add_header('Content-Disposition', 'attachment', filename=filename)
msg.attach(part) # 附件模块添加到 MIMEMultipart
return msg.as_string()
def send_mail(self, content, file_path=None):
"""发送邮件"""
try:
s = smtplib.SMTP_SSL(self.host, self.port)
# s.starttls()
s.login(self.user, self.password)
s.sendmail(self.sender, self.receivers, self.content(content, file_path))
s.quit()
except Exception as e:
print("发送邮件异常", e)

109
common/notice/weChat.py Normal file
View File

@ -0,0 +1,109 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: weChat.py
@time: 2023/8/8 10:59
@desc:
"""
import requests
import urllib3
urllib3.disable_warnings()
from config import Config
class WeChat:
headers = {"Content-Type": "application/json"}
def __init__(self, notice_content):
self.send_url = Config.weixin_notice.get('send_url')
self.up_url = Config.weixin_notice.get("upload_url")
self.notice_content = notice_content
self.file_lists = Config.weixin_notice.get("file_lists")
def send_markdown(self):
"""
发送markdown 请求
Returns:
"""
notice_content = self.notice_content
send_markdown_data = {
"msgtype": "markdown", # 消息类型此时固定为markdown
"markdown": {
"content": notice_content
}}
# "markdown": {
# "content": f"# **提醒!自动化测试反馈**\n#### **请相关同事注意,及时跟进!**\n"
# f"> 项目名称:<font color=\"info\">{project_name}</font> \n"
# f"> 项目指定端:<font color=\"info\">{project_port}</font> \n"
# f"> 测试用例总数:<font color=\"info\">{total_cases}条</font>;测试用例通过率:<font color=\"info\">{pass_rate}</font>\n"
# "> **--------------------运行详情--------------------**\n"
# f"> **成功数:**<font color=\"info\">{success_cases}</font>\n**失败数:**<font color=\"warning\">{fail_cases}</font>\n "
# f"> **跳过数:**<font color=\"info\">{skip_cases}</font>\n**错误数:**<font color=\"comment\">{error_cases}</font>\n"
# f"> ##### **报告链接:** [jenkins报告,请点击后进入查看]{report_url}"
# # 加粗:**需要加粗的字**
# 引用:> 需要引用的文字
# 字体颜色(只支持3种内置颜色)
# 标题 支持1至6级标题注意#与文字中间要有空格)
# 绿色info、灰色comment、橙红warning
# }
# }
requests.post(url=self.send_url, headers=self.headers, json=send_markdown_data, verify=False).json()
def send_file(self, file_path):
"""
文件路径
Args:
file_path:
Returns:
"""
media_id = self.upload_media(file_path)
for i in media_id:
send_data = {"msgtype": "file", "file": {"media_id": i}}
requests.post(self.send_url, headers=self.headers, json=send_data, verify=False)
import time
time.sleep(2)
def upload_media(self, file_lists) -> list:
"""
Args:
file_lists: 文件路径
Returns:上传文件后返回的每一个文件id
"""
# print(f"file path: {file_path}")
media_ids = []
if file_lists:
for fp in file_lists:
with open(fp, "rb") as f:
send_data = {"media": f}
res_html = requests.post(self.up_url, files=send_data, verify=False).json()
media_id = res_html.get("media_id")
media_ids.append(media_id)
return media_ids
def send_main(self):
"""
发送markdown及上传文件
Args:
dirs文件夹路径或文件路径
Returns:
"""
try:
self.send_markdown()
except Exception as e:
print("发送markdown信息异常", e)
try:
self.send_file(self.file_lists)
except Exception as e:
print("发送企业微信附件异常", e)

View File

@ -1,109 +0,0 @@
# -*- coding:utf-8 -*-
"""
Time: 2020/6/17/017 10:52
Author: 陈勇志
Email:262667641@qq.com
Project:api_project
"""
import time
import requests
import urllib3
from common.file_handling.file_utils import FileUtils
urllib3.disable_warnings()
class WxWorkSms:
# header = {"Content-Type": "multipart/form-data"}
headers = {"Content-Type": "application/json"}
def __init__(self, key):
self.send_url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={key}"
self.up_url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={key}&type=file"
def send_markdown(self, project_name, project_port, total_cases, pass_rate, success_cases, fail_cases, skip_cases,
error_cases, report_url):
"""
发送markdown 请求
Returns:
"""
send_markdown_data = {
"msgtype": "markdown", # 消息类型此时固定为markdown
"markdown": {
"content": f"# **提醒!自动化测试反馈**\n#### **请相关同事注意,及时跟进!**\n"
f"> 项目名称:<font color=\"info\">{project_name}</font> \n"
f"> 项目指定端:<font color=\"info\">{project_port}</font> \n"
f"> 测试用例总数:<font color=\"info\">{total_cases}条</font>;测试用例通过率:<font color=\"info\">{pass_rate}</font>\n"
"> **--------------------运行详情--------------------**\n"
f"> **成功数:**<font color=\"info\">{success_cases}</font>\n**失败数:**<font color=\"warning\">{fail_cases}</font>\n "
f"> **跳过数:**<font color=\"info\">{skip_cases}</font>\n**错误数:**<font color=\"comment\">{error_cases}</font>\n"
f"> ##### **报告链接:** [jenkins报告,请点击后进入查看]{report_url}"
# 加粗:**需要加粗的字**
# 引用:> 需要引用的文字
# 字体颜色(只支持3种内置颜色)
# 标题 支持1至6级标题注意#与文字中间要有空格)
# 绿色info、灰色comment、橙红warning
}
}
requests.post(url=self.send_url, headers=self.headers, json=send_markdown_data, verify=False).json()
def send_file(self, file_path):
"""
文件路径
Args:
file_path:
Returns:
"""
media_id = self.upload_media(file_path)
for i in media_id:
send_data = {"msgtype": "file", "file": {"media_id": i}}
requests.post(self.send_url, headers=self.headers, json=send_data, verify=False)
time.sleep(2)
def upload_media(self, file_path) -> list:
"""
Args:
file_path: 文件路径
Returns:上传文件后返回的每一个文件id
"""
# print(f"file path: {file_path}")
media_ids = []
for fp in file_path:
with open(fp, "rb") as f:
send_data = {"media": f}
res_html = requests.post(self.up_url, files=send_data, verify=False).json()
media_id = res_html.get("media_id")
media_ids.append(media_id)
return media_ids
def send_main(self, folder_path, project_name, project_port, total_cases, pass_rate, success_cases, fail_cases,
skip_cases,
error_cases, report_url):
"""
发送markdown及上传文件
Args:
dirs文件夹路径
Returns:
"""
self.send_markdown(
project_name, project_port, total_cases, pass_rate, success_cases, fail_cases, skip_cases,
error_cases, report_url
)
file_path = FileUtils.get_all_path(folder_path)
self.send_file(file_path)
if __name__ == '__main__':
dirs = r'D:\apk_api\api-test-project\OutPut\Reports'
WxWorkSms('8b1647d4-dc32-447c-b524-548acf18a938').send_main(dirs, 2, 3, 4, 5, 6, 7, 8, 9, 10)
# WxWorkSms('8b1647d4-dc32-447c-b524-548acf18a938').send_markdown(1, 2, 3, 4, 5, 6, 7, 8, 9)

View File

@ -13,7 +13,6 @@ import time
from common import bif_functions
from common.crypto.encrypt_data import EncryptData
from common.database.mysql_client import MysqlClient
from common.log_utils.mylogger import MyLogger
from common.utils.decorators import singleton
from common.utils.exceptions import *
from common.validation.extractor import Extractor
@ -22,15 +21,14 @@ from common.validation.validator import Validator
@singleton
class Action(Extractor, LoadScript, Validator, MysqlClient):
class Action(Extractor, LoadScript, Validator):
def __init__(self, initialize_data=None, db_config=None):
super().__init__()
MysqlClient.__init__(self, db_config)
self.db_config = db_config
self.encrypt = EncryptData()
self.__variables = {}
self.set_environments(initialize_data)
self.set_bif_fun(bif_functions)
self.logger = MyLogger()
def execute_dynamic_code(self, item, code):
self.variables = item
@ -72,12 +70,13 @@ class Action(Extractor, LoadScript, Validator, MysqlClient):
try:
self.substitute_data(self.response_json, regex=regex, keys=keys, deps=deps, jp_dict=jp_dict)
except Exception as err:
self.logger.error(f"| 分析响应失败:{sheet}_{iid}_{name}_{desc}"
f"\nregex={regex};"
f" \nkeys={keys};"
f"\ndeps={deps};"
f"\njp_dict={jp_dict}"
f"\n{err}")
msg = f"| 分析响应失败:{sheet}_{iid}_{name}_{desc}"
f"\nregex={regex};"
f" \nkeys={keys};"
f"\ndeps={deps};"
f"\njp_dict={jp_dict}"
f"\n{err}"
ParameterExtractionError(msg, err)
def execute_validation(self, excel, sheet, iid, name, desc, expected):
try:
@ -85,13 +84,14 @@ class Action(Extractor, LoadScript, Validator, MysqlClient):
result = "PASS"
except Exception as e:
result = "FAIL"
self.logger.error(f"| exception case:**{sheet}_{iid}_{name}_{desc}**\n{e}")
raise AssertionFailedError(self.assertions)
error_info = f"| exception case:**{sheet}_{iid}_{name}_{desc},{self.assertions}"
AssertionFailedError(error_info, e)
raise e
finally:
print(f'| Assertion Result-->{self.assertions}')
print(f'| 断言结果-->{self.assertions}\n')
response = self.response.text if self.response is not None else str(self.response)
# excel.write_back(sheet_name=sheet, i=iid, response=response, test_result=result,
# assert_log=str(self.assertions))
excel.write_back(sheet_name=sheet, i=iid, response=response, test_result=result,
assert_log=str(self.assertions))
@staticmethod
def base_info(item):
@ -158,27 +158,23 @@ class Action(Extractor, LoadScript, Validator, MysqlClient):
if not is_run or is_run.upper() != "YES":
return True
def pause_execution(self, sleep_time):
@staticmethod
def pause_execution(sleep_time):
if sleep_time:
try:
time.sleep(sleep_time)
except Exception as e:
raise MyBaseException(f"暂停时间必须是数字!")
raise InvalidSleepTimeError(f"{sleep_time}", e)
def exc_sql(self, item):
sql, sql_params_dict = self.sql_info(item)
sql = self.replace_dependent_parameter(sql)
if sql:
try:
execute_sql_results = self.execute_sql(sql)
except DatabaseExceptionError as e:
raise DatabaseExceptionError(sql, str(e))
else:
if execute_sql_results and sql_params_dict:
try:
self.substitute_data(execute_sql_results, jp_dict=sql_params_dict)
except Exception as e:
ParameterExtractionError(sql_params_dict, str(e))
client = MysqlClient(self.db_config)
execute_sql_results = client.execute_sql(sql)
print(f"| 执行 sql 成功--> {execute_sql_results}")
if execute_sql_results and sql_params_dict:
self.substitute_data(execute_sql_results, jp_dict=sql_params_dict)
if __name__ == '__main__':

View File

@ -7,9 +7,13 @@
@time: 2023/3/21 17:41
@desc:
"""
import time
import json
from functools import wraps
import yaml
from common.utils.exceptions import RequestSendingError
def singleton(cls):
"""
@ -29,6 +33,8 @@ def singleton(cls):
def request_retry_on_exception(retries=2, delay=1.5):
"""失败请求重发"""
def request_decorator(func):
e = None
@ -51,8 +57,97 @@ def request_retry_on_exception(retries=2, delay=1.5):
time.sleep(delay)
else:
return response
raise Exception(f"| 请求重试**{retries}**次失败,请检查!!{e}")
raise RequestSendingError(kwargs, e)
return wrapper
return request_decorator
def list_data(datas):
"""
:param datas: 测试数据
:return:
"""
def wrapper(func):
setattr(func, "PARAMS", datas)
return func
return wrapper
def yaml_data(file_path):
"""
:param file_path: yaml文件路径
:return:
"""
def wrapper(func):
try:
with open(file_path, "r", encoding="utf-8") as f:
datas = yaml.load(f, Loader=yaml.FullLoader)
except:
with open(file_path, "r", encoding="gbk") as f:
datas = yaml.load(f, Loader=yaml.FullLoader)
setattr(func, "PARAMS", datas)
return func
return wrapper
def json_data(file_path):
"""
:param file_path: json文件路径
:return:
"""
def wrapper(func):
try:
with open(file_path, "r", encoding="utf-8") as f:
datas = json.load(f)
except:
with open(file_path, "r", encoding="gbk") as f:
datas = json.load(f)
setattr(func, "PARAMS", datas)
return func
return wrapper
import time
import traceback
def run_count(count, interval, func, *args, **kwargs):
"""运行计数"""
for i in range(count):
try:
func(*args, **kwargs)
except Exception as e:
print("====用例执行失败===")
traceback.print_exc()
if i + 1 == count:
raise e
else:
print("==============开始第{}次重运行=============".format(i))
time.sleep(interval)
else:
break
def rerun(count, interval=2):
"""
单个测试用例重运行的装饰器,注意点如果使用了ddt,那么该方法要在用在ddt之前
:param count: 失败重运行次数
:param interval: 每次重运行间隔时间,默认三秒钟
:return:
"""
def wrapper(func):
def decorator(*args, **kwargs):
run_count(count, interval, func, *args, **kwargs)
return decorator
return wrapper

View File

@ -7,11 +7,15 @@
@time: 2023/8/1 9:12
@desc:
"""
from common.log_utils.mylogger import MyLogger
logger = MyLogger()
class MyBaseException(Exception):
def __init__(self, msg):
self.msg = msg
self.logger = logger
def __str__(self):
return self.msg
@ -21,53 +25,118 @@ class RequestSendingError(MyBaseException):
"""请求异常"""
ERROR_CODE = 1001
def __init__(self, url, reason):
msg = f"请求异常:URL={url}, 原因={reason}"
def __init__(self, request_info, reason):
msg = f"请求异常:request_info={request_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class DatabaseExceptionError(MyBaseException):
"""数据库异常"""
ERROR_CODE = 1002
def __init__(self, operation, reason):
msg = f"数据库异常:操作={operation}, 原因={reason}"
def __init__(self, operation_info, reason):
msg = f"数据库异常:操作信息={operation_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class ParameterExtractionError(MyBaseException):
"""参数提取异常"""
ERROR_CODE = 1003
def __init__(self, parameter_path, reason):
msg = f"参数提取异常:参数路径={parameter_path}, 原因={reason}"
def __init__(self, parameter_info, reason):
msg = f"参数提取异常:参数信息={parameter_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class ParameterReplacementError(MyBaseException):
"""参数替换异常"""
ERROR_CODE = 1004
def __init__(self, parameter_name, reason):
msg = f"参数替换异常:参数名称={parameter_name}, 原因={reason}"
def __init__(self, parameter_info, reason):
msg = f"参数替换异常:参数名称={parameter_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class AssertionFailedError(MyBaseException):
"""断言异常"""
ERROR_CODE = 1005
def __init__(self, assertion):
msg = f"断言失败:{assertion}"
print(msg)
def __init__(self, assertion, reason):
msg = f"执行断言失败:断言信息={assertion}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class ExecuteDynamiCodeError(MyBaseException):
"""执行动态代码异常"""
ERROR_CODE = 1006
def __init__(self, code, reason):
msg = f"执行动态代码异常:动态代码={code}, 原因={reason}"
def __init__(self, code_info, reason):
msg = f"执行动态代码异常:动态代码信息={code_info}, 原因={reason}"
print(msg)
super().__init__(msg)
self.logger.error(msg)
class InvalidSleepTimeError(MyBaseException):
"""无效的暂停时间异常"""
ERROR_CODE = 1007
def __init__(self, sleep_time, reason):
msg = f"无效的暂停时间sleep_time={sleep_time},原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class ScriptNotFoundError(MyBaseException):
"""脚本不存在异常"""
ERROR_CODE = 1008
def __init__(self, script_info, reason):
msg = f"脚本不存在异常script_info={script_info},原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class InvalidParameterFormatError(MyBaseException):
"""无效的参数格式异常"""
ERROR_CODE = 1009
def __init__(self, parameter_info, reason):
msg = f"无效的参数格式异常parameter_info={parameter_info},原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class ResponseJsonConversionError(MyBaseException):
"""响应内容转换为 JSON 格式异常"""
ERROR_CODE = 1010
def __init__(self, response_text, reason):
msg = f"响应内容转换为 JSON 格式异常:响应内容={response_text}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class DynamicLoadingError(MyBaseException):
"""动态加载模块或文件异常"""
ERROR_CODE = 1011
def __init__(self, code_info, reason):
msg = f"动态加载模块或文件发生异常:动态模块或文件={code_info}, 原因={reason}"
super().__init__(msg)
self.logger.error(msg)
class EncryptionError(MyBaseException):
"""加密失败异常"""
ERROR_CODE = 1012
def __init__(self, method_name, error_message):
msg = f"加密失败异常: 加密方法={method_name} 原因={error_message}"
super().__init__(msg)

View File

@ -12,85 +12,87 @@ import json
import jsonpath
from common.utils.exceptions import ParameterExtractionError
from common.validation import logger
class Extractor:
"""
提取器
主要功能
1格式化输出变量
2从响应中提取需要输出的变量信息并返回
"""
"""
提取器
主要功能
1格式化输出变量
2从响应中提取需要输出的变量信息并返回
"""
def __init__(self):
self.output_variables_mapping = {}
def __init__(self):
self.output_variables_mapping = {}
def uniform_output(self, output_variables):
"""
统一格式化测试用例的输出变量output
Args:
output_variables: listdictstr 示例["a","b",{"a":"ac"}] or {"a":"ac"} or "a"
def uniform_output(self, output_variables):
"""
统一格式化测试用例的输出变量output
Args:
output_variables: listdictstr 示例["a","b",{"a":"ac"}] or {"a":"ac"} or "a"
Returns: 示例[{"alias_key":"original_key"}]
list
"""
if isinstance(output_variables, list):
for output_variable in output_variables:
self.uniform_output(output_variable)
elif isinstance(output_variables, dict):
for alias_key, original_key in output_variables.items():
if not isinstance(alias_key, str):
alias_key = json.dumps(alias_key, ensure_ascii=False)
if not isinstance(original_key, str):
original_key = json.dumps(original_key, ensure_ascii=False)
self.output_variables_mapping.update({alias_key: original_key})
elif isinstance(output_variables, str):
self.output_variables_mapping.update({output_variables: output_variables})
else:
raise Exception("参数格式错误!")
Returns: 示例[{"alias_key":"original_key"}]
list
"""
if isinstance(output_variables, list):
for output_variable in output_variables:
self.uniform_output(output_variable)
elif isinstance(output_variables, dict):
for alias_key, original_key in output_variables.items():
if not isinstance(alias_key, str):
alias_key = json.dumps(alias_key, ensure_ascii=False)
if not isinstance(original_key, str):
original_key = json.dumps(original_key, ensure_ascii=False)
self.output_variables_mapping.update({alias_key: original_key})
elif isinstance(output_variables, str):
self.output_variables_mapping.update({output_variables: output_variables})
else:
raise Exception("参数格式错误!")
def extract_output(self, resp_obj=None):
"""
从接口返回中提取待输出变量的值
Args:
resp_obj: ResponseObject对象的resp_obj属性
def extract_output(self, resp_obj=None):
"""
从接口返回中提取待输出变量的值
Args:
resp_obj: ResponseObject对象的resp_obj属性
Returns: output_variables_mapping 从resp_obj中提取后的mapping
Returns: output_variables_mapping 从resp_obj中提取后的mapping
"""
return {alias_key: self.extract_value_by_jsonpath(resp_obj=resp_obj, expr=original_key) for
alias_key, original_key in self.output_variables_mapping.items()}
"""
return {alias_key: self.extract_value_by_jsonpath(resp_obj=resp_obj, expr=original_key) for
alias_key, original_key in self.output_variables_mapping.items()}
@staticmethod
def extract_value_by_jsonpath(resp_obj=None, expr=None):
"""
根据jsonpath从resp_obj中提取相应的值
Args:
resp_obj: ResponseObject实例
expr: 提取条件
@staticmethod
def extract_value_by_jsonpath(resp_obj=None, expr=None):
"""
根据jsonpath从resp_obj中提取相应的值
Args:
resp_obj: ResponseObject实例
expr: 提取条件
Returns:
Returns:
"""
# logger.debug(f'正在执行数据提取:提取数据源内容:{resp_obj},{type(resp_obj)}')
# logger.debug('正在执行数据提取:提取表达式:{expr}'.format(expr=expr))
try:
result = jsonpath.jsonpath(resp_obj if isinstance(resp_obj, (dict, list)) else json.dumps(resp_obj), expr)
except Exception as e:
return expr
else:
if result is False:
result = []
logger.error(f'提取失败:提取表达式:{expr},没有提取到对应的值')
elif isinstance(result, list):
if len(result) == 1:
result = result[0]
# logger.info(f'提取成功,输出结果,提取表达式:{expr},提取结果:{result}')
return result
"""
# logger.debug(f'正在执行数据提取:提取数据源内容:{resp_obj},{type(resp_obj)}')
# logger.debug('正在执行数据提取:提取表达式:{expr}'.format(expr=expr))
try:
result = jsonpath.jsonpath(resp_obj if isinstance(resp_obj, (dict, list)) else json.dumps(resp_obj), expr)
except Exception as e:
ParameterExtractionError(expr, e)
return expr
else:
if result is False:
result = []
logger.error(f'提取失败:提取表达式:{expr},没有提取到对应的值')
elif isinstance(result, list):
if len(result) == 1:
result = result[0]
# logger.info(f'提取成功,输出结果,提取表达式:{expr},提取结果:{result}')
return result
if __name__ == '__main__':
r_obg = {"data": ["key", 1, "val", 2]}
Extractor.extract_value_by_jsonpath(r_obg, "$.data[0]")
Extractor.extract_value_by_jsonpath(r_obg, 200)
r_obg = {"data": ["key", 1, "val", 2]}
Extractor.extract_value_by_jsonpath(r_obg, "$.data[0]")
Extractor.extract_value_by_jsonpath(r_obg, 200)

View File

@ -2,63 +2,55 @@ import importlib.util
import os
import sys
from common.utils.exceptions import ScriptNotFoundError
sys.path.append('..')
sys.path.append('../utils')
from common.log_utils.mylogger import MyLogger
logger = MyLogger()
class ScriptNotFoundError(Exception):
pass
class LoadScript:
# @logger.log_decorator()
def load_script(self, script_path):
def load_script(self, script_path):
"""
加载脚本文件并返回模块对象
"""
加载脚本文件并返回模块对象
Args:
script_path (str): 脚本文件的路径
Args:
script_path (str): 脚本文件的路径
Returns:
module: 脚本文件对应的模块对象
"""
try:
spec = importlib.util.spec_from_file_location(os.path.basename(script_path), script_path)
script_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(script_module)
return script_module
except FileNotFoundError:
raise ScriptNotFoundError(script_path)
Returns:
module: 脚本文件对应的模块对象
"""
try:
spec = importlib.util.spec_from_file_location(os.path.basename(script_path), script_path)
script_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(script_module)
return script_module
except Exception as e:
raise ScriptNotFoundError(script_path, e)
@logger.log_decorator()
def load_and_execute_script(self, script_directory, script_name, method_name, request):
"""
加载并执行脚本文件中的指定方法
Args:
request: 请求数据
script_directory (str): 脚本文件所在的目录
script_name (str): 脚本文件的名称
method_name (str): 要执行的方法的名称
"""
script_path = os.path.join(script_directory, script_name)
try:
script = self.load_script(script_path)
if hasattr(script, method_name):
method = getattr(script, method_name)
return method(request)
except ScriptNotFoundError:
return request
def load_and_execute_script(self, script_directory, script_name, method_name, request):
"""
加载并执行脚本文件中的指定方法
Args:
request: 请求数据
script_directory (str): 脚本文件所在的目录
script_name (str): 脚本文件的名称
method_name (str): 要执行的方法的名称
"""
script_path = os.path.join(script_directory, script_name)
try:
script = self.load_script(script_path)
if hasattr(script, method_name):
method = getattr(script, method_name)
return method(request)
except Exception as e:
ScriptNotFoundError(script_path, e)
return request
if __name__ == '__main__':
from config import Config
from config import Config
SCRIPTS_DIR = Config.SCRIPTS_DIR
load_and_exe_s = LoadScript()
load_and_exe_s.load_and_execute_script(SCRIPTS_DIR, 'request_script_sheetname_id.py', 'setup', {"y": "z"})
SCRIPTS_DIR = Config.SCRIPTS_DIR
load_and_exe_s = LoadScript()
load_and_exe_s.load_and_execute_script(SCRIPTS_DIR, 'request_script_sheetname_id.py', 'setup', {"y": "z"})

View File

@ -12,14 +12,13 @@ import os
import types
from common.data_extraction.dependent_parameter import DependentParameter
from common.validation import logger
from common.utils.exceptions import DynamicLoadingError
class LoadModulesFromFolder(DependentParameter):
def __init__(self):
super().__init__()
@logger.log_decorator()
def load_modules_from_folder(self, folder_or_mnodule):
"""
动态加载文件或模块
@ -50,7 +49,9 @@ class LoadModulesFromFolder(DependentParameter):
if callable(o):
self.update_environments(n, o)
else:
raise TypeError("folder_or_module should be either a folder path (str) or a module (types.ModuleType).")
raise DynamicLoadingError(folder_or_mnodule,
"older_or_module should be either a folder path (str) or a module ("
"types.ModuleType).")
if __name__ == '__main__':

View File

@ -9,12 +9,12 @@
# -------------------------------------------------------------------------------
import types
from common.http_client.http_client import Pyt
from common.http_client.http_client import HttpClient
from common.validation import comparators
from common.validation import logger
class Loaders(Pyt):
class Loaders(HttpClient):
def __init__(self):
super().__init__()

View File

@ -8,7 +8,7 @@
# -------------------------------------------------------------------------------
import json
from common.validation import logger
from common.utils.exceptions import InvalidParameterFormatError
from common.validation.comparator_dict import comparator_dict
from common.validation.extractor import Extractor
from common.validation.loaders import Loaders
@ -58,7 +58,7 @@ class Validator(Loaders):
"comparator": comparator
})
else:
logger.error("参数格式错误!")
InvalidParameterFormatError(validate_variables, "参数格式错误!")
def validate(self, resp=None):
"""
@ -110,7 +110,8 @@ class Validator(Loaders):
self.assertions.clear()
self.uniform_validate(validate_variables)
if not self.validate_variables_list:
raise "uniform_validate 执行失败,无法进行 validate 校验"
raise InvalidParameterFormatError(self.validate_variables_list,
"uniform_validate 执行失败,无法进行 validate 校验")
self.validate(resp)

View File

@ -22,9 +22,35 @@ class Config:
# 测试报告及 logger 所在路径
# *****************************************************************
test_report = os.path.join(base_path, "output", "reports")
test_report_file = os.path.join(base_path, "output", "reports","report.html")
log_path = os.path.join(base_path, "output", "log")
SCRIPTS_DIR = os.path.join(base_path, "scripts")
# 邮件配置信息
mail_data = {
"host": "smtp.qq.com", # 邮件服务地址
"user": "262667641@qq.com", # 用户名
"password": "ztvqsnikiupvbghe", # 密码(部分邮箱为授权码)# 密码
"sender": "262667641@qq.com", # 发送人
"port": 465, # smtp 端口号
"receivers": ['262667641@qq.com', '125109524@qq.com'] # 接收方的邮箱
}
# 企业微信机器人配置信息
weixin_notice = {
"send_url": "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=8b1647d4-dc32-447c-b524-548acf18a938",
"upload_url": "https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?type=file&key=8b1647d4-dc32-447c-b524-548acf18a938",
"file_lists": [test_report_file, test_case] # 需要推送的文件的路径
}
# 钉钉机器人配置信息
dingtalk_notice = {
"url": "https://oapi.dingtalk.com/robot/send?access_token=7d1e11079e00a4ca9f11283f526349abd5ba3f792ef7bcb346909ff215af02de",
"secret": "SEC441dbbdb8dbe150e5fc3e348bb449d3113b1be1a90be527b898ccd78c51566c1",
"key": "", # 安全关键字
"atMobiles": "18127813600", # 需要@指定人员的手机号
"isAtAll": True, # 是否@ 所有人
"except_info": False # 是否发送测试不通过的异常数据
}
if __name__ == '__main__':
test = Config()

View File

@ -15,15 +15,13 @@ from natsort import natsorted
__all__ = ["md5_sign", "sha1_sign"]
from common.crypto.encryption_str import sha1_secret_str, md5
from extensions import logger
@logger.log_decorator()
def md5_sign(data: dict):
"""
数据加签
Args:
**data:需要加钱的数据
**data:需要加钱的数据DD
Returns:
@ -42,7 +40,6 @@ def md5_sign(data: dict):
return {**data, **{"sign": sign_value}}
@logger.log_decorator()
def sha1_sign(post_data: dict):
timestamp = int(round(time.time() * 1000)) # 毫秒级时间戳
argument = {"secretKey": "", "timestamp": timestamp} # 加密加盐参数

View File

@ -7,8 +7,8 @@
@time: 2023/3/14 16:21
@desc:
"""
from common.log_utils.mylogger import MyLogger
# from common.log_utils.mylogger import MyLogger
logger = MyLogger()
# logger = MyLogger()
from .dynamic_scaling_methods import *

0
file.txt Normal file
View File

BIN
image/test_img.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 530 KiB

File diff suppressed because one or more lines are too long

10
run.py
View File

@ -4,7 +4,6 @@
# @Email : 262667641@qq.com
# @File : run_main.py
# @Project : risk_api_project
import sys
import unittest
@ -13,8 +12,8 @@ sys.path.append("./")
sys.path.append('cases')
from config import Config
from unittestreport import TestRunner
from common.utils.WxworkSms import WxWorkSms
# from unittestreport import TestRunner
from unittestreportnew import TestRunner
def run():
@ -23,7 +22,10 @@ def run():
runner = TestRunner(test_case, report_dir=test_report, title="接口自动化测试报告", templates=2, tester="kira",
desc="自动化测试")
runner.run()
WxWorkSms('8b1647d4-dc32-447c-b524-548acf18a938').send_main(test_report, 1, 2, 3, 4, 5, 6, 7, 8, 9)
# get_failed_test_cases = runner.get_failed_test_cases()
# runner.email_notice()
# runner.dingtalk_notice()
runner.weixin_notice()
if __name__ == '__main__':

8
templates/__init__.py Normal file
View File

@ -0,0 +1,8 @@
"""
============================
Author:柠檬班-木森
Time:2020/7/16 17:47
E-mail:3247119728@qq.com
Company:湖南零檬信息技术有限公司
============================
"""

9
templates/dingtalk.md Normal file
View File

@ -0,0 +1,9 @@
### 【{{title}}】测试结果
##### 测试人员: {{tester}}
##### 开始时间: {{begin_time}}
##### 执行时间: {{runtime}}
##### 用例总数: {{all}}
##### 成功用例: {{success}}
##### 失败用例: {{fail}}
##### 错误用例: {{error}}
##### 跳过用例: {{skip}}

456
templates/templates.html Normal file
View File

@ -0,0 +1,456 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>测试报告</title>
<link rel="stylesheet" href="https://cdn.staticfile.org/twitter-bootstrap/4.5.0/css/bootstrap.min.css">
<script src=" https://cdn.staticfile.org/jquery/2.0.0/jquery.min.js"></script>
<script src="https://cdn.staticfile.org/echarts/5.1.2/echarts.min.js"></script>
<!-- 页面样式-->
<style type="text/css">
/*标题样式*/
.title {
width: auto;
height: 60px;
text-align: center;
font: bolder 38px/60px "Microsoft YaHei UI";
}
/*汇总信息样式*/
.summary {
width: 90%;
position: absolute;
top: 120px;
margin-left: 5%;
}
.text-left {
font: bolder 20px/30px "Microsoft YaHei UI";
}
.left {
width: 50%;
float: left;
}
.right {
width: 50%;
float: right;
}
.desc {
float: left;
width: 100%;
}
.list-group-item span {
font: normal 16px/38px "Microsoft YaHei UI";
padding: 30px;
}
.list-group-item {
position: relative;
display: block;
padding: .4rem 1.25rem;
background-color: #fff;
border: 1px solid rgba(0, 0, 0, .125);
}
/* 执行信息样式 */
.test_info {
width: 90%;
position: absolute;
top: 900px;
margin-left: 5%;
color: #28a745 !important;
}
.table td, th {
border: solid 2px rgba(9, 122, 51, 0.11) !important;
padding: 0;
line-height: 40px;
text-align: center;
}
select {
border: 0;
padding: 0;
margin: 0;
height: 2em;
width: 8em;
margin-left: 2em;
}
option {
text-align: center;
height: 36px;
font: none 18px/36px "Microsoft YaHei UI";
color: #28a745 !important;
}
.test_log {
background: rgba(163, 171, 189, 0.15);
width: 100%;
height: 50px;
border-top: none;
border-bottom: none;
display: none;
text-align: left;
}
.test_log td {
text-align: left;
height: 30px;
margin: 0;
padding-left: 3em;
padding-right:3em;
font: none 18px/24px "Microsoft YaHei UI";
color: #9e141a;
}
pre {
margin: 0;
white-space: pre-wrap;
white-space: -moz-pre-wrap;
white-space: -o-pre-wrap;
word-wrap: break-word;
line-height: 22px;
font-size: 14px
}
/* 测试图表显示*/
.char {
width: 90%;
position: absolute;
top: 450px;
margin-left: 5%;
color: #28a745 !important;
}
</style>
</head>
<body>
<!--报告标题-->
<div class="title text-success">
<div class="shadow-lg p-3 mb-5 bg-white rounded">{{ title }}</div>
</div>
<!--汇总信息-->
<div class="summary">
<p class="text-left text-success">测试结果汇总</p>
<div class="left">
<ul class="list-group">
<li class="list-group-item">
<button type="button" class="btn btn-success">测试人员</button>
<span class="text-dark">{{ tester }}</span>
</li>
<li class="list-group-item">
<button type="button" class="btn btn-success">开始时间</button>
<span class="text-dark">{{ begin_time }}</span>
</li>
<li class="list-group-item">
<button type="button" class="btn btn-success">执行时间</button>
<span class="text-dark">{{ runtime }}</span>
</li>
<li class="list-group-item">
<button type="button" class="btn btn-success">用例总数</button>
<span class="text-dark">{{ all }}</span>
</li>
</ul>
</div>
<div class="right">
<ul class="list-group">
<li class="list-group-item">
<button type="button" class="btn btn-success">成功用例</button>
<span class="text-success">{{ success }}</span>
</li>
<li class="list-group-item">
<button type="button" class="btn btn-warning">失败用例</button>
<span class="text-warning">{{ fail }}</span>
</li>
<li class="list-group-item">
<button type="button" class="btn btn-danger">错误用例</button>
<span class="text-danger">{{ error }}</span>
</li>
<li class="list-group-item">
<button type="button" class="btn btn-secondary">跳过用例</button>
<span class="text-secondary">{{ skip }}</span>
</li>
</ul>
</div>
<div class="desc">
<ul class="list-group">
<li class="list-group-item">
<button type="button" class="btn btn-success">描述信息</button>
<span class="text-secondary">{{ desc }}</span>
</li>
</ul>
</div>
</div>
<!--测试图表-->
<div class="char">
<p class="text-left text-success">图表展示</p>
<div id="char2" style="width: 49%;height: 400px;float: left"></div>
<div id="char" style="width: 49%;height: 400px ;float: left"></div>
</div>
<!--详细信息-->
<div class="test_info">
<p class="text-left text-success">详细信息</p>
<div class="table_data">
<table class="table">
<thead class="bg-success text-light">
<tr>
<th scope="col" style="width: 5%;padding: 0">编号</th>
<th scope="col" style="width: 20%;padding: 0">
<span>测试类</span>
<select id="testClass">
<option>所有</option>
{% for foo in testClass %}
<option>{{ foo }}</option>
{% endfor %}
</select>
</th>
<th scope="col" style="width: 15%;padding: 0">测试方法</th>
<th scope="col" style="width: 20%;padding: 0">用例描述</th>
<th scope="col" style="width: 15%;padding: 0">执行时间</th>
<th scope="col" style="width: 20%;padding: 0">
<span>执行结果</span>
<select id="testResult">
<option>所有</option>
<option class="text-success">成功</option>
<option class="text-warning">失败</option>
<option class="text-danger">错误</option>
<option class="text-info">跳过</option>
</select>
</th>
<th scope="col" style="width: 10%;padding: 0">详细信息</th>
</tr>
</thead>
<tbody>
{% for foo in results %}
<tr>
<td>{{ loop.index }}</td>
<td class="{{ foo.class_name }}">{{ foo.class_name }}</td>
<td>{{ foo.method_name }}</td>
<td>{{ foo.method_doc }}</td>
<td>{{ foo.run_time }}</td>
{% if foo.state == '成功' %}
<td class="text-success">{{ foo.state }}</td>
{% elif foo.state == '失败' %}
<td class="text-warning">{{ foo.state }}</td>
{% elif foo.state == '错误' %}
<td class="text-danger">{{ foo.state }}</td>
{% else %}
<td class="text-info">{{ foo.state }}</td>
{% endif %}
<td>
<button type="button" class="btn btn-success btn_info">查看详情</button>
</td>
</tr>
<tr class="test_log">
<td colspan="7" class="small text-muted" style=" word-wrap:break-word; word-break:break-all">
{% for item in foo.run_info %}
<pre>{{ item }}</pre>
{% endfor %}
{% if foo.run_info == [] %}
<pre>无内容输出!</pre>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
<div style="height: 200px"></div>
</div>
<script>
var tbodyTr = $('tbody tr');
var testResult = $("#testResult");
var testClass = $("#testClass");
<!-- 用例执行详细信息显示切换-->
$(".btn_info").click(function () {
$(this).parent().parent().next().toggle();
});
// 当选择用例类之后触发
testClass.change(function () {
var cls = $(this).val();
var res = testResult.val();
elementDisplay(cls, res);
sort()
});
testResult.change(function () {
var res = $(this).val();
var cls = testClass.val();
elementDisplay(cls, res);
sort()
});
function elementDisplay(cls, res) {
// 用例数据的显示
if (cls === "所有") {
if (res === "所有") {
tbodyTr.has('button').show();
} else if (res === '成功') {
tbodyTr.hide();
tbodyTr.has('button').has('.text-success').show()
} else if (res === '失败') {
tbodyTr.hide();
tbodyTr.has('button').has('.text-warning').show()
} else if (res === '错误') {
tbodyTr.hide();
tbodyTr.has('button').has('.text-danger').show()
} else if (res === '跳过') {
tbodyTr.hide();
tbodyTr.has('button').has('.text-info').show()
}
} else {
if (res === "所有") {
tbodyTr.hide();
tbodyTr.has('button').has('.' + cls + '').show()
} else if (res === '成功') {
tbodyTr.hide();
tbodyTr.has('button').has('.' + cls + '').has('.text-success').show()
} else if (res === '失败') {
tbodyTr.hide();
tbodyTr.has('button').has('.' + cls + '').has('.text-warning').show()
} else if (res === '错误') {
tbodyTr.hide();
tbodyTr.has('button').has('.' + cls + '').has('.text-danger').show()
} else if (res === '跳过') {
tbodyTr.hide();
tbodyTr.has('button').has('.' + cls + '').has('.text-info').show()
}
}
}
function sort() {
//重新排列显示序号
// 选择所有可以见的tr
var visibleTr = tbodyTr.filter(":visible");
visibleTr.each(function (index, element) {
element.firstElementChild.innerHTML = index + 1;
})
}
</script>
<script type="text/javascript">
// 基于准备好的dom初始化echarts实例
var myChart = echarts.init(document.getElementById('char'));
var myChart2 = echarts.init(document.getElementById('char2'));
// 指定图表的配置项和数据
option = {
color: ['#00a10a', '#ddb518', 'rgba(204,46,41,0.73)', '#85898c'],
tooltip: {
trigger: 'item',
formatter: '{a} <br/>{b}: {c} ({d}%)'
},
legend: {
orient: 'vertical',
left: 10,
data: ['通过', '失败', '错误', '跳过']
},
series: [
{
name: '测试结果',
type: 'pie',
radius: ['50%', '70%'],
avoidLabelOverlap: false,
label: {
show: false,
position: 'center'
},
emphasis: {
label: {
show: true,
fontSize: '30',
fontWeight: 'bold'
}
},
labelLine: {
show: false
},
data: [
{value: {{success}}, name: '通过'},
{value: {{fail}}, name: '失败'},
{value: {{error}}, name: '错误'},
{value: {{skip}}, name: '跳过'}
]
}
]
};
option2 = {
tooltip: {
formatter: '{a} <br/>{b} : {c}%'
},
toolbox: {
feature: {
restore: {},
saveAsImage: {}
}
},
series: [
{
name: '测试结果',
type: 'gauge',
detail: {formatter: '{{pass_rate}}%'},
data: [{value: '{{pass_rate}}', name: '用例通过率'}],
axisLine: {
lineStyle: {
color: [
[0.2, '#c20000'],
[0.8, '#ddb518'],
[1, '#00a10a']]
}
}
}
]
};
myChart2.setOption(option2);
// 使用刚指定的配置项和数据显示图表。
myChart.setOption(option);
</script>
</body>
</html>

View File

@ -0,0 +1,90 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>测试报告</title>
<!-- 页面样式-->
<style type="text/css">
.main {
height: 300px;
}
/*汇总信息样式*/
.summary {
width: 90%;
position: absolute;
margin-left: 5%;
}
.title {
width: auto;
height: 60px;
text-align: center;
font: bolder 38px/60px "Microsoft YaHei UI";
color: #02992c;
margin-bottom: 50px;
}
.text-left {
text-align: left;
font: bolder 20px/30px "Microsoft YaHei UI";
color: #02992c;
margin-bottom: 20px;
}
td, th {
border: solid 1px rgba(133, 137, 140, 0.76);
height: 50px;
width: 200px;
}
td {
color: #02992c;
padding-left: 5%;
}
th {
color: #00a10a;
}
</style>
</head>
<body>
<div class="main">
<!--汇总信息-->
<div class="summary">
<div class="text-left">【{{ title }}】用例执行汇总信息如下:</div>
<table width=60% border="1" cellpadding="0" cellspacing="0" bordercolor="#FF0000"
style="border-collapse:collapse;">
<tr>
<th>测试人员</th>
<td>{{ tester }}</td>
<th>成功用例</th>
<td><span>{{ success }}</span></td>
</tr>
<tr>
<th>开始时间</th>
<td><span>{{ begin_time }}</span></td>
<th style="color: #cc8b0c">失败用例</th>
<td style="color: #cc8b0c">{{ fail }}</td>
</tr>
<tr>
<th>执行时间</th>
<td><span>{{ runtime }}</span></td>
<th style="color: #cc0109">错误用例</th>
<td><span style="color: #cc0109">{{ error }}</span></td>
</tr>
<tr>
<th>用例总数</th>
<td><span>{{ all }}</span></td>
<th style="color: #575a5c">跳过用例</th>
<td><span style="color: #575a5c">{{ skip }}</span></td>
</tr>
</table>
</div>
</div>
</body>
</html>

799
templates/templates2.html Normal file
View File

@ -0,0 +1,799 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>{{ title }}</title>
<link rel="stylesheet" href="https://cdn.staticfile.org/twitter-bootstrap/4.5.0/css/bootstrap.min.css">
<script src=" https://cdn.staticfile.org/jquery/2.0.0/jquery.min.js"></script>
<script src="https://cdn.staticfile.org/echarts/5.1.2/echarts.min.js"></script>
<!-- 页面样式-->
<style type="text/css">
/*标题样式*/
.main {
background: url("image/test_img.png") no-repeat;
background-size: 100%;
-webkit-background-size: 100%;
width: 100%;
height: 100%;
}
.title {
width: auto;
height: 80px;
text-align: center;
font: bolder 30px/80px "Microsoft YaHei UI";
color: #fff;
background: rgba(3, 14, 70, 0.5);
border-bottom: solid 1px rgb(3, 14, 70);
margin-bottom: 10px;
}
.content-box {
height: 700px;
width: 100%;
display: flex;
margin-bottom: 10px;
}
.box1 {
flex: 3;
}
.box2 {
flex: 5;
margin: 0 10px;
}
.box3 {
flex: 4;
}
.panel {
position: relative;
border: 1px solid rgba(25, 186, 139, 0.17);
background: rgba(3, 14, 70, 0.5);
padding: 0 10px 50px;
margin-bottom: 10px;
}
.panel h2, .test_info h2 {
height: 40px;
line-height: 40px;
text-align: center;
color: #007bff;
font-size: 20px;
font-weight: bold;
margin: 0;
}
.panel .chart {
height: 250px;
}
.panel .chart2 {
height: 350px;
}
.panel .desc {
height: 150px;
}
.panel .desc .info {
font: normal 18px/25px "Microsoft YaHei UI";
color: #fff;
display: flex;
}
.panel .desc .info div {
padding: 10px;
border: solid 1px #6c757d;
}
.panel .desc .info div span {
margin-left: 20px;
}
/* 执行信息样式 */
.test_info {
width: 100%;
padding: 20px 100px;
background: rgba(3, 14, 70, 0.5);
}
.table td, .table th {
border: solid 1px #5765a4 !important;
padding: 0 !important;
line-height: 40px;
text-align: center;
height: 40px;
color: #fff;
}
select {
border: 0;
padding: 0;
margin: 0;
height: 2em;
width: 8em;
margin-left: 2em;
}
option {
text-align: center;
height: 36px;
font: none 18px/36px "Microsoft YaHei UI";
color: #28a745 !important;
}
.test_log {
background: rgba(163, 171, 189, 0.15);
width: 100%;
height: 50px;
border-top: none;
border-bottom: none;
display: none;
text-align: left;
}
.test_log td {
text-align: left;
height: 30px;
margin: 0;
padding-left: 3em;
padding-right: 3em;
font: none 18px/24px "Microsoft YaHei UI";
}
pre {
margin: 0;
white-space: pre-wrap;
white-space: -moz-pre-wrap;
white-space: -o-pre-wrap;
word-wrap: break-word;
color: #fff;
line-height: 22px;
font-size: 14px
}
.chart4 {
overflow: auto;
width: 100%;
height: 600px;
}
.chart4::-webkit-scrollbar {
width: 10px;
}
.chart4::-webkit-scrollbar-thumb {
/*滚动条里面小方块*/
border-radius: 5px;
-webkit-box-shadow: inset 0 0 5px rgba(4, 0, 225, 0.62);
background: #272789;
height: 10px;
}
.chart4::-webkit-scrollbar-track {
/*滚动条里面轨道*/
-webkit-box-shadow: inset 0 0 5px rgba(0, 21, 255, 0.54);
border-radius: 10px;
background: rgba(214, 214, 214, 0.64);
}
/*详细内容描述的小标题*/
.table_title {
text-align: center;
background: rgba(1, 2, 37, 0.72);
font: bold 18px/30px 'Microsoft YaHei UI';
color: #fff;
border-radius: 10px
}
</style>
</head>
<body>
<div class="main">
<div class="title">
{{ title }}
</div>
<div class="content-box">
<div class="box1">
<div class="panel">
<h2>执行结果</h2>
<div class="chart" id="char3"></div>
<div class="panel-footer"></div>
</div>
<div class="panel ">
<h2>成功占比</h2>
<div class="chart" id="char1"></div>
<div class="panel-footer"></div>
</div>
</div>
<div class="box2">
<div class="panel">
<h2>运行信息</h2>
<div class="desc">
<div class="info">
<div style="flex:5">
<button type="button" class="btn btn-primary btn-sm">开始时间</button>
<span>{{ begin_time }}</span>
</div>
<div style="flex:5">
<button type="button" class="btn btn-primary btn-sm">用例总数</button>
<span>{{ all }}</span>
</div>
</div>
<div class="info">
<div style="flex:5">
<button type="button" class="btn btn-primary btn-sm">运行时长</button>
<span>{{ runtime }}</span>
</div>
<div style="flex:5">
<button type="button" class="btn btn-primary btn-sm ">测试人员</button>
<span>{{ tester }}</span>
</div>
</div>
<div class="info">
<div style="flex:5">
<button type="button" class="btn btn-primary btn-sm">成功用例</button>
<span>{{ success }}</span>
</div>
<div style="flex:5">
<button type="button" class="btn btn-primary btn-sm ">通过率</button>
<span>{{pass_rate}}%</span>
</div>
</div>
</div>
</div>
<div class="panel">
<h2>通过率趋势图</h2>
<div class="chart2" id="char2"></div>
</div>
</div>
<div class="box3">
<div class="panel">
<h2>历史构建结果</h2>
<div class="chart4">
<table class="table" style="color: #d6d6d6;padding: 0">
<thead>
<tr>
<th scope="col">执行时间</th>
<th scope="col">用例总数</th>
<th scope="col">成功用例数</th>
<th scope="col">通过率</th>
</tr>
</thead>
<tbody>
{% for hos in history[::-1] %}
<tr>
<th scope="row">{{hos.begin_time}}</th>
<td>{{hos.all}}</td>
<td>{{hos.success}}</td>
<td>{{hos.pass_rate}}%</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
</div>
</div>
<div class="test_info">
<h2>本次运行详情</h2>
<div class="table_data">
<table class="table" style="color: #fff">
<thead class="text-light" style="background: rgba(3, 14, 70, 0.5)">
<tr>
<th scope="col" style="width: 5%;padding: 0">编号</th>
<th scope="col" style="width: 20%;padding: 0">
<span>测试类</span>
<select id="testClass">
<option>所有</option>
{% for foo in testClass %}
<option>{{ foo }}</option>
{% endfor %}
</select>
</th>
<th scope="col" style="width: 15%;padding: 0">测试方法</th>
<th scope="col" style="width: 20%;padding: 0">用例描述</th>
<th scope="col" style="width: 15%;padding: 0">执行时间</th>
<th scope="col" style="width: 20%;padding: 0">
<span>执行结果</span>
<select id="testResult">
<option>所有</option>
<option class="text-success">成功</option>
<option class="text-danger">失败</option>
<option class="text-warning">错误</option>
<option class="text-info">跳过</option>
</select>
</th>
<th scope="col" style="width: 10%;padding: 0">详细信息</th>
</tr>
</thead>
<tbody>
{% for foo in results %}
<tr class="case_">
<td>{{ loop.index }}</td>
<td class="{{ foo.class_name }}">{{ foo.class_name }}</td>
<td>{{ foo.method_name }}</td>
<td>{{ foo.method_doc }}</td>
<td>{{ foo.run_time }}</td>
{% if foo.state == '成功' %}
<td class="text-success">{{ foo.state }}</td>
{% elif foo.state == '失败' %}
<td class="text-warning">{{ foo.state }}</td>
{% elif foo.state == '错误' %}
<td class="text-danger">{{ foo.state }}</td>
{% else %}
<td class="text-info">{{ foo.state }}</td>
{% endif %}
<td>
<button type="button" class="btn btn_info btn-primary btn-sm">查看详情</button>
</td>
</tr>
<tr class="test_log">
<td colspan="7" class="small text-muted" style=" word-wrap:break-word; word-break:break-all">
{% for item in foo.run_info %}
<pre>{{ item }}</pre>
{% endfor %}
{% if foo.run_info == [] %}
<pre>无内容输出!</pre>
{% endif %}
</td>
</tr>
{% endfor %}
</table>
</div>
<div style="height: 200px"></div>
</div>
</div>
<script type="text/javascript">
const resulte = {
"success": `{{success}}`,
"all": `{{all}}`,
"fail": `{{fail}}`,
"skip": '{{skip}}',
"error": `{{error}}`,
"runtime": '{{runtime}}',
"begin_time": "{{runtime}}",
"pass_rate": '{{pass_rate}}',
}
;
const history = {{history}};
var passRate = [];
var dTime = [];
history.forEach(function (item, index, array) {
passRate.push(item.pass_rate);
dTime.push(item.begin_time)
});
if (passRate.length === 1) {
passRate.unshift(0);
dTime.unshift(0)
}
function char01() {
let myChart = echarts.init(document.getElementById('char1'));
let option = {
color: ['#28a745', '#ffc107', '#dc3545', '#17a2b8'],
tooltip: {
trigger: 'item',
formatter: '{a} <br/>{b}: {c} ({d}%)',
backgroundColor: 'rgba(3, 14, 70, 0.5)',
borderColor: '#333',
textStyle: {
color: '#fff',
fontSize: "13"
},
},
legend: {
bottom: "0%",
// 小图标的宽度和高度
itemWidth: 10,
itemHeight: 10,
data: ['通过', '失败', '错误', '跳过'],
textStyle: {
color: "rgba(255,255,255,.5)",
fontSize: "12"
},
},
series: [
{
name: '测试结果',
type: 'pie',
radius: ['50%', '70%'],
avoidLabelOverlap: false,
label: {
show: false,
position: 'center'
},
emphasis: {
label: {
show: true,
fontSize: '20',
fontWeight: 'bold',
color: '#fff',
}
},
labelLine: {
show: false
},
data: [
{value: resulte.success, name: '通过'},
{value: resulte.fail, name: '失败'},
{value: resulte.error, name: '错误'},
{value: resulte.skip, name: '跳过'}
]
}
]
};
myChart.setOption(option)
}
char01();
// 大图
function char02() {
// 基于准备好的dom初始化echarts实例
let myChart = echarts.init(document.getElementById("char2"));
// 2. 指定配置和数据
option = {
tooltip: {
trigger: 'axis',
formatter: '{a} <br/>{b}: ({c}%)',
backgroundColor: 'rgba(3, 14, 70, 0.5)',
borderColor: '#333',
textStyle: {
color: '#fff',
fontSize: "13"
},
},
grid: {
left: "10",
top: "30",
right: "10",
bottom: "0",
containLabel: true
},
xAxis: [{
type: "category",
boundaryGap: false,
show: false,
axisLabel: {
textStyle: {
color: "rgba(255,255,255,.6)",
fontSize: 12
}
},
axisLine: {
lineStyle: {
color: "rgba(255,255,255,.2)"
}
},
data: dTime
},
{
axisPointer: {
show: false
},
axisLine: {
show: false
},
position: "bottom",
offset: 20
}
],
yAxis: [{
type: "value",
axisTick: {
show: false
},
axisLine: {
lineStyle: {
color: "rgba(255,255,255,.1)"
}
},
axisLabel: {
textStyle: {
color: "rgba(255,255,255,.6)",
fontSize: 12
}
},
splitLine: {
lineStyle: {
color: "rgba(255,255,255,.1)"
}
}
}],
series: [{
name: "通过率",
type: "line",
smooth: true,
symbol: "circle",
symbolSize: 5,
showSymbol: true,
lineStyle: {
normal: {
color: "#0184d5",
width: 2
}
},
areaStyle: {
normal: {
color: new echarts.graphic.LinearGradient(
0,
0,
0,
1,
[{
offset: 0,
color: "rgba(1, 132, 213, 0.4)"
},
{
offset: 0.8,
color: "rgba(1, 132, 213, 0.1)"
}
],
false
),
shadowColor: "rgba(0, 0, 0, 0.1)"
}
},
itemStyle: {
normal: {
color: "#0184d5",
borderColor: "rgba(221, 220, 107, .1)",
borderWidth: 18
}
},
data: passRate
},
]
};
// 重新把配置好的新数据给实例对象
myChart.setOption(option);
}
char02();
function char03() {
// 基于准备好的dom初始化echarts实例
let myChart = echarts.init(document.getElementById("char3"));
var data = [resulte.success, resulte.fail, resulte.error, resulte.skip];
var titlename = ["通过用例", "失败用例", "错误用例", "跳过用例",];
var valdata = [resulte.all, resulte.all, resulte.all, resulte.all];
var myColor = ['#28a745', '#ffc107', '#dc3545', '#17a2b8'];
option = {
//图标位置
grid: {
top: "10%",
left: "22%",
bottom: "10%"
},
xAxis: {
show: false
},
yAxis: [{
show: true,
data: titlename,
inverse: true,
axisLine: {
show: false
},
splitLine: {
show: false
},
axisTick: {
show: false
},
axisLabel: {
color: "#fff",
rich: {
lg: {
backgroundColor: "#339911",
color: "#fff",
borderRadius: 15,
align: "center",
width: 15,
height: 15
}
}
}
},
{
show: false,
inverse: true,
data: valdata,
axisLabel: {
textStyle: {
fontSize: 12,
color: "#fff"
}
},
axisTick: {
show: false
},
axisLine: {
show: false
},
}
],
series: [{
type: "bar",
yAxisIndex: 0,
data: data,
barCategoryGap: 50,
barWidth: 18,
itemStyle: {
normal: {
barBorderRadius: 20,
color: function (params) {
var num = myColor.length;
return myColor[params.dataIndex % num];
}
}
},
label: {
normal: {
show: true,
position: "right",
formatter: "{c}条",
color:"#fff",
}
}
},
{
type: "bar",
yAxisIndex: 1,
barCategoryGap: 50,
data: valdata,
barWidth: 20,
itemStyle: {
normal: {
color: "none",
borderColor: "#00c1de",
borderWidth: 2,
barBorderRadius: 15
}
}
}
]
};
// 使用刚指定的配置项和数据显示图表。
myChart.setOption(option);
window.addEventListener("resize", function () {
myChart.resize();
});
}
char03()
</script>
<script>
var tbodyTr = $('tbody .case_');
var testResult = $("#testResult");
var testClass = $("#testClass");
<!-- 用例执行详细信息显示切换-->
$(".btn_info").click(function () {
$(this).parent().parent().next().toggle();
});
// 当选择用例类之后触发
testClass.change(function () {
$('.test_log').hide();
var cls = $(this).val();
var res = testResult.val();
elementDisplay(cls, res);
sort()
});
testResult.change(function () {
var res = $(this).val();
var cls = testClass.val();
elementDisplay(cls, res);
sort()
});
function elementDisplay(cls, res) {
// 用例数据的显示
if (cls === "所有") {
if (res === "所有") {
tbodyTr.has('button').show();
} else if (res === '成功') {
tbodyTr.hide();
tbodyTr.has('button').has('.text-success').show()
} else if (res === '错误') {
tbodyTr.hide();
tbodyTr.has('button').has('.text-danger').show()
} else if (res === '失败') {
tbodyTr.hide();
tbodyTr.has('button').has('.text-warning').show()
} else if (res === '跳过') {
tbodyTr.hide();
tbodyTr.has('button').has('.text-info').show()
}
} else {
if (res === "所有") {
tbodyTr.hide();
tbodyTr.has('button').has('.' + cls + '').show()
} else if (res === '成功') {
tbodyTr.hide();
tbodyTr.has('button').has('.' + cls + '').has('.text-success').show()
} else if (res === '错误') {
tbodyTr.hide();
tbodyTr.has('button').has('.' + cls + '').has('.text-danger').show()
} else if (res === '失败') {
tbodyTr.hide();
tbodyTr.has('button').has('.' + cls + '').has('.text-warning').show()
} else if (res === '跳过') {
tbodyTr.hide();
tbodyTr.has('button').has('.' + cls + '').has('.text-info').show()
}
}
}
function sort() {
//重新排列显示序号
// 选择所有可以见的tr
var visibleTr = tbodyTr.filter(":visible");
visibleTr.each(function (index, element) {
element.firstElementChild.innerHTML = index + 1;
})
}
$('.nav-tabs li').click(function () {
$(this).find('a').addClass('active');
$(this).siblings().find('a').removeClass('active');
$(this).parent().next().children('.tab-content div').eq($(this).index()).addClass('active show').siblings().removeClass('active show')
});
</script>
</body>
</html>

7375
templates/templates3.html Normal file

File diff suppressed because it is too large Load Diff

20
templates/weChat.md Normal file
View File

@ -0,0 +1,20 @@
# **提醒!自动化测试反馈**\n#### **请相关同事注意,及时跟进!**
> 项目名称:<font color='info'>{{title}}</font>
> 测试人员:<font color='info'>{{tester}}</font>
> 开始时间:<font color='info'>{{begin_time}}</font>
> 运行时间:<font color='info'>{{runtime}}</font>
> 测试用例总数:<font color='info'>{{all}}</font>
> 测试用例通过率:<font color='info'>{pass_rate}%</font>
> 成功数: <font color='info'>{{success}}</font>
> 失败数:<font color='warning'>{{fail}}</font>
> 跳过数:<font color='info'>{{skip}}, </font>
> 错误数:<font color='comment'>{{error}}</font>
> **--------------------上一次运行结果--------------------**
> 测试用例总数: <font color='info'>{{ history[-1]['all'] }}</font>
> 测试用例通过率:<font color='info'>{{history[-1]['pass_rate']}}%</font>
> 成功数:<font color='info'>{{history[-1]['success']}}</font>
> 失败数:<font color='warning'>{{history[-1]['fail']}}</font>
> 跳过数:<font color='info'>{{history[-1]['skip']}}</font>
> 错误数:<font color='comment'>{{history[-1]['error']}}</font>
> ##### **报告链接:** [jenkins报告,请点击后进入查看](report_url)

View File

@ -9,12 +9,13 @@
"""
import unittest
from ddt import ddt, data
import extensions
from common.file_handling.do_excel import DoExcel
from common.utils.action import Action
from config import Config
from unittestreportnew import list_data
# from ddt import ddt, data
from unittestreportnew.core.dataDriver import ddt
test_file = Config.test_case # 获取 excel 文件路径
excel = DoExcel(test_file)
@ -34,7 +35,8 @@ class TestProjectApi(unittest.TestCase):
def setUp(self) -> None:
pass
@data(*test_case)
# @data(*test_case)
@list_data(test_case)
def test_api(self, item):
sheet, iid, condition, st, name, desc, h_crypto, r_crypto, method, expected = self.action.base_info(item)
if self.action.is_run(condition):
@ -62,7 +64,6 @@ class TestProjectApi(unittest.TestCase):
@classmethod
def tearDownClass(cls) -> None:
excel.close_excel()
cls.action.logger.info(f"所有用例执行完毕")
if __name__ == '__main__':

View File

@ -0,0 +1,14 @@
"""
unittestreport基于unittest扩展了5个功能
1html测试报告的生成(三个风格)
2测试用例失败重运行
3测试报告邮件发送功能
4数据驱动
5多线程执行测试用例
"""
from .core.testRunner import TestRunner,Load
from common.utils.decorators import list_data,json_data,yaml_data
from common.utils.decorators import rerun,run_count
# from .core.reRun import rerun

View File

View File

@ -0,0 +1,65 @@
from functools import wraps
def _create_test_name(index, name, title):
"""
Create a new test name based on index and name.
:param index: Index for generating the test name.
:param name: Base name for the test.
:param title: Base title for the test.
:return: Generated test name.
"""
test_name = f"{name}_{index + 1:03}_{title}"
return test_name
def _set_function_attributes(func, original_func, new_name, test_desc):
"""
Set attributes of a function.
:param func: The function to set attributes for.
:param original_func: The original function being wrapped.
:param new_name: New name for the function.
:param test_desc: New documentation for the function.
"""
func.__wrapped__ = original_func
func.__name__ = new_name
func.__doc__ = test_desc
def _update_func(new_func_name, params, test_desc, func, *args, **kwargs):
"""
Create a wrapper function with updated attributes.
:param new_func_name: New name for the wrapper function.
:param params: Test parameters.
:param test_desc: Test description.
:param func: Original function to be wrapped.
:param args: Additional positional arguments for the function.
:param kwargs: Additional keyword arguments for the function.
:return: Wrapped function.
"""
@wraps(func)
def wrapper(self):
return func(self, params, *args, **kwargs)
_set_function_attributes(wrapper, func, new_func_name, test_desc)
return wrapper
def ddt(cls):
"""
:param cls: 测试类
:return:
"""
for func_name, func in list(cls.__dict__.items()):
if hasattr(func, "PARAMS"):
for index, case_data in enumerate(getattr(func, "PARAMS")):
name = str(case_data.get("Name", "缺少Name"))
test_desc = str(case_data.get("Description", "缺少Description"))
new_test_name = _create_test_name(index, func_name, name)
func2 = _update_func(new_test_name, case_data, test_desc, func)
setattr(cls, new_test_name, func2)
else:
# Avoid name clashes
delattr(cls, func_name)
return cls

View File

@ -0,0 +1,635 @@
import re
import sys
import inspect
import warnings
from functools import wraps
from types import MethodType as MethodType
from collections import namedtuple
try:
from collections import OrderedDict as MaybeOrderedDict
except ImportError:
MaybeOrderedDict = dict
from unittest import TestCase
try:
from unittest import SkipTest
except ImportError:
class SkipTest(Exception):
pass
PY3 = sys.version_info[0] == 3
PY2 = sys.version_info[0] == 2
if PY3:
class InstanceType():
pass
lzip = lambda *a: list(zip(*a))
text_type = str
string_types = str,
bytes_type = bytes
def make_method(func, instance, type):
if instance is None:
return func
return MethodType(func, instance)
CompatArgSpec = namedtuple("CompatArgSpec", "args varargs keywords defaults")
def getargspec(func):
if PY2:
return CompatArgSpec(*inspect.getargspec(func))
args = inspect.getfullargspec(func)
if args.kwonlyargs:
raise TypeError((
"parameterized does not (yet) support functions with keyword "
"only arguments, but %r has keyword only arguments. "
"Please open an issue with your usecase if this affects you: "
"https://github.com/wolever/parameterized/issues/new"
) % (func,))
return CompatArgSpec(*args[:4])
def skip_on_empty_helper(*a, **kw):
raise SkipTest("parameterized input is empty")
def reapply_patches_if_need(func):
def dummy_wrapper(orgfunc):
@wraps(orgfunc)
def dummy_func(*args, **kwargs):
return orgfunc(*args, **kwargs)
return dummy_func
if hasattr(func, 'patchings'):
func = dummy_wrapper(func)
tmp_patchings = func.patchings
delattr(func, 'patchings')
for patch_obj in tmp_patchings:
func = patch_obj.decorate_callable(func)
return func
def delete_patches_if_need(func):
if hasattr(func, 'patchings'):
func.patchings[:] = []
_param = namedtuple("param", "args kwargs")
class param(_param):
""" Represents a single parameter to a test case.
For example::
>>> p = param("foo", bar=16)
>>> p
param("foo", bar=16)
>>> p.args
('foo', )
>>> p.kwargs
{'bar': 16}
Intended to be used as an argument to ``@parameterized``::
@parameterized([
param("foo", bar=16),
])
def test_stuff(foo, bar=16):
pass
"""
def __new__(cls, *args, **kwargs):
return _param.__new__(cls, args, kwargs)
@classmethod
def explicit(cls, args=None, kwargs=None):
""" Creates a ``param`` by explicitly specifying ``args`` and
``kwargs``::
>>> param.explicit([1,2,3])
param(*(1, 2, 3))
>>> param.explicit(kwargs={"foo": 42})
param(*(), **{"foo": "42"})
"""
args = args or ()
kwargs = kwargs or {}
return cls(*args, **kwargs)
@classmethod
def from_decorator(cls, args):
""" Returns an instance of ``param()`` for ``@parameterized`` argument
``args``::
>>> param.from_decorator((42, ))
param(args=(42, ), kwargs={})
>>> param.from_decorator("foo")
param(args=("foo", ), kwargs={})
"""
if isinstance(args, param):
return args
elif isinstance(args, string_types):
args = (args,)
try:
return cls(*args)
except TypeError as e:
if "after * must be" not in str(e):
raise
raise TypeError(
"Parameters must be tuples, but %r is not (hint: use '(%r, )')"
% (args, args),
)
def __repr__(self):
return "param(*%r, **%r)" % self
class QuietOrderedDict(MaybeOrderedDict):
""" When OrderedDict is available, use it to make sure that the kwargs in
doc strings are consistently ordered. """
__str__ = dict.__str__
__repr__ = dict.__repr__
def parameterized_argument_value_pairs(func, p):
"""Return tuples of parameterized arguments and their values.
This is useful if you are writing your own doc_func
function and need to know the values for each parameter name::
>>> def func(a, foo=None, bar=42, **kwargs): pass
>>> p = param(1, foo=7, extra=99)
>>> parameterized_argument_value_pairs(func, p)
[("a", 1), ("foo", 7), ("bar", 42), ("**kwargs", {"extra": 99})]
If the function's first argument is named ``self`` then it will be
ignored::
>>> def func(self, a): pass
>>> p = param(1)
>>> parameterized_argument_value_pairs(func, p)
[("a", 1)]
Additionally, empty ``*args`` or ``**kwargs`` will be ignored::
>>> def func(foo, *args): pass
>>> p = param(1)
>>> parameterized_argument_value_pairs(func, p)
[("foo", 1)]
>>> p = param(1, 16)
>>> parameterized_argument_value_pairs(func, p)
[("foo", 1), ("*args", (16, ))]
"""
argspec = getargspec(func)
arg_offset = 1 if argspec.args[:1] == ["self"] else 0
named_args = argspec.args[arg_offset:]
result = lzip(named_args, p.args)
named_args = argspec.args[len(result) + arg_offset:]
varargs = p.args[len(result):]
result.extend([
(name, p.kwargs.get(name, default))
for (name, default)
in zip(named_args, argspec.defaults or [])
])
seen_arg_names = set([n for (n, _) in result])
keywords = QuietOrderedDict(sorted([
(name, p.kwargs[name])
for name in p.kwargs
if name not in seen_arg_names
]))
if varargs:
result.append(("*%s" % (argspec.varargs,), tuple(varargs)))
if keywords:
result.append(("**%s" % (argspec.keywords,), keywords))
return result
def short_repr(x, n=64):
""" A shortened repr of ``x`` which is guaranteed to be ``unicode``::
>>> short_repr("foo")
u"foo"
>>> short_repr("123456789", n=4)
u"12...89"
"""
x_repr = repr(x)
if isinstance(x_repr, bytes_type):
try:
x_repr = text_type(x_repr, "utf-8")
except UnicodeDecodeError:
x_repr = text_type(x_repr, "latin1")
if len(x_repr) > n:
x_repr = x_repr[:n // 2] + "..." + x_repr[len(x_repr) - n // 2:]
return x_repr
def default_doc_func(func, num, p):
if func.__doc__ is None:
return None
all_args_with_values = parameterized_argument_value_pairs(func, p)
# Assumes that the function passed is a bound method.
descs = ["%s=%s" % (n, short_repr(v)) for n, v in all_args_with_values]
# The documentation might be a multiline string, so split it
# and just work with the first string, ignoring the period
# at the end if there is one.
first, nl, rest = func.__doc__.lstrip().partition("\n")
suffix = ""
if first.endswith("."):
suffix = "."
first = first[:-1]
args = "%s[with %s]" % (len(first) and " " or "", ", ".join(descs))
return "".join([first.rstrip(), args, suffix, nl, rest])
def default_name_func(func, num, p):
base_name = func.__name__
name_suffix = "_%s" % (num,)
if len(p.args) > 0 and isinstance(p.args[0], string_types):
name_suffix += "_" + parameterized.to_safe_name(p.args[0])
return base_name + name_suffix
_test_runner_override = None
_test_runner_guess = False
_test_runners = set(["unittest", "unittest2", "nose", "nose2", "pytest"])
_test_runner_aliases = {
"_pytest": "pytest",
}
def set_test_runner(name):
global _test_runner_override
if name not in _test_runners:
raise TypeError(
"Invalid test runner: %r (must be one of: %s)"
% (name, ", ".join(_test_runners)),
)
_test_runner_override = name
def detect_runner():
""" Guess which test runner we're using by traversing the stack and looking
for the first matching module. This *should* be reasonably safe, as
it's done during test disocvery where the test runner should be the
stack frame immediately outside. """
if _test_runner_override is not None:
return _test_runner_override
global _test_runner_guess
if _test_runner_guess is False:
stack = inspect.stack()
for record in reversed(stack):
frame = record[0]
module = frame.f_globals.get("__name__").partition(".")[0]
if module in _test_runner_aliases:
module = _test_runner_aliases[module]
if module in _test_runners:
_test_runner_guess = module
break
if record[1].endswith("python2.6/unittest.py"):
_test_runner_guess = "unittest"
break
else:
_test_runner_guess = None
return _test_runner_guess
class parameterized(object):
""" Parameterize a test case::
class TestInt(object):
@parameterized([
("A", 10),
("F", 15),
param("10", 42, base=42)
])
def test_int(self, input, expected, base=16):
actual = int(input, base=base)
assert_equal(actual, expected)
@parameterized([
(2, 3, 5)
(3, 5, 8),
])
def test_add(a, b, expected):
assert_equal(a + b, expected)
"""
def __init__(self, input, doc_func=None, skip_on_empty=False):
self.get_input = self.input_as_callable(input)
self.doc_func = doc_func or default_doc_func
self.skip_on_empty = skip_on_empty
def __call__(self, test_func):
self.assert_not_in_testcase_subclass()
@wraps(test_func)
def wrapper(test_self=None):
test_cls = test_self and type(test_self)
if test_self is not None:
if issubclass(test_cls, InstanceType):
raise TypeError((
"@parameterized can't be used with old-style classes, but "
"%r has an old-style class. Consider using a new-style "
"class, or '@parameterized.expand' "
"(see http://stackoverflow.com/q/54867/71522 for more "
"information on old-style classes)."
) % (test_self,))
original_doc = wrapper.__doc__
for num, args in enumerate(wrapper.parameterized_input):
p = param.from_decorator(args)
unbound_func, nose_tuple = self.param_as_nose_tuple(test_self, test_func, num, p)
try:
wrapper.__doc__ = nose_tuple[0].__doc__
# Nose uses `getattr(instance, test_func.__name__)` to get
# a method bound to the test instance (as opposed to a
# method bound to the instance of the class created when
# tests were being enumerated). Set a value here to make
# sure nose can get the correct test method.
if test_self is not None:
setattr(test_cls, test_func.__name__, unbound_func)
yield nose_tuple
finally:
if test_self is not None:
delattr(test_cls, test_func.__name__)
wrapper.__doc__ = original_doc
input = self.get_input()
if not input:
if not self.skip_on_empty:
raise ValueError(
"Parameters iterable is empty (hint: use "
"`parameterized([], skip_on_empty=True)` to skip "
"this test when the input is empty)"
)
wrapper = wraps(test_func)(skip_on_empty_helper)
wrapper.parameterized_input = input
wrapper.parameterized_func = test_func
test_func.__name__ = "_parameterized_original_%s" % (test_func.__name__,)
return wrapper
def param_as_nose_tuple(self, test_self, func, num, p):
nose_func = wraps(func)(lambda *args: func(*args[:-1], **args[-1]))
nose_func.__doc__ = self.doc_func(func, num, p)
# Track the unbound function because we need to setattr the unbound
# function onto the class for nose to work (see comments above), and
# Python 3 doesn't let us pull the function out of a bound method.
unbound_func = nose_func
if test_self is not None:
# Under nose on Py2 we need to return an unbound method to make
# sure that the `self` in the method is properly shared with the
# `self` used in `setUp` and `tearDown`. But only there. Everyone
# else needs a bound method.
func_self = (
None if PY2 and detect_runner() == "nose" else
test_self
)
nose_func = make_method(nose_func, func_self, type(test_self))
return unbound_func, (nose_func,) + p.args + (p.kwargs or {},)
def assert_not_in_testcase_subclass(self):
parent_classes = self._terrible_magic_get_defining_classes()
if any(issubclass(cls, TestCase) for cls in parent_classes):
raise Exception("Warning: '@parameterized' tests won't work "
"inside subclasses of 'TestCase' - use "
"'@parameterized.expand' instead.")
def _terrible_magic_get_defining_classes(self):
""" Returns the set of parent classes of the class currently being defined.
Will likely only work if called from the ``parameterized`` decorator.
This function is entirely @brandon_rhodes's fault, as he suggested
the implementation: http://stackoverflow.com/a/8793684/71522
"""
stack = inspect.stack()
if len(stack) <= 4:
return []
frame = stack[4]
code_context = frame[4] and frame[4][0].strip()
if not (code_context and code_context.startswith("class ")):
return []
_, _, parents = code_context.partition("(")
parents, _, _ = parents.partition(")")
return eval("[" + parents + "]", frame[0].f_globals, frame[0].f_locals)
@classmethod
def input_as_callable(cls, input):
if callable(input):
return lambda: cls.check_input_values(input())
input_values = cls.check_input_values(input)
return lambda: input_values
@classmethod
def check_input_values(cls, input_values):
# Explicitly convery non-list inputs to a list so that:
# 1. A helpful exception will be raised if they aren't iterable, and
# 2. Generators are unwrapped exactly once (otherwise `nosetests
# --processes=n` has issues; see:
# https://github.com/wolever/nose-parameterized/pull/31)
if not isinstance(input_values, list):
input_values = list(input_values)
return [param.from_decorator(p) for p in input_values]
@classmethod
def expand(cls, input, name_func=None, doc_func=None, skip_on_empty=False,
**legacy):
""" A "brute force" method of parameterizing test cases. Creates new
test cases and injects them into the namespace that the wrapped
function is being defined in. Useful for parameterizing tests in
subclasses of 'UnitTest', where Nose test generators don't work.
>>> @parameterized.expand([("foo", 1, 2)])
... def test_add1(name, input, expected):
... actual = add1(input)
... assert_equal(actual, expected)
...
>>> locals()
... 'test_add1_foo_0': <function ...> ...
>>>
"""
if "testcase_func_name" in legacy:
warnings.warn("testcase_func_name= is deprecated; use name_func=",
DeprecationWarning, stacklevel=2)
if not name_func:
name_func = legacy["testcase_func_name"]
if "testcase_func_doc" in legacy:
warnings.warn("testcase_func_doc= is deprecated; use doc_func=",
DeprecationWarning, stacklevel=2)
if not doc_func:
doc_func = legacy["testcase_func_doc"]
doc_func = doc_func or default_doc_func
name_func = name_func or default_name_func
def parameterized_expand_wrapper(f, instance=None):
stack = inspect.stack()
frame = stack[1]
frame_locals = frame[0].f_locals
parameters = cls.input_as_callable(input)()
if not parameters:
if not skip_on_empty:
raise ValueError(
"Parameters iterable is empty (hint: use "
"`parameterized.expand([], skip_on_empty=True)` to skip "
"this test when the input is empty)"
)
return wraps(f)(lambda: skip_on_empty_helper())
digits = len(str(len(parameters) - 1))
for num, p in enumerate(parameters):
name = name_func(f, "{num:0>{digits}}".format(digits=digits, num=num), p)
# If the original function has patches applied by 'mock.patch',
# re-construct all patches on the just former decoration layer
# of param_as_standalone_func so as not to share
# patch objects between new functions
nf = reapply_patches_if_need(f)
frame_locals[name] = cls.param_as_standalone_func(p, nf, name)
frame_locals[name].__doc__ = doc_func(f, num, p)
# Delete original patches to prevent new function from evaluating
# original patching object as well as re-constructed patches.
delete_patches_if_need(f)
f.__test__ = False
return parameterized_expand_wrapper
@classmethod
def param_as_standalone_func(cls, p, func, name):
@wraps(func)
def standalone_func(*a):
return func(*(a + p.args), **p.kwargs)
standalone_func.__name__ = name
# place_as is used by py.test to determine what source file should be
# used for this test.
standalone_func.place_as = func
# Remove __wrapped__ because py.test will try to look at __wrapped__
# to determine which parameters should be used with this test case,
# and obviously we don't need it to do any parameterization.
try:
del standalone_func.__wrapped__
except AttributeError:
pass
return standalone_func
@classmethod
def to_safe_name(cls, s):
return str(re.sub("[^a-zA-Z0-9_]+", "_", s))
def parameterized_class(attrs, input_values=None, class_name_func=None, classname_func=None):
""" Parameterizes a test class by setting attributes on the class.
Can be used in two ways:
1) With a list of dictionaries containing attributes to override::
@parameterized_class([
{ "username": "foo" },
{ "username": "bar", "access_level": 2 },
])
class TestUserAccessLevel(TestCase):
...
2) With a tuple of attributes, then a list of tuples of values:
@parameterized_class(("username", "access_level"), [
("foo", 1),
("bar", 2)
])
class TestUserAccessLevel(TestCase):
...
"""
if isinstance(attrs, string_types):
attrs = [attrs]
input_dicts = (
attrs if input_values is None else
[dict(zip(attrs, vals)) for vals in input_values]
)
class_name_func = class_name_func or default_class_name_func
if classname_func:
warnings.warn(
"classname_func= is deprecated; use class_name_func= instead. "
"See: https://github.com/wolever/parameterized/pull/74#issuecomment-613577057",
DeprecationWarning,
stacklevel=2,
)
class_name_func = lambda cls, idx, input: classname_func(cls, idx, input_dicts)
def decorator(base_class):
test_class_module = sys.modules[base_class.__module__].__dict__
for idx, input_dict in enumerate(input_dicts):
test_class_dict = dict(base_class.__dict__)
test_class_dict.update(input_dict)
name = class_name_func(base_class, idx, input_dict)
test_class_module[name] = type(name, (base_class,), test_class_dict)
# We need to leave the base class in place (see issue #73), but if we
# leave the test_ methods in place, the test runner will try to pick
# them up and run them... which doesn't make sense, since no parameters
# will have been applied.
# Address this by iterating over the base class and remove all test
# methods.
for method_name in list(base_class.__dict__):
if method_name.startswith("test_"):
delattr(base_class, method_name)
return base_class
return decorator
def get_class_name_suffix(params_dict):
if "name" in params_dict:
return parameterized.to_safe_name(params_dict["name"])
params_vals = (
params_dict.values() if PY3 else
(v for (_, v) in sorted(params_dict.items()))
)
return parameterized.to_safe_name(next((
v for v in params_vals
if isinstance(v, string_types)
), ""))
def default_class_name_func(cls, num, params_dict):
suffix = get_class_name_suffix(params_dict)
return "%s_%s%s" % (
cls.__name__,
num,
suffix and "_" + suffix,
)

View File

@ -0,0 +1,20 @@
import base64
import hashlib
import hmac
import smtplib
import time
import urllib.parse
from email.header import Header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import requests
from config import Config
#
#
# if __name__ == '__main__':
# # r'D:\app\apitest\cases\cases\test_cases.xlsx',r"D:\app\apitest\OutPut\reports\report.html"
# sm = SendEmail()
# sm.send_mail("111", [])

View File

@ -0,0 +1,188 @@
import re
import sys
import time
import traceback
import unittest
from io import StringIO
origin_stdout = sys.stdout
def output2console(s):
"""Output stdout content to console"""
tmp_stdout = sys.stdout
sys.stdout = origin_stdout
print(s, end='')
sys.stdout = tmp_stdout
class OutputRedirector(object):
""" Wrapper to redirect stdout or stderr """
def __init__(self, fp):
self.fp = fp
def write(self, s):
self.fp.write(s)
origin_stdout.write(str(s))
def writelines(self, lines):
self.fp.writelines(lines)
def flush(self):
self.fp.flush()
stdout_redirector = OutputRedirector(sys.stdout)
stderr_redirector = OutputRedirector(sys.stderr)
class TestResult(unittest.TestResult):
def __init__(self):
super().__init__()
self.fields = {
"success": 0,
"all": 0,
"fail": 0,
"skip": 0,
"error": 0,
"begin_time": "",
"results": [],
"testClass": set()
}
self.sys_stdout = None
self.sys_stderr = None
self.outputBuffer = None
def startTest(self, test):
super().startTest(test)
self.start_time = time.time()
self.outputBuffer = StringIO()
stdout_redirector.fp = self.outputBuffer
stderr_redirector.fp = self.outputBuffer
self.sys_stdout = sys.stdout
self.sys_stderr = sys.stderr
sys.stdout = stdout_redirector
sys.stderr = stderr_redirector
def complete_output(self):
if self.sys_stdout:
sys.stdout = self.sys_stdout
sys.stderr = self.sys_stderr
self.sys_stdout = None
self.sys_stderr = None
return self.outputBuffer.getvalue()
def stopTest(self, test):
test.run_time = '{:.3}s'.format((time.time() - self.start_time))
test.class_name = test.__class__.__qualname__
test.method_name = test.__dict__['_testMethodName']
test.method_doc = test.shortDescription()
self.fields['results'].append(test)
self.fields["testClass"].add(test.class_name)
self.complete_output()
def stopTestRun(self, title=None):
self.fields['fail'] = len(self.failures)
self.fields['error'] = len(self.errors)
self.fields['skip'] = len(self.skipped)
self.fields['all'] = sum(
[self.fields['fail'], self.fields['error'], self.fields['skip'], self.fields['success']])
self.fields['testClass'] = list(self.fields['testClass'])
def addSuccess(self, test):
self.fields["success"] += 1
test.state = '成功'
sys.stdout.write("{}执行——>【通过】\n".format(test))
logs = []
output = self.complete_output()
logs.append(output)
test.run_info = logs
def addFailure(self, test, err):
super().addFailure(test, err)
logs = []
test.state = '失败'
sys.stderr.write("{}执行——>【失败】\n".format(test))
output = self.complete_output()
logs.append(output)
logs.extend(traceback.format_exception(*err))
test.run_info = logs
def addSkip(self, test, reason):
super().addSkip(test, reason)
test.state = '跳过'
sys.stdout.write("{}执行--【跳过Skip】\n".format(test))
logs = [reason]
test.run_info = logs
def addError(self, test, err):
super().addError(test, err)
test.state = '错误'
sys.stderr.write("{}执行——>【错误Error】\n".format(test))
logs = []
logs.extend(traceback.format_exception(*err))
test.run_info = logs
if test.__class__.__qualname__ == '_ErrorHolder':
test.run_time = 0
res = re.search(r'(.*)\(.*\.(.*)\)', test.description)
test.class_name = res.group(2)
test.method_name = res.group(1)
test.method_doc = test.shortDescription()
self.fields['results'].append(test)
self.fields["testClass"].add(test.class_name)
else:
output = self.complete_output()
logs.append(output)
class ReRunResult(TestResult):
def __init__(self, count, interval):
super().__init__()
self.count = count
self.interval = interval
self.run_cases = []
def startTest(self, test):
if not hasattr(test, "count"):
super().startTest(test)
def stopTest(self, test):
if test not in self.run_cases:
self.run_cases.append(test)
super().stopTest(test)
def addFailure(self, test, err):
if not hasattr(test, 'count'):
test.count = 0
if test.count < self.count:
test.count += 1
sys.stderr.write("{}执行——>【失败Failure】\n".format(test))
for string in traceback.format_exception(*err):
sys.stderr.write(string)
sys.stderr.write("================{}重运行第{}次================\n".format(test, test.count))
time.sleep(self.interval)
test.run(self)
else:
super().addFailure(test, err)
if test.count != 0:
sys.stderr.write("================重运行{}次完毕================\n".format(test.count))
def addError(self, test, err):
if not hasattr(test, 'count'):
test.count = 0
if test.count < self.count:
test.count += 1
sys.stderr.write("{}执行——>【错误Error】\n".format(test))
for string in traceback.format_exception(*err):
sys.stderr.write(string)
sys.stderr.write("================{}重运行第{}次================\n".format(test, test.count))
time.sleep(self.interval)
test.run(self)
else:
super().addError(test, err)
if test.count != 0:
sys.stderr.write("================重运行{}次完毕================\n".format(test.count))

View File

@ -0,0 +1,247 @@
import copy
import json
import os
import time
import unittest
from concurrent.futures.thread import ThreadPoolExecutor
from json import JSONDecodeError
from jinja2 import Environment, FileSystemLoader
from common.notice.dingding import DingTalk
from common.notice.email_client import SendEmail
from common.notice.weChat import WeChat
from ..core.testResult import ReRunResult
Load = unittest.defaultTestLoader
class TestRunner:
def __init__(self, suite: unittest.TestSuite,
filename="report.html",
report_dir="./reports",
title='接口测试报告',
tester='测试人员',
desc="接口自动化测试报告",
templates=2
):
"""
测试运行器用于执行测试套件并生成测试报告
:param suite: 测试套件
:param filename: 报告文件名
:param report_dir: 报告保存路径
:param title: 报告标题
:param tester: 测试人员
:param desc: 报告描述
:param templates: 报告模板选择
"""
# super().__init__()
if not isinstance(suite, unittest.TestSuite):
raise TypeError("suite 参数不是一个测试套件")
if not isinstance(filename, str):
raise TypeError("filename 不是字符串类型")
if not filename.endswith(".html"):
filename = filename + ".html"
self.suite = suite
self.filename = filename
self.title = title
self.tester = tester
self.desc = desc
self.templates = templates
self.report_dir = report_dir
self.result = []
self.starttime = time.time()
self.res = None
self.env = None
self.template_path = os.path.join(os.path.dirname(__file__), '../../templates')
self.env = Environment(loader=FileSystemLoader(self.template_path))
def __classification_suite(self):
"""将测试套件按类别划分"""
suites_list = []
def find_test_cases(suite):
for item in suite:
if isinstance(item, unittest.TestCase):
suites_list.append(suite)
break
else:
find_test_cases(item)
find_test_cases(copy.deepcopy(self.suite))
return suites_list
def __get_reports(self):
"""生成测试报告"""
print("所有用例执行完毕,正在生成测试报告中......")
test_result = self.__calculate_test_result()
test_result['runtime'] = '{:.2f} S'.format(time.time() - self.starttime)
test_result["begin_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(self.starttime))
test_result["title"] = self.title
test_result["tester"] = self.tester
test_result['desc'] = self.desc
test_result['pass_rate'] = int(test_result['success'] / test_result['all'] * 100) if test_result['all'] else 0
# 判断是否要生产测试报告
os.makedirs(self.report_dir, exist_ok=True)
# 获取历史执行数据
test_result['history'] = self.__handle_history_data(test_result)
self.template_file = self.__get_template_file()
report_content = self.__generating_templates_and_report_content(self.template_file, test_result)
# 写入报告文件
self.file_path = os.path.join(self.report_dir, self.filename)
with open(self.file_path, 'wb') as f:
f.write(report_content.encode('utf8'))
self.test_result = test_result
print(f"测试报告已经生成,报告路径为:{self.file_path}")
return test_result
def __calculate_test_result(self):
"""计算结果"""
test_result = {"success": 0, "all": 0, "fail": 0, "skip": 0, "error": 0, "results": [], "testClass": []}
for report_content in self.result:
for item in test_result:
test_result[item] += report_content.fields[item]
return test_result
def __get_template_file(self):
"""获取模板文件"""
template_file_mapping = {
2: "templates2.html",
3: "templates3.html"
}
return template_file_mapping.get(self.templates, "templates.html")
def __generating_templates_and_report_content(self, template_file, test_result):
"""渲染模板并生成报告内容"""
return self.env.get_template(template_file).render(test_result)
def __handle_history_data(self, test_result):
"""
处理历史数据
:return:
"""
try:
with open(os.path.join(self.report_dir, 'history.json'), 'r', encoding='utf-8') as f:
history = json.load(f)
except FileNotFoundError as e:
history = []
except JSONDecodeError as e:
history = []
history.append({'success': test_result['success'],
'all': test_result['all'],
'fail': test_result['fail'],
'skip': test_result['skip'],
'error': test_result['error'],
'runtime': test_result['runtime'],
'begin_time': test_result['begin_time'],
'pass_rate': test_result['pass_rate'],
})
with open(os.path.join(self.report_dir, 'history.json'), 'w', encoding='utf-8') as f:
json.dump(history, f, ensure_ascii=True)
return history
def run(self, thread_count=1, count=0, interval=2):
"""
The entrance to running tests
Note: if multiple test classes share a global variable, errors may occur due to resource competition
:param thread_count:Number of threads. default 1
:param count: Rerun times, default 0
:param interval: Rerun interval, default 2
:return: Test run results
"""
suites = self.__classification_suite()
if thread_count > 1:
with ThreadPoolExecutor(max_workers=thread_count) as ts:
for i in suites:
self.res = ReRunResult(count=count, interval=interval)
self.result.append(self.res)
ts.submit(i.run, result=self.res).add_done_callback(self.res.stopTestRun)
else:
self.res = ReRunResult(count=count, interval=interval)
self.result.append(self.res)
self.suite.run(self.res)
self.res.stopTestRun()
result = self.__get_reports()
return result
def rerun_run(self, count=0, interval=2):
"""
重新运行测试用例包括失败和错误的用例
:param count: 重新运行次数默认为0
:param interval: 重新运行间隔默认为2秒
:return: 重新运行间隔默认为2秒
"""
self.res = ReRunResult(count=count, interval=interval)
self.result.append(self.res)
test_case_suites = self.__classification_suite()
for test_case_suite in test_case_suites:
test_case_suite.run(self.res)
self.res.stopTestRun()
result = self.__get_reports()
return result
def get_except_info(self):
"""Get error reporting information for error cases and failure cases"""
except_info = []
num = 0
for i in self.result:
for texts in i.failures:
t, content = texts
num += 1
except_info.append("*{}、用例【{}】执行失败*\n失败信息如下:".format(num, t._testMethodDoc))
except_info.append(content)
for texts in i.errors:
num += 1
t, content = texts
except_info.append("*{}、用例【{}】执行错误*\n错误信息如下:".format(num, t._testMethodDoc))
except_info.append(content)
except_str = "\n".join(except_info)
return except_str
def get_failed_test_cases(self):
"""get error or failed testcase info"""
failed_test_cases = []
for res in self.result:
for failure in res.failures:
failed_test_cases.append(failure[0])
for error in res.errors:
failed_test_cases.append(error[0])
return failed_test_cases
def email_notice(self):
"""
发送邮件通知
"""
email_content = {"file": [os.path.abspath(self.file_path)],
"content": self.__generating_templates_and_report_content("templates03.html", self.test_result)
}
sm = SendEmail()
file_path = email_content["file"]
content = email_content["content"]
sm.send_mail(content=content, file_path=file_path)
def dingtalk_notice(self):
"""
发送钉钉机器人通知
"""
notice_content = self.__generating_templates_and_report_content('dingtalk.md', self.test_result)
except_info = self.get_except_info()
ding = DingTalk(self.title, notice_content, except_info)
ding.send_info()
def weixin_notice(self):
"""
推送企业微信机器人
"""
notice_content = self.__generating_templates_and_report_content('weChat.md', self.test_result)
wx = WeChat(notice_content)
wx.send_main()