This commit is contained in:
chenyongzhiaaron 2023-05-08 18:14:46 +08:00
parent 121286fe77
commit 3d2146599d
16 changed files with 446 additions and 181 deletions

View File

@ -4,7 +4,7 @@
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.9 (3)" jdkType="Python SDK" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PackageRequirementsSettings">

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="dataSourceStorageLocal" created-in="PY-231.8109.197">
<component name="dataSourceStorageLocal" created-in="PY-222.3739.56">
<data-source name="MySQL - @10.8.203.25" uuid="dd327101-360a-4ce3-a39e-0a30a298d5cf">
<database-info product="MySQL" version="8.0.22" jdbc-version="4.2" driver-name="MySQL Connector/J" driver-version="mysql-connector-java-8.0.21 (Revision: 33f65445a1bcc544eb0120491926484da168f199)" dbms="MYSQL" exact-version="8.0.22" exact-driver-version="8.0">
<extra-name-characters>#@</extra-name-characters>

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (3)" project-jdk-type="Python SDK" />
<component name="ProjectRootManager" version="2" project-jdk-name="Pipenv (api-test-project)" project-jdk-type="Python SDK" />
<component name="PyCharmProfessionalAdvertiser">
<option name="shown" value="true" />
</component>

View File

@ -31,15 +31,6 @@ class GetConfigData:
if __name__ == '__main__':
pat = r"D:\api-test-project\data\bgy\ai_sql.json"
pat = r""
t = GetConfigData.get_json_data(pat)
database_2 = {
"host": "10.8.203.25",
"port": 3306,
"database": "ibs_ai_iot",
"user": "root",
"password": "gd1234"
}
data = DoMysql(database_2, **t).do_mysql()
# print(t)
print(data)
print(t)

View File

@ -1,33 +1,16 @@
# -*- coding: utf-8 -*-
import datetime
import threading
import time
n = 5
sum = 0
a = 1
while a <= n:
sum = sum + a
a += 1
print(n, sum)
number = 0
lk = threading.Lock()
n = 5
sum = 0
a = 1
while a <= n:
sum = sum + a
a += 1
print(n, sum)
def update_num(args):
"""修改全局变量的函数"""
global number
# lk.acquire() # 枷锁
with lk:
for i in range(100000000):
number += 1
print(
f"当前线程:{threading.current_thread()},当前活跃线程:{threading.active_count()},当前任务执行后的结果:{number}")
# lk.release() # 释放锁
st = time.time()
ts = []
for i in range(2):
# 实例2个线程
t = threading.Thread(target=update_num, args=(lk,)) # 传入锁
t.start() # 启动线程
ts.append(t)
for i in ts:
i.join() # 阻塞
e_t = time.time()
print("主线程结束", number, "耗时", e_t - st)

127
debug/login.py Normal file
View File

