删除敏感信息

This commit is contained in:
chenyongzhiaaron 2023-07-04 18:05:35 +08:00
parent ae6cf6eb48
commit 979f2ffdaf
90 changed files with 1569 additions and 7260 deletions

View File

@ -1,3 +0,0 @@
2023-06-30 09:01:03 | ERROR | 发送请求失败: Value for header {BSP_USER_ID: 216684145642752200} must be of type str or bytes, not <class 'int'>
2023-06-30 09:01:25 | ERROR | 发送请求失败: Value for header {projectId: 99000022} must be of type str or bytes, not <class 'int'>
2023-06-30 09:01:34 | ERROR | 发送请求失败: Method cannot contain non-token characters '/IBS/API/IBS-LMS-BASE/WORK-KIND/PAGE/LIST?T=1636702597000&CURRENT=1&SIZE=1000&PROJECTID=99000022&KEY=&SUPPLYID=&STATUS=0&ENABLED=1' (found at least '/')

View File

@ -1,188 +0,0 @@
{
"projectId": "99000022",
"suitability": "",
"avatar": "https://bimuse.bzlrobot.com/bsp/test/opi/document/v1/document/fileStore/viewPicture/6f2a66c24aa8446b8e5b2e828ca17909",
"isTeamLeader": 0,
"name": "张妹春",
"phone": "18188888888",
"workKindCode": "GZ_CGY",
"workKindName": "仓管员",
"isSpec": 0,
"idCard": {
"memberId": "1674348283581112321",
"name": "张妹春",
"cardNo": "230281196910179732",
"sex": "男",
"clan": "土族",
"birthday": "1969-10-17",
"age": 53,
"issuingAuthority": "",
"validityStart": "",
"validityEnd": "",
"address": "香港特别行政区北镇县秀英长春路y座",
"cardFront": "",
"cardEnd": "",
"avatar": ""
},
"supplier": {
"id": "1674345370494054401",
"createTime": "2023-06-29 17:14:14",
"updateTime": "2023-06-29 17:14:14",
"companyName": "测试公司",
"shortName": "",
"companyType": "CONSTRUCTION_UNIT",
"businessLicense": "993101016769636918",
"legalPerson": "张三",
"legalIdcard": "",
"bank": "",
"bankCardNumber": "",
"bankNumber": "",
"projectId": "99000022",
"projectName": "佛山陈村旧改项目",
"companyId": "1674345370041069569",
"entryStatus": 1,
"contractName": "",
"contractUrl": "",
"supplierType": "",
"outsourcerId": "",
"outsourcerName": "",
"inGroup": "0",
"tenantId": "",
"inTime": "2023-06-29 17:13:39",
"outTime": ""
},
"team": {
"id": "1674347643433529346",
"createTime": "2023-06-29 17:23:15",
"updateTime": "2023-06-29 17:23:15",
"name": "铝模",
"teamLibraryId": "1674347643160899585",
"projectId": "99000022",
"supplyId": "1674345370494054401",
"memberId": "",
"memberName": "",
"memberIdcard": "",
"memberPhone": "",
"principal": "",
"principalIdcard": "",
"principalPhone": "",
"status": "0",
"member": null,
"contractName": "",
"contractUrl": "",
"supplier": null
},
"memberInfo": {
"memberId": "1674348283581112321",
"education": "其他",
"political": "其他",
"maritalStatus": "UM",
"emergencyContact1": "KIRA",
"emergencyPhone1": "18222222222",
"emergencyContact2": "",
"emergencyPhone2": ""
},
"laborContractInfo": {
"memberId": "1674348283581112321",
"contractType": "劳动合同",
"singingDate": "",
"validityStart": "",
"validityEnd": "",
"contractAttachment": "",
"contractAttachmentName": "",
"attachments": []
},
"certInfo": {
"createTime": "2023-06-29 17:25:13",
"updateTime": "2023-06-29 17:43:13",
"memberId": "1674348283581112321",
"certName": "注册会计师",
"certNameCode": "",
"validityStart": "2023-06-30",
"validityEnd": "2023-07-01",
"certTypeName": "",
"certTypeCode": "",
"certCategory": "1",
"certNo": "900000000",
"certPhoto": "https://bimuse.bzlrobot.com/bsp/test/opi/document/v1/document/fileStore/viewPicture/4cc4ac79ddda4b7ebdc9ca1d8a1ee38b",
"certPhotoFileName": "一个TCP 连接可以发送多少个HTTP请求.png",
"issuingAuthority": "中国政府"
},
"specialCertInfo": {
"createTime": "2023-06-29 17:25:13",
"updateTime": "2023-06-29 17:43:13",
"memberId": "1674348283581112321",
"certName": "",
"certNameCode": "",
"validityStart": "2023-06-29",
"validityEnd": "",
"certTypeName": "",
"certTypeCode": "",
"certCategory": "2",
"certNo": "",
"certPhoto": "",
"certPhotoFileName": "",
"issuingAuthority": ""
},
"safeTrainingInfo": {
"createTime": "2023-06-29 17:25:13",
"updateTime": "2023-06-29 17:43:13",
"memberId": "1674348283581112321",
"trainingType": "SAFE",
"trainingDate": "2023-06-29",
"trainingPhoto": "https://bimuse.bzlrobot.com/bsp/test/opi/document/v1/document/fileStore/viewPicture/2ef488193b9d4a38bca6aca918ae17ce",
"trainingPhotoFileName": "一等奖-幼儿园公开课-小白鱼过生日.png"
},
"bankCardInfo": {
"memberId": "1674348283581112321",
"bankName": "中国农业银行",
"accountBank": "银行沪杭",
"cardNo": "9092324234",
"cardPhoto": "https://bimuse.bzlrobot.com/bsp/test/opi/document/v1/document/fileStore/viewPicture/7934b54c857a43098e9590a51d5c2a2b",
"cardPhotoFileName": "一个TCP 连接可以发送多少个HTTP请求.png",
"status": 0
},
"healthInfo": {
"createTime": "2023-06-29 17:25:13",
"updateTime": "2023-06-29 17:43:13",
"memberId": "1674348283581112321",
"laborId": "230281196910179732",
"laborName": "张妹春",
"deviceId": "",
"projectId": "99000022",
"healthStatus": "其他",
"height": "180",
"weight": "60",
"bmi": "500",
"systolicPressure": "39",
"diastolicPressure": "500",
"pulse": "187",
"temperature": "40",
"tempvType": "",
"tempvUnit": "",
"glu": "500",
"sao2": "98",
"fatMass": "",
"fatRate": "",
"fatFreeMass": "",
"moistureMass": "",
"moistureRate": "",
"skeletonMass": "",
"muscleMass": "",
"extracellularFluid": "",
"intracellularFluid": "",
"proteinMass": "",
"mineralsMass": "",
"other": "",
"basalMetabolism": "",
"fatAdjust": "",
"muscleAdjust": "",
"isHistory": 0,
"isMajorDiseases": "1",
"isPurchaseInsurance": "0",
"inspectTime": "2023-06-21",
"actionType": "",
"fileName": "一等奖-幼儿园公开课-小白鱼过生日.png",
"filePath": "https://bimuse.bzlrobot.com/bsp/test/opi/document/v1/document/fileStore/viewPicture/b3884269188641ab97c4b80b51e3667a"
}
}

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

View File