@ -0,0 +1,127 @@
import requests
import json
import time
import datetime
import random
import hashlib
# import setting
# from test3 import AES128_de, AES128_en
# from Lib.readexcel import ReadExcel
# from sendrequests import SendRequests
now_time = time.time()
# timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
stamp = str(int(round(now_time * 1000)))
# -------------- 固定参数 --------------
# reqId = "ga1a838aad7c85992b71bg3f717975d9"
# loginAccount = "17328565609"
# iampwd = "9056749f0dde456780a336ea05640d0a"
# path = "https://mp-prod.smartmidea.net/mas/v5/app/proxy?alias="
# header = {"Connection": "keep-alive", "Content-Type": "application/json", "accessToken": "administrator-token",
# "User-Agent": "IOT2020TEST", "version": "7.1.0", "Content-Length": "98",
# "Host": "mp-prod.smartmidea.net"}
# -------------- 开始加签加密并返回登录请求body --------------
class Login:
def __init__(self, data, login_account, req_id, iam_pwd, pwd):
self.data = data
self.random_str = str(random.random())
self.timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
self.reqId = req_id
self.loginAccount = login_account
self.iampwd = iam_pwd
self.pwd = pwd
self.path = "https://mp-prod.smartmidea.net/mas/v5/app/proxy?alias="
self.headers = {"Connection": "keep-alive", "Content-Type": "application/json",
"accessToken": "administrator-token",
"User-Agent": "IOT2020TEST", "version": "7.1.0", "Content-Length": "98",
"Host": "mp-prod.smartmidea.net"}
@staticmethod
def md5(st: str) -> str:
"""
Args:
st:待加密字符串
Returns: 返回MD5 加密后的字符串
"""
md = hashlib.md5() # 创建MD5对象
md.update(st.encode(encoding="utf-8"))
return md.hexdigest()
@staticmethod
def do_sign(data, random_int):
"""加签"""
key = "prod_secret123@muc" + json.dumps(data) + random_int
return Login.md5(key)
def get_login_id(self):
"""获取登录id"""
url = self.path + "/v1/user/login/id/get"
data = {"loginAccount": self.loginAccount,
"stamp": stamp,
"reqId": self.reqId}
sign = self.do_sign(data, self.random_str)
headers = {"sign": sign, "random": self.random_str}
headers.update(self.headers)
res = requests.post(url=url, json=data, headers=headers).json()
return res.get('data').get('loginId')
def do_password(self):
login_id = self.get_login_id()
# 加密拼接
pw = login_id + hashlib.sha256("mm123456".encode("utf-8")).hexdigest() + "ad0ee21d48a64bf49f4fb583ab76e799"
# 如果原始密码是变的,那就用这条
# pw = login_id + hashlib.sha256("mm123456".encode("utf-8")).hexdigest() + self.pwd
password = hashlib.sha256(pw.encode("utf-8")).hexdigest() # 二次加密密码
print("加密后的 password:", password)
return password, login_id
def login(self, data):
url = self.path + "/mj/user/login"
password, login_id = self.do_password()
data = {"iotData": {
"loginAccount": self.loginAccount,
"password": password,
"clientType": 2,
"loginId": login_id,
"iotAppId": "900",
"iampwd": self.iampwd,
"reqId": self.reqId
}, "data": {
"appVersion": "6.2.0",
"osVersion": "13.3",
"appKey": "46579c15",
"deviceId": "990008698831355",
"deviceName": "OD105",
"platform": 2
}, "timestamp": self.timestamp}
sign_ = self.do_sign(data, random_str)
header = {"Connection": "keep-alive",
"Content-Type": "application/json",
"accessToken": "administrator-token",
"sign": sign_,
"random": random_str,
"User-Agent": "IOT2020TEST",
"version": "7.1.0",
"Content-Length": "98",
"Host": "mp-prod.smartmidea.net"}
print(f"{sign_},{password},{self.timestamp}")
res = requests.post(url=url, json=data, headers=header)
print("登录接口请求接口", res.text)
if __name__ == '__main__':
# val = get_login_id()
# login()
...

97
debug/login_debug.py Normal file
View File

@ -0,0 +1,97 @@
import requests
import json
import time
import datetime
import random
import hashlib
now_time = time.time()
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
stamp = str(int(round(now_time * 1000)))
reqId = "ga1a838aad7c85992b71bg3f717975d9"
loginAccount = "17328565609"
iampwd = "9056749f0dde456780a336ea05640d0a"
path = "https://mp-prod.smartmidea.net/mas/v5/app/proxy?alias="
random_str = str(random.random())
header = {"Connection": "keep-alive", "Content-Type": "application/json", "accessToken": "administrator-token",
"User-Agent": "IOT2020TEST", "version": "7.1.0", "Content-Length": "98",
"Host": "mp-prod.smartmidea.net"}
def do_sign(dj, r): # 签名
key = "prod_secret123@muc" + json.dumps(dj) + r
m = hashlib.md5()
m.update(key.encode('utf-8'))
sign_ = m.hexdigest()
return sign_
def get_login_id():
global header
global random_str
global stamp
url = path + "/v1/user/login/id/get"
data = {"loginAccount": loginAccount,
"stamp": stamp,
"reqId": reqId}
sign = do_sign(data, random_str)
headers = {"sign": sign, "random": random_str}
header.update(header)
res = requests.post(url=url, json=data, headers=headers).json()
value = res['data']['loginId']
print("loginID:", value)
return value
def do_password():
value = get_login_id()
pw = value + hashlib.sha256("mm123456".encode("utf-8")).hexdigest() + "ad0ee21d48a64bf49f4fb583ab76e799"
password = hashlib.sha256(pw.encode("utf-8")).hexdigest() # 加密密码
print("加密后的 password:", password)
return password, value
def login():
global loginAccount
global iampwd
global reqId
global timestamp
url = path + "/mj/user/login"
password, login_id = do_password()
data = {"iotData": {
"loginAccount": loginAccount,
"password": password,
"clientType": 2,
"loginId": login_id,
"iotAppId": "900",
"iampwd": iampwd,
"reqId": reqId
}, "data": {
"appVersion": "6.2.0",
"osVersion": "13.3",
"appKey": "46579c15",
"deviceId": "990008698831355",
"deviceName": "OD105",
"platform": 2
}, "timestamp": timestamp}
sign_ = do_sign(data, random_str)
header = {"Connection": "keep-alive",
"Content-Type": "application/json",
"accessToken": "administrator-token",
"sign": sign_,
"random": random_str,
"User-Agent": "IOT2020TEST",
"version": "7.1.0",
"Content-Length": "98",
"Host": "mp-prod.smartmidea.net"}
print(f"sign:{sign_}\ndata:{data}\nrandom:{random_str}")
# res = requests.post(url=url, json=data, headers=header)
# print("登录接口请求接口", res.text)
if __name__ == '__main__':
val = get_login_id()
login()
print(len('d8c004809add1768bc130f2bd07380af'))
print(len('940a7447601aca97583f49b06bea7aaa'))

75
debug/password_sign.py Normal file
View File

@ -0,0 +1,75 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: password_sign.py
@time: 2023/5/8 17:12
@desc:
"""
import sys
import json
import hashlib
import random
import datetime
import time
import requests
now_time = time.time()
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
stamp = str(int(round(now_time * 1000)))
reqId = "ga1a838aad7c85992b71bg3f717975d9"
loginAccount = "17328565609"
iampwd = "9056749f0dde456780a336ea05640d0a"
path = "https://mp-prod.smartmidea.net/mas/v5/app/proxy?alias="
random_str = str(random.random())
header = {"Connection": "keep-alive", "Content-Type": "application/json", "accessToken": "administrator-token",
"User-Agent": "IOT2020TEST", "version": "7.1.0", "Content-Length": "98",
"Host": "mp-prod.smartmidea.net"}
class PasswordSign:
def do_sign(self, data): # 签名
"""
Args:
data:请求body 参数
Returns:
"""
key = "prod_secret123@muc" + json.dumps(data) + str(random.random())
m = hashlib.md5()
m.update(key.encode('utf-8'))
sign_ = m.hexdigest()
return sign_
def get_login_id(self):
global header
global random_str
global stamp
url = path + "/v1/user/login/id/get"
data = {"loginAccount": loginAccount,
"stamp": stamp,
"reqId": reqId}
sign_ = self.do_sign(data, random_str)
headers = {"sign": sign_, "random": random_str}
header.update(header)
res = requests.post(url=url, json=data, headers=headers).json()
value = res['data']['loginId']
print("loginID:", value)
return value
def do_password(self):
value = self.get_login_id()
pw = value + hashlib.sha256("mm123456".encode("utf-8")).hexdigest() + "ad0ee21d48a64bf49f4fb583ab76e799"
password = hashlib.sha256(pw.encode("utf-8")).hexdigest() # 加密密码
print("加密后的 password:", password)
return password, value
if __name__ == '__main__':
body = sys.argv[1]
ps = PasswordSign()
pwd, sign = ps.do_password()

30
debug/setting.py Normal file
View File

@ -0,0 +1,30 @@
#!/usr/bin/env python
# _*_ coding:utf-8 _*_
__author__ = 'YinJia'
import os
import sys
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
sys.path.append(BASE_DIR) # 当前文件的绝对路径
# 配置文件
TEST_CONFIG = os.path.join(BASE_DIR, "database", "config.ini") # 拼接config.ini文件绝对路径
# 测试用例模板文件
SOURCE_FILE = os.path.join(BASE_DIR, "database", "DemoAPITestCase.xlsx") # 同上
# excel测试用例结果文件
TARGET_FILE = os.path.join(BASE_DIR, "report", "excelReport", "DemoAPITestCase.xlsx") # 同上
# 测试用例报告
TEST_REPORT = os.path.join(BASE_DIR, "report") # 同上
# 测试用例程序文件
TEST_CASE = os.path.join(BASE_DIR, "testcase") # 同上
# 请求配置
host = "mp-prod.smartmidea.net" # 测试环境 https://mp-sit.smartmidea.net 生产环境 https://mp-prod.smartmidea.net
secret = "prod_secret123@muc" # 测试环境sit_secret123@muc ;生产环境 prod_secret123@muc
Iot_key = "ad0ee21d48a64bf49f4fb583ab76e799" # 测试环境:143320d6c73144d083baf9f5b1a7acc9 生产环境:ad0ee21d48a64bf49f4fb583ab76e799

35
debug/sign.py Normal file
View File

@ -0,0 +1,35 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: sign.py
@time: 2023/5/8 17:03
@desc:
"""
import sys
import json
import hashlib
import random
def do_sign(data): # 签名
"""
Args:
data:请求body 参数
Returns:
"""
key = "prod_secret123@muc" + json.dumps(data) + str(random.random())
m = hashlib.md5()
m.update(key.encode('utf-8'))
sign_ = m.hexdigest()
return sign_
if __name__ == '__main__':
body = sys.argv[1] # 命令行传过来的参数
sign = do_sign(body)
print(sign)