@ -29,7 +29,7 @@
"header": [], "header": [],
"body": { "body": {
"mode": "raw", "mode": "raw",
"raw": "{\r\n \"account\": \"18127813600\",\r\n \"password\": \"WD6Y0+LJLHXuFaplzUtSCnwktA7KgXCpjCS+OVvIFGTEoz2gbqK2oOOuJUf7ao0m2YYGiGi1pQTMBnkrxIY1cztGYbVp97kvIQwZLN4UhrOAe3h1asY/NLnDwB/byl7agcGv9WI4oy6B1Z93HVHmQiAKn7QqnDgPVITu4jthNc8=\"\r\n}", "raw": "{\r\n \"account\": \"181200\",\r\n \"password\": \"WD6YB1ZhNc8=\"\r\n}",
"options": { "options": {
"raw": { "raw": {
"language": "json" "language": "json"
@ -42,498 +42,11 @@
"{{url}}" "{{url}}"
], ],
"path": [ "path": [
"bsp",
"test",
"user",
"ugs",
"auth",
"loginByNotBip" "loginByNotBip"
] ]
} }
}, },
"response": [] "response": []
},
{
"name": "获取需求采购列表(默认条件查询-成功)",
"event": [
{
"listen": "prerequest",
"script": {
"exec": [
""
],
"type": "text/javascript"
}
},
{
"listen": "test",
"script": {
"exec": [
"pm.test(\"assert_code\", function () {\r",
" let body = JSON.parse(pm.request.body.raw)\r",
" body_code = body.code\r",
" let res = pm.response.json()\r",
" for (let i of res.data.records) {\r",
" pm.expect(i.orderCode, \"不包含0003\").to.include(body_code)\r",
" }\r",
" let requireId = res.data.records[0].id\r",
" pm.environment.set(\"requireId\", requireId)\r",
"})"
],
"type": "text/javascript"
}
}
],
"request": {
"method": "POST",
"header": [
{
"key": "BSP_TOKEN",
"value": "{{bsToken}}",
"type": "text"
}
],
"body": {
"mode": "raw",
"raw": "{\r\n \"ncCode\": \"\",\r\n \"applyTimeBegin\": \"\",\r\n \"applyTimeEnd\": \"\",\r\n \"applyUserName\": \"\",\r\n \"auditStatus\": \"\",\r\n \"buildingCode\": \"\",\r\n \"code\": \"RE202210270003\",\r\n \"name\": \"\",\r\n \"purchaseType\": \"\",\r\n \"size\": 10,\r\n \"current\": 1,\r\n \"projectId\": \"104966\"\r\n}",
"options": {
"raw": {
"language": "json"
}
}
},
"url": {
"raw": "{{url}}/bsp/test/user/ugs/ibs/api/ibs-material/material/jobRequire/pages?t=1681310676000",
"host": [
"{{url}}"
],
"path": [
"bsp",
"test",
"user",
"ugs",
"ibs",
"api",
"ibs-material",
"material",
"jobRequire",
"pages"
],
"query": [
{
"key": "t",
"value": "1681310676000"
}
]
}
},
"response": []
},
{
"name": "获取请购详情",
"request": {
"method": "GET",
"header": [
{
"key": "BSP_TOKEN",
"value": "aba920e63584c3b1fef3cb4c498bca44",
"type": "text"
}
],
"url": {
"raw": "https://bimdc.bzlrobot.com/bsp/test/user/ugs/ibs/api/ibs-material/material/jobRequire/detail?t=&isEdit=&requireId=5804&projectId=",
"protocol": "https",
"host": [
"bimdc",
"bzlrobot",
"com"
],
"path": [
"bsp",
"test",
"user",
"ugs",
"ibs",
"api",
"ibs-material",
"material",
"jobRequire",
"detail"
],
"query": [
{
"key": "t",
"value": ""
},
{
"key": "isEdit",
"value": ""
},
{
"key": "requireId",
"value": "5804"
},
{
"key": "projectId",
"value": ""
}
]
}
},
"response": []
},
{
"name": "文件上传",
"request": {
"method": "POST",
"header": [
{
"key": "Accept",
"value": "*/*"
},
{
"key": "Accept-Language",
"value": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6"
},
{
"key": "Connection",
"value": "keep-alive"
},
{
"key": "Cookie",
"value": "sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%221876a7f1e0f1368-09fbff283e5df9-4c657b58-2073600-1876a7f1e10e86%22%2C%22first_id%22%3A%22%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E5%BC%95%E8%8D%90%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC%22%2C%22%24latest_referrer%22%3A%22https%3A%2F%2Flogin.countrygarden.com.cn%2F%22%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfY29va2llX2lkIjoiMTg3NmE3ZjFlMGYxMzY4LTA5ZmJmZjI4M2U1ZGY5LTRjNjU3YjU4LTIwNzM2MDAtMTg3NmE3ZjFlMTBlODYifQ%3D%3D%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%22%2C%22value%22%3A%22%22%7D%2C%22%24device_id%22%3A%221876a7f1e0f1368-09fbff283e5df9-4c657b58-2073600-1876a7f1e10e86%22%7D; x-access-token=9f576f5f914c6e29584f82e5c5fe9c92"
},
{
"key": "Origin",
"value": "https://bimdc.bzlrobot.com"
},
{
"key": "Sec-Fetch-Dest",
"value": "empty"
},
{
"key": "Sec-Fetch-Mode",
"value": "cors"
},
{
"key": "Sec-Fetch-Site",
"value": "same-origin"
},
{
"key": "User-Agent",
"value": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36 Edg/115.0.0.0"
},
{
"key": "app_token",
"value": "A4C419A561A3404A86A33628A3F681C3"
},
{
"key": "sec-ch-ua",
"value": "\"Not/A)Brand\";v=\"99\", \"Microsoft Edge\";v=\"115\", \"Chromium\";v=\"115\""
},
{
"key": "sec-ch-ua-mobile",
"value": "?0"
},
{
"key": "sec-ch-ua-platform",
"value": "\"Windows\""
}
],
"body": {
"mode": "formdata",
"formdata": [
{
"key": "file",
"type": "file",
"src": "test.jpeg"
}
]
},
"url": {
"raw": "https://bimdc.bzlrobot.com/bsp/opi/document/v1/document/fileStore/uploadFile",
"protocol": "https",
"host": [
"bimdc",
"bzlrobot",
"com"
],
"path": [
"bsp",
"opi",
"document",
"v1",
"document",
"fileStore",
"uploadFile"
]
}
},
"response": []
},
{
"name": "https://bimdc.bzlrobot.com/bsp/opi/document/v1/document/fileStore/uploadFile",
"request": {
"method": "POST",
"header": [
{
"key": "Accept",
"value": "*/*"
},
{
"key": "Accept-Language",
"value": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6"
},
{
"key": "Connection",
"value": "keep-alive"
},
{
"key": "Cookie",
"value": "sensorsdata2015jssdkcross=%7B%22distinct_id%22%3A%221876a7f1e0f1368-09fbff283e5df9-4c657b58-2073600-1876a7f1e10e86%22%2C%22first_id%22%3A%22%22%2C%22props%22%3A%7B%22%24latest_traffic_source_type%22%3A%22%E5%BC%95%E8%8D%90%E6%B5%81%E9%87%8F%22%2C%22%24latest_search_keyword%22%3A%22%E6%9C%AA%E5%8F%96%E5%88%B0%E5%80%BC%22%2C%22%24latest_referrer%22%3A%22https%3A%2F%2Flogin.countrygarden.com.cn%2F%22%7D%2C%22identities%22%3A%22eyIkaWRlbnRpdHlfY29va2llX2lkIjoiMTg3NmE3ZjFlMGYxMzY4LTA5ZmJmZjI4M2U1ZGY5LTRjNjU3YjU4LTIwNzM2MDAtMTg3NmE3ZjFlMTBlODYifQ%3D%3D%22%2C%22history_login_id%22%3A%7B%22name%22%3A%22%22%2C%22value%22%3A%22%22%7D%2C%22%24device_id%22%3A%221876a7f1e0f1368-09fbff283e5df9-4c657b58-2073600-1876a7f1e10e86%22%7D; x-access-token=9f576f5f914c6e29584f82e5c5fe9c92"
},
{
"key": "Origin",
"value": "https://bimdc.bzlrobot.com"
},
{
"key": "Sec-Fetch-Dest",
"value": "empty"
},
{
"key": "Sec-Fetch-Mode",
"value": "cors"
},
{
"key": "Sec-Fetch-Site",
"value": "same-origin"
},
{
"key": "User-Agent",
"value": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36 Edg/115.0.0.0"
},
{
"key": "app_token",
"value": "A4C419A561A3404A86A33628A3F681C3"
},
{
"key": "sec-ch-ua",
"value": "\"Not/A)Brand\";v=\"99\", \"Microsoft Edge\";v=\"115\", \"Chromium\";v=\"115\""
},
{
"key": "sec-ch-ua-mobile",
"value": "?0"
},
{
"key": "sec-ch-ua-platform",
"value": "\"Windows\""
}
],
"body": {
"mode": "formdata",
"formdata": [
{
"key": "file",
"type": "file",
"src": "/C:/Users/chenyongzhi11/Desktop/微信图片_20221025174254.jpg"
}
]
},
"url": {
"raw": "https://bimdc.bzlrobot.com/bsp/opi/document/v1/document/fileStore/uploadFile",
"protocol": "https",
"host": [
"bimdc",
"bzlrobot",
"com"
],
"path": [
"bsp",
"opi",
"document",
"v1",
"document",
"fileStore",
"uploadFile"
]
}
},
"response": []
},
{
"name": "平台登录",
"event": [
{
"listen": "test",
"script": {
"exec": [
""
],
"type": "text/javascript"
}
}
],
"request": {
"method": "POST",
"header": [],
"body": {
"mode": "formdata",
"formdata": [
{
"key": "username",
"value": "admin",
"type": "text"
},
{
"key": "password",
"value": "3306",
"type": "text"
},
{
"key": "csrfmiddlewaretoken",
"value": "av1VLobtvmrqvvoDRygqIqBr8QOa2ExxzcB1zVpHRKXXvOtoDzhmuLapxdP8uJj7",
"type": "text"
}
]
},
"url": {
"raw": "http://127.0.0.1:8000/index/",
"protocol": "http",
"host": [
"127",
"0",
"0",
"1"
],
"port": "8000",
"path": [
"index",
""
]
}
},
"response": []
},
{
"name": "新增项目",
"request": {
"method": "POST",
"header": [
{
"key": "Cookie",
"value": "sessionid=zjs3ytyud4hh952ongttt6vvx3fihlqv",
"type": "text"
}
],
"body": {
"mode": "urlencoded",
"urlencoded": []
},
"url": {
"raw": "http://127.0.0.1:8000/project/create_project/",
"protocol": "http",
"host": [
"127",
"0",
"0",
"1"
],
"port": "8000",
"path": [
"project",
"create_project",
""
]
}
},
"response": []
},
{
"name": "查询项目列表信息",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "http://127.0.0.1:8000/project/get_projects/",
"protocol": "http",
"host": [
"127",
"0",
"0",
"1"
],
"port": "8000",
"path": [
"project",
"get_projects",
""
]
}
},
"response": []
},
{
"name": "新增模块",
"request": {
"method": "POST",
"header": [
{
"key": "Cookie",
"value": "sessionid=zjs3ytyud4hh952ongttt6vvx3fihlqv",
"type": "text"
}
],
"body": {
"mode": "raw",
"raw": "",
"options": {
"raw": {
"language": "html"
}
}
},
"url": {
"raw": "http://127.0.0.1:8000/module/create_modul/",
"protocol": "http",
"host": [
"127",
"0",
"0",
"1"
],
"port": "8000",
"path": [
"module",
"create_modul",
""
]
}
},
"response": []
},
{
"name": "查询模块列表",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "http://127.0.0.1:8000/module/get_module/",
"protocol": "http",
"host": [
"127",
"0",
"0",
"1"
],
"port": "8000",
"path": [
"module",
"get_module",
""
]
}
},
"response": []
} }
] ]
} }

View File

@ -69,12 +69,12 @@ msg = {
{ {
"index": "13", "index": "13",
"type": "string", "type": "string",
"value": "http://192.1.14.19/document/fileStore/viewPicture/08188228a79b42ffa6b32a85f08b38ba" "value": "/08188228a72a85f08b38ba"
}, },
{ {
"index": "14", "index": "14",
"type": "string", "type": "string",
"value": "https://ibs-test.bzlrobot.com/dfs/get?fileName=In-rear-pic.jpg&group=group1&path=M00/0A/4F/CgjLoWDiwteEBoWJAAAAANRLO4A310.jpg" "value": "/CgjLoBoA310.jpg"
}, },
{ {
"index": "18", "index": "18",
@ -89,5 +89,5 @@ msg = {
} }
if __name__ == '__main__': if __name__ == '__main__':
p = sys.argv[0] p = sys.argv[0]
rab = MQTTClient(broker_address='192.2.3.59', topic='weigh/492875336/rtdata') rab = MQTTClient(broker_address='1.2.3.4', topic='9999999')
rab.send_message(msg) rab.send_message(msg)

View File

@ -116,7 +116,7 @@ if __name__ == '__main__':
"time": "2023-04-23T17:48:33.128Z", "time": "2023-04-23T17:48:33.128Z",
"type": "SYNC" "type": "SYNC"
} }
rab = RabbitMQClient(host='192.1.1.59:1883', exchange_name='/bridge/492/rtdata', rab = RabbitMQClient(host='1.1.1.9:18', exchange_name='/rtdata',
# exchange_type='topic', # exchange_type='topic',
# routing_key='connected' # routing_key='connected'
) )

View File