View File

@ -1,24 +0,0 @@
import datetime
import threading
import time
number = 0
def add():
global number # global声明此处的number是外面的全局变量number
for _ in range(10000000): # 进行一个大数级别的循环加一运算
number += 1
times = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f'{times}-"当前活跃的线程个数:{threading.active_count()}"')
print("子线程%s运算结束后number = %s" % (threading.current_thread().getName(), number))
print('------------------------------')
for i in range(2): # 用2个子线程就可以观察到脏数据
t = threading.Thread(target=add)
t.start()
time.sleep(2) # 等待2秒确保2个子线程都已经结束运算。
print("主线程执行完毕后number = ", number)

View File

@ -1,36 +0,0 @@
import threading
class MyLock:
"""锁实现"""
def __init__(self):
self.locked = False
self.locked_by = None
self.count = 0
def acquire(self):
"""枷锁"""
current_threading = threading.current_thread() # 获取当前线程
# 如果被锁了且不是当前线程去锁他,那么开始等待锁释放
if self.locked and self.locked_by != current_threading:
while self.locked:
pass
# 执行枷锁
self.locked = True
self.locked_by = current_threading
self.count += 1
def released(self):
"""释放锁"""
current_threading = threading.current_thread()
# 如果锁没有被锁或者不是当前线程的锁,直接退出释放
if not self.locked or self.locked_by != current_threading:
return
# 执行释放锁
self.count -= 1
if self.locked == 0:
# 锁定次数为0后则完全释放锁
self.locked = False
self.locked_by = None

52
debug/test3.py Normal file
View File

@ -0,0 +1,52 @@
from Crypto.Cipher import AES
from binascii import b2a_hex, a2b_hex
import hashlib
# AES128-ECB模式加密
def AES128_en(data, key):
print("加密前data:" + data)
print("加密key:" + key)
BS = AES.block_size
pad = lambda s: s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
AESCipher = AES.new(key.encode('utf-8'), AES.MODE_ECB)
data = pad(data).encode('utf-8')
encryptcipher = str(b2a_hex(AESCipher.encrypt(data)), "utf-8")
return encryptcipher
# AES128-ECB模式解密
def AES128_de(data, key):
# key = bytes(key, 'utf-8')
unpad = lambda s: s[:-ord(s[len(s) - 1:])]
AESCipher = AES.new(key.encode('utf-8'), AES.MODE_ECB)
paint = unpad(AESCipher.decrypt(a2b_hex(data)))
# print(str(paint, encoding="utf-8"))
return bytes.decode(paint) # bytes类型转成string类型
# oder=aes128encryptoder,aes128decrypt(key, md5appKey的前16位)key是登陆返回的
'''
data1 = "-90,90,1,0,86,0,32,0,59,0,0,0,80,0,55,16,1,2,23,20,-45,88,0,0,0,-65,0,0,0,0,0,0,0,0,0,0,0,0,0,0,-86,29,-84,0,0,0,0,0,2,2,-80,2,26,0,1,1,60,2,8,1,2,102,102,54,0,0,0,113,-74,-13,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
data2 = "ebd036cd241dc82a9f67795b65978ecf187c944f79041a1b4b632650e1b4aed9"
Iot_key = "ad0ee21d48a64bf49f4fb583ab76e799"
md5key = hashlib.md5()
md5key.update(Iot_key.encode('utf-8'))
md5_key = md5key.hexdigest()
print(md5_key)
md5_key = md5_key[0:16]
print(md5_key)
AES128_1 = AES128_de(data2, md5_key)
AES128_2 = AES128_en(data1, AES128_1)
print("加密后的内容", AES128_2)
'''
data1 = "c5db3c04b26b67602400e54ee8bf0a95187c944f79041a1b4b632650e1b4aed9"
# data3 = "377cb30aa4b829dd919c8c48df635f77a1f9452bd0b3a9c21d9fe9814c0dc5df" # 国际美居key
data2 = "cdadbd7bbb86abbb775eb29dcd31152497d9d67e36b2435de014508d0dfff8c473e6e6284b6cf824af98a7699c9ca479ae40fc163789b5da66bee4d842d878cf0a585a85b716eb2d8d99df1adc2acbd468caabb3c45d7b63d24d12d164703309ae6599f062d41555110460ceceb402c139f216928c3a7b519468d043f2bdb9f62b6817a08ca424b9a468023652cf60719e49aa0b29f28f95d87f1d8844ac599b68caabb3c45d7b63d24d12d164703309c8379e99730f633f39d8033a868c6e6c75acaa0e9c949861f132314ecc87b9cffaff80d4a869a89398b067403a7c52cda8df8b7db49d7438ddf146f52d405f9fc823eca043bb3b5f32af3daff2a56762b550e38b5b81f12ed49dec8ba10ac984"
md5_Iotkey = "96c7acdfdb8af79a"
re1 = AES128_de(data1, md5_Iotkey)
print(re1)
re2 = AES128_de(data2, re1)
print(re2)
print(type(re2))

View File

@ -1,25 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: thread.py
@time: 2023/4/26 10:24
@desc:
"""
import time
import threading
def ts():
for i in range(10):
print(f"--------ts:{threading.current_thread().name}---------")
time.sleep(2)
print("main_process")
t = threading.Thread(target=ts) # 创建1个线程
t.setDaemon(True) # 设置为守护线程
t.start() # 启动线程
t.join(3) # 阻塞主线程结束等待3秒前置结束

View File

@ -1,18 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
"""
@author: kira
@contact: 262667641@qq.com
@file: xh.py
@time: 2023/5/5 10:09
@desc:
"""
for i in range(1, 6):
for j in range(1, 9):
if i == 3:
print("不⾛3层.......")
continue # 跳过本次循环,继续下次循环
if i == 4 and j == 4: # 遇到404
print("遇到⻤屋404了,不再继续了")
break # 结束当前循环, 注意只会结束第2层这个⼩循环。
print(f"{i}层-{i}0{j}")

View File

@ -1,37 +1,15 @@
import threading
import copy
num = 0
lit = [1, 2, 3, [4, 5, 6, [7, 8, 9]]]
mutex = threading.Lock()
cl = copy.copy(lit)
lit.append("new")
print(cl)
def add1():
global num
for i in range(10000000):
mutex.acquire()
num += 1
mutex.release()
print(f"add1:{num}")
def add2():
global num
for i in range(10000000):
mutex.acquire()
num += 1
mutex.release()
print(f"add2:{num}")
if __name__ == '__main__':
import threading
t1 = threading.Thread(target=add1)
t2 = threading.Thread(target=add2)
t1.start()
t2.start()
t1.join()
t2.join()
print(num)
a = 1
while a <= 9:
for i in range(1, a + 1):
print(f"{a}*{i}\t", end="")
print("")
a += 1