@ -3,11 +3,12 @@
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
# Name: bif_datetime.py # Name: bif_datetime.py
# Description: # Description:
# Author: chenyongzhi # Author: kira
# EMAIL: 262667641@qq.com # EMAIL: 262667641@qq.com
# Date: 2021/1/12 14:03 # Date: 2021/1/12 14:03
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
from datetime import datetime, timedelta from datetime import datetime, timedelta
from common.bif_functions import logger from common.bif_functions import logger
__all__ = ['get_current_date', 'get_current_time', 'get_delta_time'] __all__ = ['get_current_date', 'get_current_time', 'get_delta_time']

View File

@ -3,7 +3,7 @@
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
# Name: bif_json.py # Name: bif_json.py
# Description: # Description:
# Author: chenyongzhi # Author: kira
# EMAIL: 262667641@qq.com # EMAIL: 262667641@qq.com
# Date: 2021/1/12 14:02 # Date: 2021/1/12 14:02
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------

View File

@ -3,7 +3,7 @@
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
# Name: bif_list.py # Name: bif_list.py
# Description: # Description:
# Author: chenyongzhi # Author: kira
# EMAIL: 262667641@qq.com # EMAIL: 262667641@qq.com
# Date: 2021/1/12 15:15 # Date: 2021/1/12 15:15
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------

View File

@ -3,7 +3,7 @@
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------
# Name: bif_random.py # Name: bif_random.py
# Description: # Description:
# Author: chenyongzhi # Author: kira
# EMAIL: 262667641@qq.com # EMAIL: 262667641@qq.com
# Date: 2021/1/12 14:02 # Date: 2021/1/12 14:02
# ------------------------------------------------------------------------------- # -------------------------------------------------------------------------------

View File

@ -18,6 +18,8 @@ __all__ = ['get_timestamp', 'ms_fmt_hms']
def get_timestamp(length=13): def get_timestamp(length=13):
""" """
获取时间戳字符串长度最多为16位默认13位 获取时间戳字符串长度最多为16位默认13位
Returns:
Args: Args:
length: 时间戳长度 length: 时间戳长度
@ -37,7 +39,7 @@ def ms_fmt_hms(ms):
""" """
将毫秒转换成 h:m:s.ms格式字符串 将毫秒转换成 h:m:s.ms格式字符串
Args: Args:
ms: ms: 毫秒
Returns: Returns:

View File

@ -1,4 +1,12 @@
# -*- coding: utf-8 -*- # coding: utf-8
# -------------------------------------------------------------------------------
# Name: bif_time.py
# Description:
# Author: kira
# EMAIL: 262667641@qq.com
# Date: 2021/1/12 14:03
# -------------------------------------------------------------------------------
import datetime import datetime
import math import math
import random import random

View File

@ -9,11 +9,16 @@
""" """
from common.crypto import logger from common.crypto import logger
from common.crypto.encryption_rsa import Rsa from common.crypto.encryption_rsa import Rsa
# from common.crypto.encryption_aes import DoAES
from extensions import sign from extensions import sign
@logger.log_decorator() @logger.log_decorator()
class EncryptData: class EncryptData:
"""
数据加密入口
"""
def encrypts(self, headers_crypto, headers, request_data_crypto, request_data): def encrypts(self, headers_crypto, headers, request_data_crypto, request_data):
encryption_methods = { encryption_methods = {
"MD5": sign.md5_sign, "MD5": sign.md5_sign,

View File

@ -41,7 +41,7 @@ class DoAES:
if __name__ == '__main__': if __name__ == '__main__':
# key = os.urandom(16) #随即产生n个字节的字符串可以作为随机加密key使用 # key = os.urandom(16) #随即产生n个字节的字符串可以作为随机加密key使用
key = '2l4LoWczlWxlMZJAAp5N0g6EygZZd9A6' # 随即产生n个字节的字符串可以作为随机加密key使用 key = '2l4LoWczlWxlMZJAAp5N0g6EygZZd9C6' # 随即产生n个字节的字符串可以作为随机加密key使用
text = '4534' # 需要加密的内容 text = '4534' # 需要加密的内容
aes_test = DoAES(key) aes_test = DoAES(key)
cipher_text = aes_test.encrypt(text) cipher_text = aes_test.encrypt(text)

View File

@ -8,10 +8,9 @@
@desc: @desc:
""" """
import hashlib
import base64 import base64
import binascii import binascii
import rsa import hashlib
from pyDes import des, CBC, PAD_PKCS5 from pyDes import des, CBC, PAD_PKCS5

View File

@ -3,7 +3,6 @@
# @Author : kira # @Author : kira
# @Email : 262667641@qq.com # @Email : 262667641@qq.com
# @File : analysis_json.py # @File : analysis_json.py
# @Project : risk_api_project
""" """
接口返回的数据是 列表字典 接口返回的数据是 列表字典
新建两个函数 A B函数 A 处理字典数据被调用后判断传循环嵌套 格式的通过一个 key 获取到被包裹了多层的目标数据 新建两个函数 A B函数 A 处理字典数据被调用后判断传循环嵌套 格式的通过一个 key 获取到被包裹了多层的目标数据

View File

@ -3,7 +3,6 @@
# @Author : kira # @Author : kira
# @Email : 262667641@qq.com # @Email : 262667641@qq.com
# @File : assert_dict.py # @File : assert_dict.py
# @Project : risk_api_project
import sys import sys
sys.path.append("../") sys.path.append("../")

View File

@ -1,7 +1,7 @@
# -*- coding:utf-8 -*- # -*- coding:utf-8 -*-
""" """
Time: 2020/6/1/001 15:29 Time: 2020/6/1/001 15:29
Author: 陈勇志 Author: kira
Email:262667641@qq.com Email:262667641@qq.com
Project:api_project Project:api_project
""" """
@ -15,8 +15,6 @@ from jsonpath_ng import parse
from common.variables import Variables from common.variables import Variables
from common.data_extraction import logger from common.data_extraction import logger
# from common.http_client.http_client import Pyt
REPLACE_DICT = { REPLACE_DICT = {
"null": None, "null": None,
"True": True, "True": True,
@ -24,16 +22,11 @@ REPLACE_DICT = {
} }
# d = Variables()
class DataExtractor(Variables): class DataExtractor(Variables):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
# self.PATTERN = getattr(Variables, "PATTERN") # 预编译正则表达式
@logger.log_decorator("提取参数出现了意想不到的错误!!") @logger.log_decorator("提取参数出现了意想不到的错误!!")
def substitute_data(self, response, regex=None, keys=None, deps=None, jp_dict=None): def substitute_data(self, response, regex=None, keys=None, deps=None, jp_dict=None):
""" """
@ -83,6 +76,15 @@ class DataExtractor(Variables):
self.update_variable(key, None) self.update_variable(key, None)
def substitute_route(self, response, route_str): def substitute_route(self, response, route_str):
"""
想字典一样取值:name=data[0].name;ok=data[0].id;an=data[0].age
Args:
response:
route_str:
Returns:
"""
deps_list = re.sub(f"[\r\n]+", "", route_str).split(";") deps_list = re.sub(f"[\r\n]+", "", route_str).split(";")
for dep_item in deps_list: for dep_item in deps_list:
key, value_path = dep_item.split("=") key, value_path = dep_item.split("=")

View File

@ -3,7 +3,6 @@
# @Author : kira # @Author : kira
# @Email : 262667641@qq.com # @Email : 262667641@qq.com
# @File : dict_get.py # @File : dict_get.py
# @Project : api_project
def dict_get(dic, locators, default=None): def dict_get(dic, locators, default=None):

View File

@ -31,7 +31,6 @@ def execute_sql_files(sql_path, db_config):
for sql_statement in sql_statements: for sql_statement in sql_statements:
if sql_statement: if sql_statement:
cur.execute(sql_statement) cur.execute(sql_statement)
connection.commit() connection.commit()
finally: finally:

View File

@ -24,13 +24,6 @@ class MysqlClient:
初始化连接配置 初始化连接配置
Args: Args:
db_config: 数据库连接配置字典 db_config: 数据库连接配置字典
{
"host": "xxxx.xxx.xxx.xx",
"port": 3306,
"database": "db_name",
"user": "root",
"password": "xxxx"
}
""" """
if not db_config: if not db_config:
return return
@ -153,11 +146,11 @@ if __name__ == '__main__':
} }
database_2 = { database_2 = {
"host": "10.8.203.25", "host": "10.10.10.10",
"port": 3306, "port": 3306,
"database": "ibs_lms_base", "database": "base",
"user": "root", "user": "root",
"password": "gd1234" "password": "roottttttttttttttttttttest"
} }
res = MysqlClient(database_2).execute_sql(sql_2) res = MysqlClient(database_2).execute_sql(sql_2)
print("数据执行结果", res) print("数据执行结果", res)

View File

@ -8,8 +8,12 @@ import redis
class RedisClient: class RedisClient:
"""
redis数据库
"""
def __init__(self): def __init__(self):
self.redis = redis.Redis(host='10.8.203.25', port=6379, password='test2020') self.redis = redis.Redis(host='1.1.3.5', port=6379, password='test')
def set_data(self, key, value, expire_time=None): def set_data(self, key, value, expire_time=None):
self.redis.set(key, value) self.redis.set(key, value)

View File

@ -30,35 +30,9 @@ class DoExcel:
# self.wb.save(self.file_name) # self.wb.save(self.file_name)
# self.wb.close() # self.wb.close()
@logger.log_decorator()
def do_excel(self):
"""
通过 title 定位单元格获取所有测试数据
Returns: 读取每一条测试用用例分别保存到字典中然后再将所有用例保存到列表中[{用例1},{用例2},{用例3}]
[{"":""},{},{}]
"""
sheets = eval(self.get_excel_init().get("sheets"))
test_data = []
for sheet_name in sheets: # 遍历存在配置文件里面的字典sheet_name == 每一个 excel 中的 sheetName
sheet = self.wb[sheet_name] # 获取所有 sheet 句柄
max_row = sheet.max_row # 获取最大行
max_column = sheet.max_column # 获取最大列
fist_header = [] # 获取第一行标题所有值
for i in range(1, max_column + 1):
fist_header.append(sheet.cell(1, i).value)
# 定位单元格
for i in range(2, max_row + 1):
sub_data = {} # 列表内的字典(也就是测试数据)
for k in range(1, max_column + 1):
sub_data[fist_header[k - 1]] = sheet.cell(i, k).value
sub_data["sheet"] = sheet_name
test_data.append(sub_data) # 将所有单元格 title 对应的值组成字典添加到列表中。
return test_data
# @logger.log_decorator()
def do_excel_yield(self): def do_excel_yield(self):
""" """
读取excel数据的生成器 读取excel数据
Returns: Returns:
""" """

View File

@ -8,10 +8,8 @@ import json
import os import os
import sys import sys
# 把当前目录加入到系统环境变量中
sys.path.append(os.path.split(os.path.dirname(os.path.abspath(__file__)))[0]) sys.path.append(os.path.split(os.path.dirname(os.path.abspath(__file__)))[0])
sys.path.append("../..") sys.path.append("../..")
# sys.path.append('venv/Lib/site-packages')
from openpyxl import load_workbook, Workbook from openpyxl import load_workbook, Workbook
from common.file_handling import logger from common.file_handling import logger

View File

@ -1,37 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: get_init.py
@time: 2023/3/16 10:33
@desc:
"""
from common.file_handling.do_excel import DoExcel
from common.utils.logger import MyLog
@MyLog().decorator_log("读取excel中初始化数据异常")
def get_init(test_file):
"""
Returns:返回初始化数据及测试用例
"""
MyLog().my_log(f"读取测试用例excel文件{test_file}", "info")
excel_handle = DoExcel(test_file) # 实例化对象
try:
excel_handle.clear_date() # 清空 excel 中实际结果列的数据
# test_case = excel_handle.do_excel() # 获取 excel 中的测试用例
test_case = excel_handle.do_excel_yield() # 获取 excel 中的测试用例
init_data = excel_handle.get_excel_init() # 获取初始化基本数据
MyLog().my_log(f"如下是初始化得到得数据:{init_data}", "info")
except Exception as e:
raise e
return excel_handle, init_data, test_case
if __name__ == '__main__':
from common.config import Config
init = get_init(Config.test_case)
print(init)

View File

@ -10,12 +10,9 @@ sys.path.append("../")
sys.path.append("./common") sys.path.append("./common")
from common.http_client import logger from common.http_client import logger
# from common.data_extraction.data_extractor import DataExtractor
# from common.data_extraction.dependent_parameter import DependentParameter
from common.validation.load_modules_from_folder import LoadModulesFromFolder from common.validation.load_modules_from_folder import LoadModulesFromFolder
# @singleton
class Pyt(LoadModulesFromFolder): class Pyt(LoadModulesFromFolder):
# 类属性,保存一个会话对象,防止每次都创建一个新的会话,节省资源 # 类属性,保存一个会话对象,防止每次都创建一个新的会话,节省资源
session = requests.Session() session = requests.Session()
@ -52,10 +49,8 @@ class Pyt(LoadModulesFromFolder):
""" """
发送 http 请求 发送 http 请求
@param host: 域名 @param host: 域名
@param __url: 接口 __url @param url: 接口 __url
@param method: http 请求方法 @param method: http 请求方法
# @param request_data_type: 请求数据类型
# @param headers: 请求头部信息,默认为 None
@param kwargs: 接受 requests 原生的关键字参数 @param kwargs: 接受 requests 原生的关键字参数
@return: 响应对象 @return: 响应对象
""" """
@ -65,8 +60,6 @@ class Pyt(LoadModulesFromFolder):
if not url: if not url:
raise ValueError("URL cannot be None") raise ValueError("URL cannot be None")
__url = f'{host}{url}' if not re.match(r"https?", url) else url __url = f'{host}{url}' if not re.match(r"https?", url) else url
# 获取session 中的请求方法
# func = getattr(session, method.lower())
# 增加兼容 # 增加兼容
# 处理 headers 参数为字符串类型的情况 # 处理 headers 参数为字符串类型的情况
@ -78,7 +71,6 @@ class Pyt(LoadModulesFromFolder):
kwargs['json'] = json.loads(kwargs['json']) kwargs['json'] = json.loads(kwargs['json'])
# 发送请求 # 发送请求
# return func(__url, verify=True, timeout=30, **kwargs)
self.request = requests.Request(method, __url, **kwargs) self.request = requests.Request(method, __url, **kwargs)
self.response = self.session.send(self.request.prepare(), timeout=30, verify=True) self.response = self.session.send(self.request.prepare(), timeout=30, verify=True)
@ -86,14 +78,11 @@ class Pyt(LoadModulesFromFolder):
if __name__ == '__main__': if __name__ == '__main__':
hst = 'https://ibs-test.bzlrobot.com' hst = 'https://baidu.com'
url = '/api/ibs-auth/ibs_platform/login?t=168672334' url = '/login?t=168672334'
method = 'post' method = 'post'
kwargs = { kwargs = {
'json': '{"account": "luoshunwen005", "grantType": "password", "isBip": "false","password": "o+t2SnEEylblxlfIspJUvGFa0gCDNrU2dC34LjVFqIiTmxa855YDBE/6J7eRVBGaQwR7mozSKComk9n6kjSNRjSX1m574dRZdESIeYsmM/xk2Nt5n5dqB268qCMivJMXpHQMygpT4RpDiYoOiEqlOi9eG5G7v/5rixHiZ9xv98m34xVD1VdlaCbphoB9JI7T9HmVFJniSWt01ruC5t+aFUvfxLjOpRmYmfz8GwtSd5XXKaKr29ce1C39Fg+PtqOkQ3cOLVS9hXgzz6s2zud0++T4vwgVtrHx86aMrrozhCdKzrQuWPEO1cSsaEaNVdSUsT54je+4O+xKzxkJhoGMnQ=="}', 'json': '{"account": "kira","password": "9999999"}',
'headers': None} 'headers': None}
pyt = Pyt() pyt = Pyt()
pyt.http_client(hst, url, method, **kwargs) pyt.http_client(hst, url, method, **kwargs)
# print(pyt.request.json)
# print(pyt.request.headers)
# print(pyt.response.text)

View File

@ -1,124 +0,0 @@
# -*- coding: utf-8 -*-
import json
import re
import sys
import requests
import urllib3
sys.path.append("../")
sys.path.append("./common")
from common.http_client import logger
# from common.data_extraction.data_extractor import DataExtractor
# from common.data_extraction.dependent_parameter import DependentParameter
from common.validation.load_modules_from_folder import LoadModulesFromFolder
# @singleton
class Pyt(LoadModulesFromFolder):
# 类属性,保存一个会话对象,防止每次都创建一个新的会话,节省资源
session = requests.Session()
def __init__(self):
super().__init__()
self.request = None
self.response = None
# @staticmethod
def log_decorator(msg="请求异常"):
def decorator(func):
def wrapper(*args, **kwargs):
try:
print(f"发送请求的参数: {kwargs}")
response = func(*args, **kwargs)
print(f"请求地址 --> {response.request.url}")
print(f"请求头 --> {response.request.headers}")
print(f"请求 body --> {response.request.body}")
print(f"接口状态--> {response.status_code}")
print(f"接口耗时--> {response.elapsed}")
print(f"接口响应--> {response.text}")
return response
except Exception as e:
logger.error(f"发送请求失败: {e}")
return
return wrapper
return decorator
def before_request(self, request, **kwargs):
item = kwargs.get("item")
# 拼接动态代码段文件
prepost_script = f"prepost_script_{item.get('sheet')}_{item.get('iid')}.py"
item = self.action.load_and_execute_script(Config.SCRIPTS_DIR, prepost_script, "setup", item)
# 替换 URL, PARAMETERS, HEADER,期望值
item = self.action.replace_dependent_parameter(item)
url, query_str, request_data, headers, expected, request_data_type = self.__request_info(item)
# 提取请求参数信息
self.action.substitute_data(request_data, jp_dict=jp_dict)
headers, request_data = self.action.encrypt.encrypts(headers_crypto, headers, request_crypto, request_data)
result_tuple = None
result = "PASS"
response = None
# 执行请求操作
kwargs = {
request_data_type: request_data,
'headers': headers
}
# @log_decorator()
def http_client(self, host, url, method, **kwargs):
"""
发送 http 请求
@param host: 域名
@param __url: 接口 __url
@param method: http 请求方法
# @param request_data_type: 请求数据类型
# @param headers: 请求头部信息,默认为 None
@param kwargs: 接受 requests 原生的关键字参数
@return: 响应对象
"""
# 关闭 https 警告信息
urllib3.disable_warnings()
if not url:
raise ValueError("URL cannot be None")
__url = f'{host}{url}' if not re.match(r"https?", url) else url
# 获取session 中的请求方法
# func = getattr(session, method.lower())
# 增加兼容
# 处理 headers 参数为字符串类型的情况
if 'headers' in kwargs and isinstance(kwargs['headers'], str):
kwargs['headers'] = json.loads(kwargs['headers'])
# 处理 json 参数为字符串类型的情况
if 'json' in kwargs and isinstance(kwargs['json'], str):
kwargs['json'] = json.loads(kwargs['json'])
# 发送请求
# return func(__url, verify=True, timeout=30, **kwargs)
self.request = requests.Request(method, __url, **kwargs)
self.response = self.session.send(self.request.prepare(), timeout=30, verify=True)
return self.response
if __name__ == '__main__':
hst = 'https://ibs-test.bzlrobot.com'
url = '/api/ibs-auth/ibs_platform/login?t=168672334'
method = 'post'
kwargs = {
'json': '{"account": "luoshunwen005", "grantType": "password", "isBip": "false","password": "o+t2SnEEylblxlfIspJUvGFa0gCDNrU2dC34LjVFqIiTmxa855YDBE/6J7eRVBGaQwR7mozSKComk9n6kjSNRjSX1m574dRZdESIeYsmM/xk2Nt5n5dqB268qCMivJMXpHQMygpT4RpDiYoOiEqlOi9eG5G7v/5rixHiZ9xv98m34xVD1VdlaCbphoB9JI7T9HmVFJniSWt01ruC5t+aFUvfxLjOpRmYmfz8GwtSd5XXKaKr29ce1C39Fg+PtqOkQ3cOLVS9hXgzz6s2zud0++T4vwgVtrHx86aMrrozhCdKzrQuWPEO1cSsaEaNVdSUsT54je+4O+xKzxkJhoGMnQ=="}',
'headers': None}
pyt = Pyt()
pyt.http_client(hst, url, method, **kwargs)
# print(pyt.request.json)
# print(pyt.request.headers)
# print(pyt.response.text)

View File

@ -1,55 +0,0 @@
import re
import requests
import urllib3
from common.http_client.request_hooks import Hooks
hooks = Hooks()
def req(host, url, method, **kwargs):
"""
发送 http 请求
@param host: 域名
@param url: 接口 url
@param method: http 请求方法
@param kwargs: 接受 requests 原生的关键字参数
@return: 响应对象
"""
# 关闭 https 警告信息
urllib3.disable_warnings()
session = requests.Session()
if not url:
raise ValueError("URL 不能为 None")
url = f'{host}{url}' if not re.match(r"https?", url) else url
# 执行 before_request 钩子函数
request = requests.Request(method, url, **kwargs)
request = hooks.run_before_request_funcs(request)
# 发送请求
prepared_request = session.prepare_request(request)
response = session.send(prepared_request)
# 执行 after_request 钩子函数
response = hooks.run_after_request_funcs(response)
return response
if __name__ == '__main__':
url = "https://ibs-test.bzlrobot.com/api/ibs-auth/ibs_platform/login?t=1686740475000"
payload = {
"grantType": "password",
"account": "luoshunwen005",
"password": "TTdNRuVHxmOI7P8G2yjrfRBaMyQzmy8XGzS1rvbV8X/WqQuJpTAVgTqTAkIswJ0xbGpA2rvoVRnsW3Dg+vmtiJrm/hNUmd7nRV42tMltWDzFAgCC4KOFBC1b+mRkACby+nuiS+N7J6xEAieXJXh6Ml5jWy9qbt2rziteB8npxsMsOygiuRpUmoSkHz8wshQGtqOAr9uQRhglNLJdLWzZtas6TQvypeOMOGSatA2arJ7ipGE3oW+AiATyDu22Eh+PBO+eR7wLWOyO2XxeWhK+5EGiiIVmKRyaMY3JedVHSjpnWmnZ1Vj9pZjUaenJQfCgSS5CBnxLX/AoxB5TvJmMEA==",
"isBip": False
}
headers = {
'Content-Type': 'application/json'
}
kg = {'json': payload, "headers": headers}
res = req("", url, 'post', **kg)
print(res, res.json())

View File

@ -8,7 +8,6 @@ class Hooks:
注册 before_request 钩子函数将其添加到 before_request_funcs 列表中 注册 before_request 钩子函数将其添加到 before_request_funcs 列表中
""" """
self.before_request_funcs.append(func) self.before_request_funcs.append(func)
print(self.before_request_funcs)
return func return func
def after_request(self, func): def after_request(self, func):
@ -16,7 +15,6 @@ class Hooks:
注册 after_request 钩子函数将其添加到 after_request_funcs 列表中 注册 after_request 钩子函数将其添加到 after_request_funcs 列表中
""" """
self.after_request_funcs.append(func) self.after_request_funcs.append(func)
print(self.after_request_funcs)
return func return func
def run_before_request_funcs(self, request): def run_before_request_funcs(self, request):

View File

@ -1,64 +0,0 @@
from common.data_extraction.dependent_parameter import DependentParameter
from common.crypto.encryption_main import do_encrypt
from common.http_client.request_hooks import Hooks
from common.utils.mylogger import MyLogger
dep_par = DependentParameter() # 参数提取类实例化
logger = MyLogger()
hooks = Hooks()
@logger.log_decorator()
@hooks.before_request
def update_url(request):
"""更新url"""
request.url = dep_par.replace_dependent_parameter(request.url)
print(request.url)
return request
@logger.log_decorator()
@hooks.before_request
def update_header(request):
"""更新请求头"""
request.headers = dep_par.replace_dependent_parameter(request.headers)
print(request.headers)
return request
@logger.log_decorator()
@hooks.before_request
def update_body(request):
"""更新请求参数"""
if request.json:
request.json = dep_par.replace_dependent_parameter(request.json)
if request.encryption:
request.data = do_encrypt(request.encryption, request.json) # 数据加密MD5  
else:
request.data = dep_par.replace_dependent_parameter(request.data)
if request.encryption:
request.data = do_encrypt(request.encryption, request.data) # 数据加密MD5  
return request
@logger.log_decorator()
@hooks.before_request
def update_expected(request):
"""更新预期结果"""
request.expected = dep_par.replace_dependent_parameter(request.expected)
print(request.expected)
return request
@logger.log_decorator()
@hooks.after_request
def parse_json(response):
"""
尝试将响应中的内容解析为 JSON 格式
"""
try:
response.json_data = response.json()
except ValueError:
response.json_data = None
return response

View File

@ -23,8 +23,7 @@ class WxWorkSms:
self.send_url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={key}" self.send_url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={key}"
self.up_url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={key}&type=file" self.up_url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={key}&type=file"
def send_markdown(self, project_name, project_port, total_cases, pass_rate, success_cases, fail_cases, def send_markdown(self, project_name, project_port, total_cases, pass_rate, success_cases, fail_cases, skip_cases,
skip_cases,
error_cases, report_url): error_cases, report_url):
""" """
发送markdown 请求 发送markdown 请求
@ -107,4 +106,4 @@ class WxWorkSms:
if __name__ == '__main__': if __name__ == '__main__':
dirs = r'D:\apk_api\api-test-project\OutPut\Reports' dirs = r'D:\apk_api\api-test-project\OutPut\Reports'
WxWorkSms('8b1647d4-dc32-447c-b524-548acf18a938').send_main(dirs, 2, 3, 4, 5, 6, 7, 8, 9, 10) WxWorkSms('8b1647d4-dc32-447c-b524-548acf18a938').send_main(dirs, 2, 3, 4, 5, 6, 7, 8, 9, 10)
# WxWorkSms('8b1647d4-dc32-447c-b524-548acf18a938').send_markdown(1, 2, 3, 4, 5, 6, 7, 8, 9) # WxWorkSms('8b1647d4-dc32-447c-b524-548acf18a938').send_markdown(1, 2, 3, 4, 5, 6, 7, 8, 9)

View File

@ -1,13 +0,0 @@
import time
from functools import wraps
def fn_time(func):
def inner(*args, **kwargs):
t0 = time.time()
func(*args, **kwargs)
t1 = time.time()
print(f"{func.__name__}总共运行:{str(t1 - t0)}")
# return func(*args, **kwargs)
return inner

View File

@ -17,6 +17,7 @@ class ScriptNotFoundError(Exception):
class LoadScript: class LoadScript:
# @logger.log_decorator() # @logger.log_decorator()
def load_script(self, script_path): def load_script(self, script_path):
""" """
加载脚本文件并返回模块对象 加载脚本文件并返回模块对象

View File

@ -45,14 +45,11 @@ class MyLog:
sh.setFormatter(format_str) # 设置屏幕上显示的格式 sh.setFormatter(format_str) # 设置屏幕上显示的格式
current = time.strftime("%Y-%m-%d", time.localtime()) # 设置当前日期 current = time.strftime("%Y-%m-%d", time.localtime()) # 设置当前日期
if level == "error": if level == "error":
th = handlers.TimedRotatingFileHandler(filename=f'{file_name}/{current}_{level}.logger', th = handlers.TimedRotatingFileHandler(filename=f'{file_name}/{current}_{level}.logger', when=when,
when=when,
backupCount=back_count, encoding="utf-8") backupCount=back_count, encoding="utf-8")
else: else:
th = handlers.TimedRotatingFileHandler(filename=file_name + "/{}_info.logger".format(current), th = handlers.TimedRotatingFileHandler(filename=file_name + "/{}_info.logger".format(current), when=when,
when=when, backupCount=back_count, encoding="utf-8") # 往文件里写日志
backupCount=back_count,
encoding="utf-8") # 往文件里写日志
th.setFormatter(format_str) # 设置文件里写入的格式 th.setFormatter(format_str) # 设置文件里写入的格式
my_logger.addHandler(sh) # 将对象加入logger里 my_logger.addHandler(sh) # 将对象加入logger里

View File

@ -1,7 +1,5 @@
import json import json
from common.utils import logger
# @logger.log_decorator() # @logger.log_decorator()
def parsing_openapi(file_path): def parsing_openapi(file_path):
@ -13,30 +11,30 @@ def parsing_openapi(file_path):
for path, methods in paths.items(): for path, methods in paths.items():
for method, details in methods.items(): for method, details in methods.items():
test_case = { test_case = {
"id": count, "Id": count,
"name": "openapi", "Name": "openapi",
"description": details.get("summary"), "Description": details.get("summary"),
"Run": "yes", "Run": "yes",
"Time": "0.1", "Time": "0.1",
'method': method, 'Method': method,
'url': path, 'Url': path,
'headers': json.dumps(extract_parameters(details.get('parameters', []), 'header')), 'Headers': json.dumps(extract_parameters(details.get('parameters', []), 'header')),
'Headers是否加密': "", 'Headers Crypto': "",
'params': json.dumps(extract_parameters(details.get('parameters', []), 'query')), 'Query Str': json.dumps(extract_parameters(details.get('parameters', []), 'query')),
'request_data_type': determine_request_type(details.get('requestBody')), 'Request Data Type': determine_request_type(details.get('requestBody')),
'request_data': json.dumps(extract_request_body(details.get('requestBody'))), 'Request Data': json.dumps(extract_request_body(details.get('requestBody'))),
'请求参数是否加密': '', 'Request Data Crypto': '',
'提取请求参数': '', 'Extract Request Data': '',
'Jsonpath': '', 'Jsonpath': '',
'正则表达式': '', 'Regex': '',
'正则变量': '', 'Regex Params List': '',
'绝对路径表达式': '', 'Retrieve Value': '',
'SQL': '', 'SQL': '',
'sql变量': '', 'Sql Params Dict': '',
'预期结果': '', 'Expected': '',
'响应结果': '', 'Response': '',
'断言结果': '', 'Assertion': '',
'报错日志': ''} 'Error Log': ''}
test_cases.append(test_case) test_cases.append(test_case)
count += 1 count += 1
@ -105,3 +103,9 @@ if __name__ == '__main__':
file = f'../../cases/temporary_file/openapi.json' file = f'../../cases/temporary_file/openapi.json'
res = parsing_openapi(file) res = parsing_openapi(file)
print(res) print(res)
from common.file_handling.excel import DoExcel
from common.config import Config
templates = Config.templates # 使用标准模板
ex = DoExcel(templates)
ex.do_main("openapi.xlsx", *res)

View File

@ -8,7 +8,7 @@ result = []
def parsing_postman(path): def parsing_postman(path):
""" """
解析postman到处的json文件 解析postman导出的json文件 转为excel测试用例
Args: Args:
path: path:
@ -29,9 +29,9 @@ def parsing_postman(path):
_parse_api(content=content.get('item')) _parse_api(content=content.get('item'))
elif 'request' in content.keys(): elif 'request' in content.keys():
id_count += 1 id_count += 1
api['id'] = id_count api['Id'] = id_count
api['name'] = 'postman' api['Name'] = 'postman'
api['description'] = content.get('name') api['Description'] = content.get('name')
request = content.get('request') request = content.get('request')
api['Run'] = 'yes' api['Run'] = 'yes'
api['Time'] = 0.5 api['Time'] = 0.5
@ -39,14 +39,12 @@ def parsing_postman(path):
# api请求方法 # api请求方法
api['Method'] = request.get('method', 'GET').upper() api['Method'] = request.get('method', 'GET').upper()
header = request.get('header') header = request.get('header')
header = {item.get('key'): item.get('value') for item in header = {item.get('key'): item.get('value') for item in header} if header else {}
header} if header else {}
auth = request.get('auth') auth = request.get('auth')
if auth: if auth:
auth_type = auth.get('type') auth_type = auth.get('type')
if auth.get(auth_type): if auth.get(auth_type):
auth_value = {item.get('key'): item.get('value') for item in auth_value = {item.get('key'): item.get('value') for item in auth.get(auth_type) if
auth.get(auth_type) if
(item and item.get('key'))} (item and item.get('key'))}
header.update(auth_value) header.update(auth_value)
# api 请求地址 # api 请求地址
@ -63,51 +61,46 @@ def parsing_postman(path):
# [item.get('key') + '=' + (item.get('value') or '') for item in url.get('query') if item]) # [item.get('key') + '=' + (item.get('value') or '') for item in url.get('query') if item])
# api请求头 # api请求头
api['Headers'] = json.dumps(header, ensure_ascii=False) api['Headers'] = json.dumps(header, ensure_ascii=False)
api['Headers是否加密'] = '' api['Headers Crypto'] = ''
api['params'] = '' api['Query Str'] = ''
body = request.get('body') body = request.get('body')
if body: if body:
# api接口请求参数类型 # api接口请求参数类型
request_mode = body.get('mode') request_mode = body.get('mode')
if 'raw' == request_mode: if 'raw' == request_mode:
api['request_data_type'] = 'json' api['Request Data Type'] = 'json'
elif 'formdata' == request_mode: elif 'formdata' == request_mode:
api['request_data_type'] = 'data' api['Request Data Type'] = 'data'
elif 'urlencoded' == request_mode: elif 'urlencoded' == request_mode:
api['request_data_type'] = 'data' api['Request Data Type'] = 'data'
# api接口请求参数 # api接口请求参数
request_data = body.get(request_mode) request_data = body.get(request_mode)
api['Request Data'] = {} api['Request Data'] = {}
if request_data and 'raw' == request_mode: if request_data and 'raw' == request_mode:
api['Request Data'].update( api['Request Data'].update(
json.loads(request_data.replace('\t', '').replace('\n', json.loads(request_data.replace('\t', '').replace('\n', '').replace('\r', '')))
'').replace(
'\r', '')))
elif request_data and 'formdata' == request_mode: elif request_data and 'formdata' == request_mode:
if isinstance(request_data, list): if isinstance(request_data, list):
for item in request_data: for item in request_data:
if item.get("type") == "text": if item.get("type") == "text":
api['Request Data'].update({item.get( api['Request Data'].update({item.get('key'): item.get("value", "")})
'key'): item.get("value", "")})
elif item.get("type") == "file": elif item.get("type") == "file":
api["Request Data"].update({item.get( api["Request Data"].update({item.get('key'): item.get("src", "")})
'key'): item.get("src", "")}) api["Request Data Type"] = "files"
api["request_data_type"] = "files" api["Request Data"] = json.dumps(api["Request Data"], ensure_ascii=False)
api["Request Data"] = json.dumps(api["Request Data"], api['Request Data Crypto'] = ''
ensure_ascii=False) api['Extract Request Data'] = ''
api['请求参数是否加密'] = ''
api['提取请求参数'] = ''
api['Jsonpath'] = '' api['Jsonpath'] = ''
api['正则表达式'] = '' api['Regex'] = ''
api['正则变量'] = '' api['Regex Params List'] = ''
api['绝对路径表达式'] = '' api['Retrieve Value'] = ''
api['SQL'] = '' api['SQL'] = ''
api['sql变量'] = '' api['Sql Params Dict'] = ''
api['预期结果'] = '' api['Expected'] = ''
api['响应结果'] = '' api['Response'] = ''
api['断言结果'] = '' api['Assertion'] = ''
api['报错日志'] = '' api['Error Log'] = ''
result.append(api) result.append(api)
@ -117,7 +110,7 @@ def parsing_postman(path):
if __name__ == '__main__': if __name__ == '__main__':
pat = r'E:\apitest\cases\temporary_file\postman.json' pat = r'..\..\cases\temporary_file\postman.json'
res = parsing_postman(pat) res = parsing_postman(pat)
from common.file_handling.excel import DoExcel from common.file_handling.excel import DoExcel
from common.config import Config from common.config import Config

View File

@ -1,21 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: retry.py
@time: 2023/3/15 11:15
@desc:
"""
def retry(func):
"函数重跑"
def run_again(*args, **kwargs):
try:
func(*args, **kwargs)
except:
pass
return run_again

View File

@ -12,12 +12,9 @@ from functools import wraps
def singleton(cls): def singleton(cls):
""" """
单例模式类装饰器
Args: Args:
cls:被装饰类 cls:被装饰类
Returns: Returns:
""" """
instance = {} instance = {}

View File

@ -6,8 +6,9 @@ from dataclasses import dataclass
@dataclass @dataclass
class Variables: class Variables:
variables = {} # 定义依赖表 variables = {} # 定义依赖表
# 预编译正则表达式
pattern_l = re.compile(r"{{\s*([^}\s]+)\s*}}(?:\[(\d+)\])?") pattern_l = re.compile(r"{{\s*([^}\s]+)\s*}}(?:\[(\d+)\])?")
PATTERN = re.compile(r"{{(.*?)}}") # 预编译正则表达式 PATTERN = re.compile(r"{{(.*?)}}")
pattern = re.compile(r'({)') pattern = re.compile(r'({)')
pattern_fun = re.compile(r"{{(\w+\(\))}}") pattern_fun = re.compile(r"{{(\w+\(\))}}")
@ -35,13 +36,13 @@ class Variables:
if __name__ == '__main__': if __name__ == '__main__':
from common.file_handling.get_excel_init import get_init # from common.file_handling.get_excel_init import get_init
from common.config import Config from common.config import Config
test_file = Config.test_case test_file = Config.test_case
excel_handle, init_data, test_case = get_init(test_file) # excel_handle, init_data, test_case = get_init(test_file)
initialize_data = eval(init_data.get("initialize_data")) # initialize_data = eval(init_data.get("initialize_data"))
print(initialize_data) # print(initialize_data)
d = Variables # d = Variables
d.set_variable(initialize_data) # 初始化依赖表 # d.set_variable(initialize_data)
print("--------------------->", d.get_variable()) # print("--------------------->", d.get_variable())

View File

@ -1,17 +0,0 @@
# decorator_test.py
def my_decorator(func):
print("Decorator function called.")
def wrapper(*args, **kwargs):
print("Wrapper function called.")
return func(*args, **kwargs)
return wrapper
@my_decorator
def my_function():
print("Original function called.")
# print("Module imported.")

View File

@ -1,35 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: depr.py
@time: 2023/6/19 17:51
@desc:
"""
import requests
from common.database.redis_client import RedisClient
redis_client = RedisClient()
# 第一个接口,设置依赖数据
def first_api():
response = requests.get('https://api.example.com/first')
data = response.json()
redis_client.set_data('key', data['value'])
def second_api():
# 获取依赖数据
dependency_data = redis_client.get_data('key')
response = requests.post('https://api.example.com/second', data={'data': dependency_data})
result = response.json()
# 处理接口响应结果
if __name__ == '__main__':
first_api()
second_api()

View File

@ -1,38 +0,0 @@
import gc
# 引用计数示例
# a = [1, 2, 3]
# b = a
# print(sys.getrefcount(a)-1) # 输出2a和b都引用了该对象
# b = None
# print(sys.getrefcount(a)-1) # 输出1只有a引用了该对象
#
# # 标记-清除算法示例
class Node:
def __init__(self):
self.next = None
node1 = Node()
node2 = Node()
node1.next = node2
node2.next = node1
res = gc.collect() # 执行垃圾回收,循环引用的对象会被清除
# # 分代回收示例
# a = [1, 2, 3]
# gc.collect()
# print(gc.get_count()) # 输出(0, 0, 0)第0代对象计数为1
# b = [4, 5, 6]
# c = [7, 8, 9]
# gc.collect()
# print(gc.get_count()) # 输出(1, 0, 0)第0代对象计数为0第1代对象计数为2
# d = [10, 11, 12]
# e = [13, 14, 15]
# gc.collect()
# print(gc.get_count()) # 输出(2, 0, 0)第0代对象计数为0第1代对象计数为0第2代对象计数为2
#
# # 对象的代信息
# print(gc.get_objects()) # 输出对象的代信息

View File

@ -1,16 +0,0 @@
def sum(x, y):
return x + y
def generate():
for i in range(4):
yield i
res = generate()
for n in [0, 1, 2, 3]:
base =(sum(i, n) for i in res)
print(base)

View File

@ -1,59 +0,0 @@
# !/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: rt.py
@time: 2023/6/19 17:15
@desc:
"""
import concurrent.futures
import requests
from common.database.redis_client import RedisClient
# 创建 Redis 客户端
redis_client = RedisClient()
def get_user_info(user_id):
cache_key = f'user:{user_id}'
user_info = redis_client.get_data(cache_key)
if not user_info:
# 调用接口获取用户信息
response = requests.get(f'http://127.0.0.1:5000/?user_id={user_id}')
if response.status_code == 200:
user_info = response.text
print(user_info)
redis_client.set_data(cache_key, user_info, expire_time=3600)
else:
print(f"Failed to retrieve user info for user_id: {user_id}. Status code: {response.status_code}")
return user_info
# 并发测试函数
def run_concurrent_test(user_ids):
with concurrent.futures.ThreadPoolExecutor() as executor:
# 提交任务到线程池
future_to_user_id = {executor.submit(get_user_info, user_id): user_id for user_id in user_ids}
# 处理返回结果
for future in concurrent.futures.as_completed(future_to_user_id):
user_id = future_to_user_id[future]
try:
user_info = future.result()
print(f"user_id: {user_id}; user_info: {user_info}")
except Exception as e:
print(f"Error occurred for user_id: {user_id}, Error: {str(e)}")
if __name__ == '__main__':
u_ids = [i for i in range(10, 99)]
run_concurrent_test(u_ids)
# u_ids = [i for i in range(1000, 99999)]
# for i in u_ids:
# response = requests.get(f'http://127.0.0.1:5000/?user_id={i}')
# print(response.text)

View File

@ -1,39 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: test_single_class.py
@time: 2023/6/21 17:03
@desc:
"""
from functools import wraps
def single(cls):
instance = {}
@wraps(cls)
def decortator(*args, **kwargs):
if cls not in instance:
instance[cls] = cls(*args, **kwargs)
return instance[cls]
return decortator
class A:
pass
@single
class B(A):
pass
class C(B):
pass
if __name__ == '__main__':
C()

View File

@ -2,8 +2,8 @@ import os.path
from common.config import Config from common.config import Config
from common.file_handling.excel import DoExcel from common.file_handling.excel import DoExcel
from common.utils.parsing_postman import parsing_postman
from common.utils.parsing_openapi import parsing_openapi from common.utils.parsing_openapi import parsing_openapi
from common.utils.parsing_postman import parsing_postman
class ExcelConverter: class ExcelConverter:
@ -37,10 +37,8 @@ class ExcelConverter:
if __name__ == '__main__': if __name__ == '__main__':
postman_to_json = r'.\data\temporary_file\postman.json' # postman导出的json文件 postman_to_json = r'.\data\temporary_file\postman.json' # postman导出的json文件
postman_out_file = os.path.join(Config.base_path, 'cases', 'test_cases', postman_out_file = os.path.join(Config.base_path, 'cases', 'test_cases', 'test_postman_cases.xlsx') # 转化后的文件保存的位置
'test_postman_cases.xlsx') # 转化后的文件保存的位置
openapi_to_json = r'.\data\temporary_file\openapi.json' # postman导出的json文件 openapi_to_json = r'.\data\temporary_file\openapi.json' # postman导出的json文件
openapi_out_file = os.path.join(Config.base_path, 'cases', 'test_cases', openapi_out_file = os.path.join(Config.base_path, 'cases', 'test_cases', 'test_openapi_cases.xlsx') # 转化后的文件保存的位置
'test_openapi_cases.xlsx') # 转化后的文件保存的位置
ExcelConverter('postman', postman_to_json, postman_out_file).main() ExcelConverter('postman', postman_to_json, postman_out_file).main()
ExcelConverter('postman', openapi_to_json, openapi_out_file).main() ExcelConverter('postman', openapi_to_json, openapi_out_file).main()

View File

@ -11,5 +11,7 @@
__all__ = ["online_function"] __all__ = ["online_function"]
# 用户自定义扩展函数文件。
def online_function(): def online_function():
pass pass

View File

@ -1,8 +0,0 @@
# -*- coding: utf-8 -*-
# @Time : 2019/11/18 10:15
# @Author : kira
# @Email : 262667641@qq.com
# @File : __init__.py.py
# @Project : risk_api_project

File diff suppressed because it is too large Load Diff

View File

@ -1,126 +0,0 @@
import json
import requests
class Hooks:
def __init__(self):
self.before_request_funcs = {}
self.after_request_funcs = {}
def before_request(self, func):
"""
注册 before_request 钩子函数
"""
self.before_request_funcs[func.__name__] = func
return func
def after_request(self, func):
"""
注册 after_request 钩子函数
"""
self.after_request_funcs[func.__name__] = func
return func
def run_before_request_hooks(self, func_names, request, json_data):
"""
执行 before_request 钩子函数
"""
for func_name in func_names:
if func_name in self.before_request_funcs:
func = self.before_request_funcs[func_name]
json_data = func(request, json_data)
return json_data
def run_after_request_hooks(self, func_names, response):
"""
执行 after_request 钩子函数
"""
for func_name in func_names:
if func_name in self.after_request_funcs:
func = self.after_request_funcs[func_name]
response = func(response)
return response
hooks = Hooks()
session = requests.Session()
def req(url, method, **kwargs):
"""
发送请求并返回响应对象
"""
before_hooks = kwargs.pop('before_hooks', [])
after_hooks = kwargs.pop('after_hooks', [])
json_data = kwargs.pop('json', {})
request = requests.Request(method=method, url=url, **kwargs)
prepared_request = session.prepare_request(request)
json_data = hooks.run_before_request_hooks(before_hooks, prepared_request, json_data)
prepared_request.body = json.dumps(json_data)
response = session.send(prepared_request)
response = hooks.run_after_request_hooks(after_hooks, response)
return response
@hooks.before_request
def add_authentication_headers(request, json_data):
"""
添加认证头信息
"""
print("前置钩子函数,添加认证头信息", request)
request.headers["Authorization"] = "Bearer YOUR_AUTH_TOKEN"
return json_data
@hooks.before_request
def handle_dependent_parameters(request, json_data):
"""
处理依赖参数
"""
print("前置钩子函数,处理依赖参数", request)
json_data["verification_code"] = get_verification_code()
return json_data
def get_verification_code():
# 实现获取验证码的逻辑
return "YOUR_VERIFICATION_CODE"
@hooks.after_request
def after_dependent_parameters(request, json_data):
"""
处理后置
"""
print("后置钩子函数,处理依赖参数", request)
return json_data
def test_user_registration():
url = "http://jsonplaceholder.typicode.com/posts"
data = {
"userId": "testuser",
"title": "password123",
"body": "测试玩家勇哥"
}
headers = {
"Content-Type": "application/json"
}
before_hooks = [add_authentication_headers.__name__, handle_dependent_parameters.__name__]
after_hooks = [after_dependent_parameters.__name__]
kwargs = {"json": data, "headers": headers}
return req(url, "post", before_hooks=before_hooks, after_hooks=after_hooks, **kwargs)
# assert response.status_code == 200
# assert response.json()["success"] is True
if __name__ == "__main__":
res = test_user_registration()
print(res, res.text)

View File

@ -1,29 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: hooks_decorator.py
@time: 2023/6/16 17:03
@desc:
"""
from temp.extent.hooks import Hooks
hooks = Hooks()
def before_decorator(func):
def wrapper(*args, **kwargs):
# hooks.execute_hooks(*args, **kwargs)
return func(*args, **kwargs)
return wrapper
def after_decorator(func):
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
# hooks.execute_hooks(*args, **kwargs)
return result
return wrapper

View File

@ -1,23 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: run.py
@time: 2023/6/16 16:52
@desc:
"""
import requests
from temp.extent.hooks_decorator import before_decorator, after_decorator
@after_decorator
@before_decorator
def test_user_registration(url, method, **kwargs):
requests.request(method, url, **kwargs)
if __name__ == "__main__":
kwg = {}
test_user_registration("http://jsonplaceholder.typicode.com/posts/2", "get", **kwg)

View File

@ -1,125 +0,0 @@
import json
import requests
class Hooks:
def __init__(self):
self.before_request_funcs = {}
self.after_request_funcs = {}
def before_request(self, func):
"""
注册 before_request 钩子函数
"""
self.before_request_funcs[func.__name__] = func
return func
def after_request(self, func):
"""
注册 after_request 钩子函数
"""
self.after_request_funcs[func.__name__] = func
return func
def run_before_request_hooks(self, func_names, request, json_data):
"""
执行 before_request 钩子函数
"""
for func_name in func_names:
if func_name in self.before_request_funcs:
func = self.before_request_funcs[func_name]
json_data = func(request, json_data)
return json_data
def run_after_request_hooks(self, func_names, request, response):
"""
执行 after_request 钩子函数
"""
for func_name in func_names:
if func_name in self.after_request_funcs:
func = self.after_request_funcs[func_name]
response = func(request, response)
return response
hooks = Hooks()
session = requests.Session()
def req(url, method, **kwargs):
"""
发送请求并返回响应对象
"""
before_hooks = kwargs.pop('before_hooks', [])
after_hooks = kwargs.pop('after_hooks', [])
json_data = kwargs.pop('json', {})
request = requests.Request(method=method, url=url, **kwargs)
prepared_request = session.prepare_request(request)
json_data = hooks.run_before_request_hooks(before_hooks, prepared_request, json_data)
prepared_request.body = json.dumps(json_data)
response = session.send(prepared_request)
response = hooks.run_after_request_hooks(after_hooks, prepared_request, response)
return response
@hooks.before_request
def add_authentication_headers(request, json_data):
"""
添加认证头信息
"""
print("前置钩子函数,添加认证头信息", request)
request.headers["Authorization"] = "Bearer YOUR_AUTH_TOKEN"
return json_data
@hooks.before_request
def handle_dependent_parameters(request, json_data):
"""
处理依赖参数
"""
print("前置钩子函数,处理依赖参数", request)
json_data["verification_code"] = get_verification_code()
return json_data
def get_verification_code():
# 实现获取验证码的逻辑
return "YOUR_VERIFICATION_CODE"
@hooks.after_request
def after_dependent_parameters(request, response):
"""
处理后置
"""
print("发送请求后执行", request, "后置钩子函数,处理依赖参数", response)
return response
def test_user_registration():
url = "http://www.baidu.com"
# url = "http://jsonplaceholder.typicode.com/posts"
data = {
"userId": "testuser",
"title": "password123",
"body": "测试玩家勇哥"
}
headers = {
"Content-Type": "application/json"
}
before_hooks = [add_authentication_headers.__name__, handle_dependent_parameters.__name__]
after_hooks = [after_dependent_parameters.__name__]
kwargs = {"json": data, "headers": headers}
return req(url, "post", before_hooks=before_hooks, after_hooks=after_hooks, **kwargs)
if __name__ == "__main__":
res = test_user_registration()
print("最终打印:----->", res)

View File

@ -42,3 +42,23 @@ class Hooks:
response = func(response) response = func(response)
return response return response
hooks = Hooks()
def before_decorator(func):
def wrapper(*args, **kwargs):
# hooks.execute_hooks(*args, **kwargs)
return func(*args, **kwargs)
return wrapper
def after_decorator(func):
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
# hooks.execute_hooks(*args, **kwargs)
return result
return wrapper

View File

@ -1,12 +1,4 @@
#!/usr/bin/env python # !/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: prepost_script.py
@time: 2023/6/16 16:58
@desc:
""" # !/usr/bin/env python
# encoding: utf-8 # encoding: utf-8
""" """
@author: kira @author: kira
@ -15,7 +7,7 @@
@time: 2023/6/16 16:58 @time: 2023/6/16 16:58
@desc: @desc:
""" """
from temp.extent.hooks_decorator import hooks # 导入hooks对象 from temp.hooks import hooks
@hooks.before_request @hooks.before_request

View File

@ -4,8 +4,8 @@ import unittest
from ddt import ddt, data from ddt import ddt, data
from common import bif_functions from common import bif_functions
from common.config import Config
from common.action import Action from common.action import Action
from common.config import Config
from common.database.mysql_client import MysqlClient from common.database.mysql_client import MysqlClient
from common.file_handling.do_excel import DoExcel from common.file_handling.do_excel import DoExcel
from common.utils.mylogger import MyLogger from common.utils.mylogger import MyLogger
@ -36,8 +36,7 @@ class TestProjectApi(unittest.TestCase):
def test_api(self, item): def test_api(self, item):
# f"""用例:{item.get("name")}_{item.get("desc")}""" # f"""用例:{item.get("name")}_{item.get("desc")}"""
sheet, iid, condition, st, name, desc, headers_crypto, request_crypto, method = self.__base_info( sheet, iid, condition, st, name, desc, headers_crypto, request_crypto, method = self.__base_info(item)
item)
regex, keys, deps, jp_dict, extract_request_data = self.__extractor_info(item) regex, keys, deps, jp_dict, extract_request_data = self.__extractor_info(item)
if self.__is_run(condition): if self.__is_run(condition):
@ -70,10 +69,7 @@ class TestProjectApi(unittest.TestCase):
try: try:
# 执行请求操作 # 执行请求操作
kwargs = { kwargs = {request_data_type: request_data, 'headers': headers}
request_data_type: request_data,
'headers': headers
}
response = self.action.http_client(host, url, method, **kwargs) response = self.action.http_client(host, url, method, **kwargs)
# 执行后置代码片段 # 执行后置代码片段
self.action.load_and_execute_script(Config.SCRIPTS_DIR, prepost_script, "teardown", response) self.action.load_and_execute_script(Config.SCRIPTS_DIR, prepost_script, "teardown", response)

View File

@ -1,206 +0,0 @@
import time
import unittest
from ddt import ddt, data
from common import bif_functions
from common.action import Action
from common.config import Config
from common.database.mysql_client import MysqlClient
from common.file_handling.do_excel import DoExcel
from common.utils.mylogger import MyLogger
test_file = Config.test_case # 获取 excel 文件路径
excel = DoExcel(test_file)
init_data, test_case = excel.get_excel_init_and_cases()
databases = init_data.get('databases') # 获取数据库配置信息
mysql = MysqlClient(databases) # 初始化 mysql 链接
logger = MyLogger()
initialize_data = eval(init_data.get("initialize_data"))
host = init_data.get('host', "") + init_data.get("path", "")
@ddt
class TestProjectApi(unittest.TestCase):
maxDiff = None
action = Action()
@classmethod
def setUpClass(cls) -> None:
cls.action.set_variable(initialize_data) # 加载初始化数据
cls.action.set_bif_fun(bif_functions) # 加载内置方法
@data(*test_case)
def test_api(self, item):
# f"""用例:{item.get("name")}_{item.get("desc")}"""
sheet, iid, condition, st, name, desc, headers_crypto, request_crypto, method = self.__base_info(
item)
regex, keys, deps, jp_dict, extract_request_data = self.__extractor_info(item)
if self.__is_run(condition):
return
self.__pause_execution(st)
sql, sql_params_dict = self.__sql_info(item)
# 首先执行sql替换,将sql替换为正确的sql语句
sql = self.action.replace_dependent_parameter(sql)
self.__exc_sql(sql, sql_params_dict, method)
# 拼接动态代码段文件
prepost_script = f"prepost_script_{sheet}_{iid}.py"
item = self.action.load_and_execute_script(Config.SCRIPTS_DIR, prepost_script, "setup", item)
# 替换 URL, PARAMETERS, HEADER,期望值
item = self.action.replace_dependent_parameter(item)
url, query_str, request_data, headers, expected, request_data_type = self.__request_info(item)
# 提取请求参数信息
self.action.substitute_data(request_data, jp_dict=jp_dict)
headers, request_data = self.action.encrypt.encrypts(headers_crypto, headers, request_crypto, request_data)
result_tuple = None
result = "PASS"
response = None
try:
# 执行请求操作
kwargs = {
"sheet": sheet,
"iid": iid,
"item": item,
"jp_dict": jp_dict,
request_data_type: request_data,
'headers': headers
}
response = self.action.http_client(host, url, method, **kwargs)
# 执行后置代码片段
self.action.load_and_execute_script(Config.SCRIPTS_DIR, prepost_script, "teardown", response)
result_tuple = self.action.run_validate(expected, response.json()) # 执行断言 返回结果元组
self.assertNotIn("FAIL", result_tuple, "FAIL 存在结果元组中")
try:
# 提取响应
self.action.substitute_data(response.json(), regex=regex, keys=keys, deps=deps, jp_dict=jp_dict)
except Exception as err:
logger.error(f"提取响应失败:{sheet}_{iid}_{name}_{desc}"
f"\nregex={regex};"
f" \nkeys={keys};"
f"\ndeps={deps};"
f"\njp_dict={jp_dict}"
f"\n{err}")
except Exception as e:
result = "FAIL"
logger.error(f'异常用例: {sheet}_{iid}_{name}_{desc}\n{e}')
raise e
finally:
response = response.text if response is not None else str(response)
assert_log = str(result_tuple)
# 响应结果及测试结果回写 excel
excel.write_back(sheet_name=sheet, i=iid, response=response, test_result=result, assert_log=assert_log)
@classmethod
def tearDownClass(cls) -> None:
logger.info(f"所有用例执行完毕")
@staticmethod
def __base_info(item):
"""
获取基础信息
Args:
item:
Returns:
"""
sheet = item.pop("sheet")
item_id = item.pop("Id")
condition = item.pop("Run")
sleep_time = item.pop("Time")
name = item.pop("Name")
desc = item.pop("Description")
headers_crypto = item.pop("Headers Crypto")
request_data_crypto = item.pop("Request Data Crypto")
method = item.pop("Method")
return sheet, item_id, condition, sleep_time, name, desc, headers_crypto, request_data_crypto, method
@staticmethod
def __sql_info(item):
sql = item.pop("SQL")
sql_params_dict = item.pop("Sql Params Dict")
return sql, sql_params_dict
@staticmethod
def __extractor_info(item):
"""
获取提取参数的基本字段信息
Args:
item:
Returns:
"""
regex = item.pop("Regex")
keys = item.pop("Regex Params List")
deps = item.pop("Retrieve Value")
jp_dict = item.pop("Jsonpath")
extract_request_data = item.pop("Extract Request Data")
return regex, keys, deps, jp_dict, extract_request_data
@staticmethod
def __request_info(item):
"""
请求数据
Args:
item:
Returns:
"""
url = item.pop("Url")
query_str = item.pop("Query Str")
request_data = item.pop("Request Data")
headers = item.pop("Headers")
expected = item.pop("Expected")
request_data_type = item.pop("Request Data Type") if item.get("Request Data Type") else 'params'
return url, query_str, request_data, headers, expected, request_data_type
@staticmethod
def __is_run(condition):
is_run = condition
if not is_run or is_run.upper() != "YES":
return True
@staticmethod
def __pause_execution(sleep_time):
if sleep_time:
try:
time.sleep(sleep_time)
except Exception as e:
logger.error("暂时时间必须是数字")
raise e
def __exc_sql(self, sql, sql_params_dict, method):
if sql:
try:
execute_sql_results = mysql.do_mysql(sql)
if execute_sql_results and sql_params_dict:
self.action.extract_request_data(execute_sql_results, jp_dict=sql_params_dict)
if method == "SQL" and mysql:
return None
except Exception as e:
logger.error(f'执行 sql 失败:{sql},异常信息:{e}')
raise e
return sql
if __name__ == '__main__':
unittest.main()

View File

@ -1,86 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/10/12 11:12
# @Author : kira
# -*- coding: utf-8 -*-
# @Time : 2021/7/6 15:31
# @Author : kira
# @Email : 262667641@qq.com
# @File : test_wifi_upload_file.py
# @Project : api-test-project
import os
import random
import sys
import jsonpath
from common.file_handling.file_utils import FileUtils
sys.path.append("../../")
sys.path.append("../../common")
from common.http_client.http_client import Pyt
from common.config import Config
from common.random_tools.names import name
from common.random_tools.phone_numbers import phone
from common.random_tools.identification import idcard
pyt = Pyt()
def test_labor_register(cases):
host = 'https://bimuse.bzlrobot.com/bsp/test/user/ugs'
headers = {"BSP_APP_ID": "8d1f5bdc9c6648af84a98e2c017846c5",
"BSP_TOKEN": "32c9fd9ccc0565d0281b89b5fa198008",
"BSP_USER_ENV_ID": "f8e334ea85df45ed99a311ed022c2973",
"BSP_USER_ID": "216684145642752200",
"BSP_USER_TENANT": "216856950236843913",
"bzlrobot-authorization": "bearer 32c9fd9ccc0565d0281b89b5fa198008",
"projectId": "99000022",
"projectName": "%E4%BD%9B%E5%B1%B1%E9%99%88%E6%9D%91%E6%97%A7%E6%94%B9%E9%A1%B9%E7%9B%AE"
}
sex_id = random.randint(0, 1)
user_sex = "F" if sex_id == 0 else "M"
user_name = name.get_girl() if sex_id == 0 else name.get_boy()
user_id_card = idcard.get_generate_id(sex=sex_id)
user_birthday = idcard.get_birthday(user_id_card)
user_mobile = phone.get_mobile_number()
# 请求工种接口,获取所有工种
url = f"/ibs/api/ibs-lms-base/work-kind/page/list?t=1636702597000&current=1&size=1000&projectId={cases['projectId']}&key=&supplyId=&status=0&enabled=1"
kwargs = {
"headers": headers
}
res = pyt.http_client(host, url, "GET", **kwargs).json()
codes = jsonpath.jsonpath(res, "$.data..code")
names = jsonpath.jsonpath(res, "$.data..name")
work_kinds = dict(zip(codes, names))
work_code = random.choice(list(work_kinds.keys()))
cases["name"] = user_name
cases["idCard"]["name"] = user_name
cases["idCard"]["cardNo"] = user_id_card
cases["phone"] = user_mobile
cases["idCard"]["sex"] = user_sex
cases["idCard"]["birthday"] = user_birthday
cases['workKindCode'] = str(work_code)
cases['workKindName'] = work_kinds[str(work_code)]
if "positionName" in cases:
url = "/ibs/api/ibs-lms-member/manager/register?t=1634607524000"
else:
url = "/ibs/api/ibs-lms-member/labor/register?t=1633743835000"
try:
kwargs.update({"json": cases})
res = pyt.http_client(host, url, "POST", **kwargs)
assert res.status_code == 200
except Exception as e:
raise e
if __name__ == '__main__':
# 测试数据路径
test_case_path = os.path.join(Config.base_path, 'cases', 'cases', "labor.json")
test_case = FileUtils().read_json_file(test_case_path)
for i in range(500):
test_labor_register(test_case)