1. 优化步骤出错处理
2. 精简打印内容
3. 修改截图格式为 png
This commit is contained in:
jerrylizilong 2018-09-14 14:21:06 +08:00
parent bd1cb30af0
commit c36e1d85f0
14 changed files with 336 additions and 340 deletions

View File

@ -1,13 +1,10 @@
from app.core import log, buildCase, atx_steps from app.core import log, buildCase, atx_steps
from app.db import test_batch_manage from app.db import test_batch_manage
import requests
class atx_core(): class atx_core():
# run a signle case. # run a signle case.
# the main method # the main method
def run_case(self, steps, caseNo, deviceList=[]): def run_case(self,steps,caseNo,deviceList = []):
search_result = test_batch_manage.test_batch_manage().search_test_batch_detail1(caseNo, ['ip']) search_result = test_batch_manage.test_batch_manage().search_test_batch_detail1(caseNo, ['ip'])
if len(search_result): if len(search_result):
log.log().logger.info(search_result) log.log().logger.info(search_result)
@ -19,23 +16,23 @@ class atx_core():
steps = steps.split(',') steps = steps.split(',')
steps = buildCase.buildCase().readPublic(steps) steps = buildCase.buildCase().readPublic(steps)
step0 = steps[0].split('|') step0 = steps[0].split('|')
if len(step0) != 2: if len(step0)!=2:
log.log().logger.error('android init is not defined!') log.log().logger.error('android init is not defined!')
return 2, 'init', [] return 2, 'init', []
elif step0[0] != 'Android': elif step0[0]!='Android':
log.log().logger.error('android init is not defined!') log.log().logger.error('android init is not defined!')
return 2, 'init', [] return 2, 'init', []
else: else:
package = step0[1] package = step0[1]
isConnected, device0, u = atx_steps.atx_driver().connectDevice(package, deviceList) isConnected,device0,u = atx_steps.atx_driver().connectDevice(package,deviceList)
log.log().logger.info(' is %s connected? %s' % (device0, isConnected)) log.log().logger.info(' is %s connected? %s' %(device0, isConnected))
if isConnected: if isConnected:
log.log().logger.info('start runnning test on %s' % device0) log.log().logger.info('start runnning test on %s' %device0)
screenFileList = [] screenFileList = []
result = 1 result = 1
for step in steps: for step in steps:
log.log().logger.info('current step is: %s' % step) log.log().logger.info('current step is: %s' %step)
if len(step) == 0: if len(step)==0:
pass pass
else: else:
step = step.split('|') step = step.split('|')
@ -51,26 +48,33 @@ class atx_core():
trytime = 3 trytime = 3
while trytime: while trytime:
log.log().logger.info('try time: %s' % (4 - trytime)) log.log().logger.info('try time: %s' % (4 - trytime))
try: u,result,screenFileList = atx_steps.atx_driver().run_step(u, step_name, detail,caseNo,screenFileList)
u, result, screenFileList = atx_steps.atx_driver().run_step(u, step_name, detail,
caseNo, screenFileList)
except requests.exceptions.ConnectionError as e:
log.log().logger.error(e)
result = 2
if result == 2: if result == 2:
log.log().logger.error('failed at %s : %s, try again!' % (step_name, detail)) log.log().logger.error('failed at %s : %s, try again!' % (step_name, detail))
trytime += -1 trytime += -1
else: else:
trytime = 0 trytime=0
log.log().logger.info('finish step %s : %s.' % (step_name, detail)) log.log().logger.info('finish step %s : %s.' % (step_name, detail))
if trytime == 0 and result == 2: if trytime==0 and result == 2:
log.log().logger.error('failed at %s : %s after trying 3 times!' % (step_name, detail)) log.log().logger.error('failed at %s : %s after trying 3 times!' % (step_name, detail))
break break
else: else:
pass pass
return result, step_name, screenFileList return result,step_name,screenFileList
else: else:
log.log().logger.info('package is not found in any device!') log.log().logger.info('package is not found in any device!')
return 2, 'package not found', [] return 2, 'package not found', []
# deviceList = ['172.16.131.102']
# steps = 'Android|com.ghw.sdk2,点击|id@@com.ghw.sdk2:id/btn_login,等待|5,点击|name@@FACEBOOK登录,等待|10'
# for i in range(200):
# steps += ',点击|name@@FACEBOOK登录,等待|10'
#
# while 1:
# try:
# atx_core().run_case(steps,1000,deviceList)
# except FileNotFoundError as e:
# print(e)

View File

@ -1,48 +1,46 @@
import requests
import time import time
import uiautomator2 as ut2 import uiautomator2 as ut2
from app.core import log, util, buildCase, hubs from app.core import log, util, hubs
class atx_driver(): class atx_driver():
# connect to an availble android device. if deviceList is not given, it will try to connect to availbe device in database. # connect to an availble android device. if deviceList is not given, it will try to connect to availbe device in database.
def connectDevice(self, package, deviceList=[]): def connectDevice(self,package,deviceList=[]):
if len(deviceList) == 0: if len(deviceList)==0:
deviceList = hubs.hubs().getDevices() deviceList = hubs.hubs().getDevices()
isConnected = 0 isConnected = 0
device0 = '' device0 = ''
for device in deviceList: for device in deviceList:
isConnected, u = self.init(device) isConnected,u = self.init(device)
if isConnected: if isConnected:
if self.start_app(u, package) != 0: if self.start_app(u,package)!=0:
device0 = device device0 = device
return isConnected, device0, u return isConnected, device0, u
else: else:
isConnected = 0 isConnected=0
device0 = '' device0 = ''
log.log().logger.error('package not found!') log.log().logger.error('package not found!')
return 0, '', '' return 0,'',''
# init and try to unlock the device. # init and try to unlock the device.
def init(self, device=''): def init(self,device=''):
log.log().logger.info('trying to connect device : %s' % device) log.log().logger.info('trying to connect device : %s' %device)
try: try:
u = ut2.connect(device) u = ut2.connect(device)
log.log().logger.info(u.device_info) log.log().logger.info(u.device_info)
isConnected = 1 isConnected = 1
except requests.exceptions.ConnectionError as e: except :
log.log().logger.error(e)
isConnected = 0 isConnected = 0
u = '' u = ''
return isConnected, u return isConnected,u
# 通过 adb 查询当前包是否已安装。 需要USB 链接,暂时不是使用。 # 通过 adb 查询当前包是否已安装。 需要USB 链接,暂时不是使用。
def is_app_exist0(self, package): def is_app_exist0(self,package):
import subprocess import subprocess
cmd = 'dumpsys package %s | grep version' % package cmd = 'dumpsys package %s | grep version' %package
cmds = [cmd, "exit", ] cmds = [cmd, "exit", ]
pipe = subprocess.Popen("adb shell", stdin=subprocess.PIPE, stdout=subprocess.PIPE) pipe = subprocess.Popen("adb shell", stdin=subprocess.PIPE, stdout=subprocess.PIPE)
@ -52,25 +50,23 @@ class atx_driver():
return 'versionCode' in str(code) return 'versionCode' in str(code)
# 通过 adb 查询当前包是否已安装。 不需要USB 链接,目前使用 # 通过 adb 查询当前包是否已安装。 不需要USB 链接,目前使用
def is_app_exist(self, u, package): def is_app_exist(self,u,package):
cmd = 'dumpsys package %s | grep version' % package cmd = 'dumpsys package %s | grep version' % package
result = u.adb_shell(cmd) result = u.adb_shell(cmd)
log.log().logger.info(result) log.log().logger.info(result)
return 'versionCode' in str(result) return 'versionCode' in str(result)
# start the testing app. for example , com.ghw.sdk2 # start the testing app. for example , com.ghw.sdk2
def start_app(self, u, package): def start_app(self,u, package):
# try to unlock device. # try to unlock device.
try: try:
log.log().logger.info('trying to unlock...') log.log().logger.info('trying to unlock...')
u.unlock() u.unlock()
except requests.exceptions.ReadTimeout as e: except:
log.log().logger.error(e)
try: try:
log.log().logger.info('trying to unlock again...') log.log().logger.info('trying to unlock again...')
u.app_start('xdf.android_unlock') u.app_start('xdf.android_unlock')
except requests.exceptions.ReadTimeout as e: except:
log.log().logger.error(e) pass
time.sleep(2) time.sleep(2)
log.log().logger.info('trying to swipe up...') log.log().logger.info('trying to swipe up...')
u.swipe_points([(0.509, 0.601), (0.503, 0.149)], 0.2) u.swipe_points([(0.509, 0.601), (0.503, 0.149)], 0.2)
@ -79,24 +75,23 @@ class atx_driver():
u.app_clear(package) u.app_clear(package)
u.app_start(package, stop=True, unlock=True) u.app_start(package, stop=True, unlock=True)
time.sleep(2) time.sleep(2)
if self.is_app_exist(u, package): if self.is_app_exist(u,package):
log.log().logger.info('package found, start testing!') log.log().logger.info('package found, start testing!')
time.sleep(2) time.sleep(2)
u1, result1 = self.click(u, 'name', '允许') u1, result1 =self.click(u,'name','允许')
time.sleep(1) time.sleep(1)
if result1 == 2: if result1==2:
u1, result1 = self.click(u, 'name', '始终允许') u1, result1 = self.click(u, 'name', '始终允许')
time.sleep(1) time.sleep(1)
if result1 == 2: if result1==2:
u1, result1 = self.click(u, 'name', 'ALLOW') u1, result1 = self.click(u, 'name', 'ALLOW')
time.sleep(1) time.sleep(1)
return u return u
else: else:
log.log().logger.error('package is not found in this device , try to connect another !') log.log().logger.error('package is not found in this device , try to connect another !')
return 0 return 0
# save a screenshot to corresponding folder. # save a screenshot to corresponding folder.
def take_screenshot(self, u, type, caseNo, screenFileList): def take_screenshot(self,u, type,caseNo,screenFileList):
fileName, fileName1 = util.util().screenshot(type, caseNo) fileName, fileName1 = util.util().screenshot(type, caseNo)
image = u.screenshot() image = u.screenshot()
image.save(fileName) image.save(fileName)
@ -106,8 +101,8 @@ class atx_driver():
# type text into the target element. # type text into the target element.
# method : id, name the method used to locate the target element. # method : id, name the method used to locate the target element.
# resource_id : target element's attribute, with which to locate target. # resource_id : target element's attribute, with which to locate target.
def type_text(self, u, method, resource_id, text): def type_text(self,u,method,resource_id, text):
if method == 'id': if method== 'id':
if u(resourceId=resource_id).exists: if u(resourceId=resource_id).exists:
u(resourceId=resource_id).send_keys(text) u(resourceId=resource_id).send_keys(text)
time.sleep(1) time.sleep(1)
@ -115,9 +110,9 @@ class atx_driver():
u.adb_shell('input', 'keyevent', 'BACK') u.adb_shell('input', 'keyevent', 'BACK')
time.sleep(1) time.sleep(1)
else: else:
log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % (method, resource_id)) log.log().logger.error(u"出错了,没有找到元素! by %s , %s" %(method,resource_id))
result = 2 result = 2
elif method == 'name': elif method== 'name':
if u(text=resource_id).exists: if u(text=resource_id).exists:
u(text=resource_id).send_keys(text) u(text=resource_id).send_keys(text)
time.sleep(1) time.sleep(1)
@ -127,7 +122,7 @@ class atx_driver():
else: else:
log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % (method, resource_id)) log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % (method, resource_id))
result = 2 result = 2
elif method == 'class': elif method== 'class':
if u(className=resource_id).exists: if u(className=resource_id).exists:
u(className=resource_id).send_keys(text) u(className=resource_id).send_keys(text)
time.sleep(1) time.sleep(1)
@ -138,11 +133,11 @@ class atx_driver():
log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % (method, resource_id)) log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % (method, resource_id))
result = 2 result = 2
else: else:
log.log().logger.error(u"元素方法未定义! %s" % method) log.log().logger.error(u"元素方法未定义! %s" %method)
result = 2 result = 2
return u, result return u,result
def click(self, u, by, text): def click(self,u,by,text):
result = 2 result = 2
if by == 'name': if by == 'name':
u, result = self.click_text(u, text) u, result = self.click_text(u, text)
@ -152,10 +147,10 @@ class atx_driver():
u, result = self.click_description(u, text) u, result = self.click_description(u, text)
elif by == 'class': elif by == 'class':
u, result = self.click_class(u, text) u, result = self.click_class(u, text)
return u, result return u,result
# click an element locating by it's text name. # click an element locating by it's text name.
def click_text(self, u, text): def click_text(self,u, text):
if u(text=text).exists: if u(text=text).exists:
u(text=text).click() u(text=text).click()
time.sleep(1) time.sleep(1)
@ -163,10 +158,10 @@ class atx_driver():
else: else:
log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % ('text', text)) log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % ('text', text))
result = 2 result = 2
return u, result return u,result
# click an element locating by it's resourceId. # click an element locating by it's resourceId.
def click_id(self, u, id): def click_id(self,u, id):
if u(resourceId=id).exists: if u(resourceId=id).exists:
u(resourceId=id).click() u(resourceId=id).click()
time.sleep(1) time.sleep(1)
@ -177,7 +172,7 @@ class atx_driver():
return u, result return u, result
# click an element locating by it's description. # click an element locating by it's description.
def click_description(self, u, id): def click_description(self,u, id):
if u(description=id).exists: if u(description=id).exists:
u(description=id).click() u(description=id).click()
time.sleep(1) time.sleep(1)
@ -188,7 +183,7 @@ class atx_driver():
return u, result return u, result
# click an element locating by it's className. # click an element locating by it's className.
def click_class(self, u, id): def click_class(self,u, id):
if u(className=id).exists: if u(className=id).exists:
u(className=id).click() u(className=id).click()
time.sleep(1) time.sleep(1)
@ -199,21 +194,21 @@ class atx_driver():
return u, result return u, result
# Send event to data collection. used to test sending data in SDK demo. # Send event to data collection. used to test sending data in SDK demo.
def sendData(self, u, name): def sendData(self, u,name):
u, result = self.click_text(u, name) u, result=self.click_text(u,name)
time.sleep(2) time.sleep(2)
if result == 1: if result==1:
if u(text='发送').exists: if u(text='发送').exists:
u, result = self.click_text(u, '发送') u, result =self.click_text(u,'发送')
else: else:
u, result = self.click_text(u, 'SEND') u, result =self.click_text(u, 'SEND')
u.adb_shell('input', 'keyevent', 'BACK') u.adb_shell('input', 'keyevent', 'BACK')
time.sleep(1) time.sleep(1)
return u, result return u,result
# run step. # run step.
# if a new step type is defined, it should be added to the step_name list. # if a new step type is defined, it should be added to the step_name list.
def run_step(self, u, step_name, detail, caseNo, screenFileList): def run_step(self,u, step_name, detail,caseNo,screenFileList):
result = 1 result = 1
if step_name == 'Android': if step_name == 'Android':
time.sleep(1) time.sleep(1)
@ -226,17 +221,17 @@ class atx_driver():
elif step_name == '发送': elif step_name == '发送':
u, result = self.sendData(u, detail[0]) u, result = self.sendData(u, detail[0])
elif step_name == '填写': elif step_name == '填写':
u, result = self.type_text(u, detail[0], detail[1], detail[2]) u, result = self.type_text(u, detail[0],detail[1],detail[2])
elif step_name == '返回': elif step_name == '返回':
u.adb_shell('input', 'keyevent', 'BACK') u.adb_shell('input', 'keyevent', 'BACK')
time.sleep(1) time.sleep(1)
elif step_name == '截图': elif step_name == '截图':
screenFileList = self.take_screenshot(u, 'normal', caseNo, screenFileList) screenFileList=self.take_screenshot(u, 'normal',caseNo,screenFileList)
else: else:
result = 2 result=2
log.log().logger.error('method is not defined : %s' % step_name) log.log().logger.error('method is not defined : %s' %step_name)
if result == 0: if result == 0:
log.log().logger.error('package is not fould!') log.log().logger.error('package is not fould!')
elif result == 2: elif result == 2:
screenFileList = self.take_screenshot(u, 'fail', caseNo, screenFileList) screenFileList =self.take_screenshot(u, 'fail', caseNo, screenFileList)
return u, result, screenFileList return u,result,screenFileList

View File

@ -26,6 +26,8 @@ class buildCase(object):
else: else:
elementTemplate = '' elementTemplate = ''
element = '' element = ''
if len(paraValue) == paraCount-1:
paraValue.append('')
if len(paraValue) in (paraCount, paraCount+1) and paraCount > 0 and paraCount < 5: if len(paraValue) in (paraCount, paraCount+1) and paraCount > 0 and paraCount < 5:
if paraValue[0] == 'css': if paraValue[0] == 'css':
paraValue[0] = 'css_selector' paraValue[0] = 'css_selector'
@ -73,8 +75,11 @@ class buildCase(object):
def build_case(self,keyword,steps): def build_case(self,keyword,steps):
paraCount,template, elementTemplate = keywords.keywords().getPara(keyword) paraCount,template, elementTemplate = keywords.keywords().getPara(keyword)
conmod, element = self.convertToComend(template,paraCount,steps,elementTemplate) if paraCount=='':
return conmod,element return '',''
else:
conmod, element = self.convertToComend(template,paraCount,steps,elementTemplate)
return conmod,element
def readPublic(self,caseList0) : def readPublic(self,caseList0) :
@ -155,4 +160,4 @@ class buildCase(object):
newstep.append(step0) newstep.append(step0)
else: else:
newstep='' newstep=''
return newstep return newstep,case

View File

@ -1,4 +1,3 @@
from selenium import webdriver from selenium import webdriver
from selenium.common.exceptions import * from selenium.common.exceptions import *
from app.core import log,hubs from app.core import log,hubs
@ -10,19 +9,21 @@ class coredriver():
server_url = {} server_url = {}
import random import random
i = random.randint(0,100)%len(servers) i = random.randint(0,100)%len(servers)
log.log().logger.info('%s, %s' %(i,len(servers))) log.log().logger.debug('%s, %s' %(i,len(servers)))
server_url['hostname']=servers[i][0] server_url['hostname']=servers[i][0]
server_url['port'] = servers[i][1] server_url['port'] = servers[i][1]
return "http://%s:%s/wd/hub" %(server_url['hostname'],server_url['port']) return "http://%s:%s/wd/hub" %(server_url['hostname'],server_url['port'])
# init the driver. if runType is Android, the package is required
def iniDriver(self, runType,devicename = ''): def iniDriver(self, runType,devicename = ''):
log.log().logger.info('RUNTYPE is : %s' %runType) log.log().logger.debug('RUNTYPE is : %s' %runType)
servers = hubs.hubs().showHubs(runType) servers = hubs.hubs().showHubs(runType)
log.log().logger.info(servers) log.log().logger.debug(servers)
if len(servers): if len(servers):
server_url = self.serverUrl(servers) server_url = self.serverUrl(servers)
if runType == 'Chrome': if runType == 'Chrome':
desired_caps_web = webdriver.DesiredCapabilities.CHROME desired_caps_web = webdriver.DesiredCapabilities.CHROME.copy()
deviceList = ['Galaxy S5', 'Nexus 5X', 'Nexus 6P', 'iPhone 6', 'iPhone 6 Plus', 'iPad', 'iPad Pro'] deviceList = ['Galaxy S5', 'Nexus 5X', 'Nexus 6P', 'iPhone 6', 'iPhone 6 Plus', 'iPad', 'iPad Pro']
if devicename!='' : if devicename!='' :
if devicename not in deviceList: if devicename not in deviceList:
@ -40,15 +41,15 @@ class coredriver():
} }
desired_caps_web['goog:chromeOptions']=chrome_option desired_caps_web['goog:chromeOptions']=chrome_option
elif runType == 'Firefox': elif runType == 'Firefox':
desired_caps_web = webdriver.DesiredCapabilities.FIREFOX desired_caps_web = webdriver.DesiredCapabilities.FIREFOX.copy()
else: else:
log.log().logger.info("browser not support! %s " %runType) log.log().logger.error("browser not support! %s " %runType)
return 0 return 0
log.log().logger.info(desired_caps_web) log.log().logger.debug(desired_caps_web)
retry = 2 retry = 2
while retry: while retry:
try: try:
log.log().logger.info("start session %s time!" %(3-retry)) log.log().logger.debug("start session %s time!" %(3-retry))
driver = webdriver.remote.webdriver.WebDriver(command_executor=server_url,desired_capabilities=desired_caps_web) driver = webdriver.remote.webdriver.WebDriver(command_executor=server_url,desired_capabilities=desired_caps_web)
break break
except WebDriverException as e: except WebDriverException as e:
@ -64,14 +65,18 @@ class coredriver():
if retry == 0: if retry == 0:
return 0 return 0
else: else:
try: if devicename =='':
driver.maximize_window() try:
log.log().logger.info(driver.get_window_size()['height']) driver.maximize_window()
if driver.get_window_size()['height']<1920: if driver.get_window_size()['height']<1920:
driver.set_window_size(1920, 1080) #最大化失败,则手动 log.log().logger.debug(driver.get_window_size()['height'])
except WebDriverException as e: driver.set_window_size(1920, 1080) #最大化失败,则手动
log.log().logger.info(e) except WebDriverException as e:
driver.set_window_size(1920, 1080) #最大化失败,则手动 # log.log().logger.info(e)
try:
driver.set_window_size(1920, 1080) #最大化失败,则手动
except WebDriverException as e:
log.log().logger.info(e)
return driver return driver
else: else:
return 0 return 0

View File

@ -5,7 +5,7 @@ from app.db import test_task_manage
def main(): def main():
test_task_manage.test_task_manage().update_test_suite_check() # test_task_manage.test_task_manage().update_test_suite_check()
idList = test_task_manage.test_task_manage().test_suite_list() idList = test_task_manage.test_task_manage().test_suite_list()
idList1 = test_task_manage.test_task_manage().test_case_list() idList1 = test_task_manage.test_task_manage().test_case_list()
# log.log().logger.info(idList) # log.log().logger.info(idList)
@ -18,25 +18,31 @@ def main():
if runType =='0' or runType =='Android': if runType =='0' or runType =='Android':
threadNum = 1 threadNum = 1
runType ='Android' runType ='Android'
elif runType =='1' or runType =='iOS':
threadNum = 1
runType ='iOS'
elif runType =='2' or runType =='Chrome': elif runType =='2' or runType =='Chrome':
threadNum = 6 threadNum = 6
runType = 'Chrome' runType = 'Chrome'
else: else:
threadNum = 0 threadNum = 0
runType = '' runType = ''
if runType == '': # if runType == '':
test_task_manage.test_task_manage().update_test_suite(test_suite_id, '3') # test_task_manage.test_task_manage().update_test_suite(test_suite_id, '3')
else: # else:
test_task_manage.test_task_manage().update_test_suite(test_suite_id, '2') # test_task_manage.test_task_manage().update_test_suite(test_suite_id, '2')
process.process().runmain(test_suite_id, threadNum, runType) process.process().runmain(test_suite_id, threadNum, runType)
# test_task_manage.test_task_manage().update_test_suite_check()
result1 = 0 result1 = 0
else: else:
# log.log().logger.info('no test is available!')
result1=1 result1=1
if len(idList1): if len(idList1):
threadNum = 1 threadNum = 1
process.process().multipleRun(idList1, threadNum) process.process().multipleRun(idList1, threadNum)
result2 = 0 result2 = 0
else: else:
# log.log().logger.info('no test is available!')
result2=1 result2=1
result = result1 +result2 result = result1 +result2
return result return result

View File

@ -15,17 +15,13 @@ class extend():
elif 'text' in method: elif 'text' in method:
method = By.PARTIAL_LINK_TEXT method = By.PARTIAL_LINK_TEXT
elements = driver.find_elements(by=method,value=value) elements = driver.find_elements(by=method,value=value)
if len(elements)==0: if len(elements)>0 and is_displayed:
return None
elif len(elements)==1:
return elements[0]
elif is_displayed:
for element in elements: for element in elements:
if element.is_displayed(): if element.is_displayed():
return element return element
return None else:
else: elements.remove(element)
return elements[0] return elements[0]
def find_elements(self,driver,para_list): def find_elements(self,driver,para_list):
method, value = para_list[0], para_list[1] method, value = para_list[0], para_list[1]
@ -38,7 +34,6 @@ class extend():
elements = driver.find_elements(by=method,value=value) elements = driver.find_elements(by=method,value=value)
return elements return elements
def switchIframe(self,driver,para_list): def switchIframe(self,driver,para_list):
method, value = para_list[0], para_list[1] method, value = para_list[0], para_list[1]
# 切换 iframe # 切换 iframe
@ -54,72 +49,42 @@ class extend():
time.sleep(2) time.sleep(2)
def screenshot(self,driver,id,screenFileList,isError=False): def screenshot(self,driver,id,screenFileList,isError=False):
result = 2 result = '2'
if isError: if isError:
fileName, fileName1 = util.util().screenshot('error', id) fileName, fileName1 = util.util().screenshot('error', id)
else: else:
fileName, fileName1 = util.util().screenshot('normal', id) fileName, fileName1 = util.util().screenshot('normal', id)
try: log.log().logger.debug(fileName)
# print(fileName) driver.save_screenshot(fileName)
driver.save_screenshot(fileName) screenFileList.append(fileName1)
screenFileList.append(fileName1) result = '1'
result = 1
except requests.exceptions.ConnectionError as e:
log.log().logger.error(e)
except selenium.common.exceptions.WebDriverException as e:
log.log().logger.error(e)
return result, screenFileList return result, screenFileList
def assert_text(self,driver,text): def assert_text(self,driver,text):
try: elements = driver.find_elements(by='xpath', value="//*[contains(.,'" + text + "')]")
elements = driver.find_elements(by='xpath', value="//*[contains(.,'" + text + "')]") assert len(elements)
except NoSuchElementException as e:
log.log().logger.info(e)
elements = []
if len(elements) > 0:
result = '1'
else:
result = '2'
log.log().logger.info('verify result is : %s' % result)
return result
def assert_title(self, driver, text):
if text in driver.title:
result = '1'
else:
result = '2'
log.log().logger.info('verify result is : %s' % result)
return result
def assert_title(self,driver,text):
log.log().logger.info('目标文本:%s 期待包含文本:%s' % (driver.title, text))
assert text in driver.title
def assert_element_text(self,driver,para_list,isNot=False): def assert_element_text(self,driver,para_list,isNot=False):
result = '2' # para_list=str(para_list).split(',')
para_list=str(para_list).split(',') text0 = ''
if len(para_list)==3: if len(para_list)==3:
method, value, text = para_list[0],para_list[1],para_list[2] method, value, text = para_list[0],para_list[1],para_list[2]
text0 = '' element = self.find_element(driver, [method, value])
try: text0 = element.text
element = self.find_element(driver, [method, value])
text0 = element.text
except NoSuchElementException as e:
log.log().logger.info(e)
except AttributeError as e:
log.log().logger.info(e)
if not len(text0): if not len(text0):
try: text0=element.get_attribute('value')
text0=element.get_attribute('value') # log.log().logger.info('目标文本:%s 期待文本:%s' % (text0,text))
except NoSuchElementException as e:
log.log().logger.info(e)
except AttributeError as e:
log.log().logger.info(e)
log.log().logger.info('目标文本:%s 期待文本:%s' %(text0,text))
if (text in str(text0)):
result = '1'
if isNot: if isNot:
if result !=1: log.log().logger.info('目标文本:%s 期待不包含文本:%s' % (text0, text))
result =1 assert (text in str(text0))==False
log.log().logger.info('verify result is : %s' % result) else:
return result log.log().logger.info('目标文本:%s 期待包含文本:%s' % (text0, text))
assert (text in str(text0))
def select(self,driver,para_list): def select(self,driver,para_list):
@ -135,7 +100,7 @@ class extend():
option_method = 'visible_text' option_method = 'visible_text'
comd = 'Select(driver.find_element_by_%s("%s")).select_by_%s("%s")' % ( comd = 'Select(driver.find_element_by_%s("%s")).select_by_%s("%s")' % (
method, value, option_method, option_value) method, value, option_method, option_value)
log.log().logger.info(comd) log.log().logger.debug(comd)
exec(comd) exec(comd)
time.sleep(2) time.sleep(2)
@ -175,39 +140,27 @@ class extend():
driver.find_element_by_partial_link_text(text).click() driver.find_element_by_partial_link_text(text).click()
time.sleep(2) time.sleep(2)
def click_text(self,driver,text): def click_text(self,driver,text):
elements = driver.find_elements(by='xpath', value="//*[contains(.,'" + text + "')]") elements = driver.find_elements(by='xpath', value="//*[contains(.,'" + text + "')]")
for element in elements: for element in elements:
try: element.click()
element.click()
except NoSuchElementException as e:
log.log().logger.info(e)
time.sleep(2) time.sleep(2)
def try_click(self,driver,para_list): def try_click(self,driver,para_list):
# para_list=str(para_list).split(',')
if len(para_list)==2: if len(para_list)==2:
method, value = para_list[0],para_list[1] method, value = para_list[0],para_list[1]
# print(value) for i in range(3):
element = self.find_element(driver, [method, value]) try:
if element: self.find_element(driver, [method, value]).click()
for i in range(3): break
try: except:
element.click() pass
except NoSuchElementException as e:
log.log().logger.info(e)
time.sleep(2) time.sleep(2)
def click_index(self,driver,para_list): def click_index(self,driver,para_list):
method, value, index = para_list[0], para_list[1], para_list[2] method, value, index = para_list[0], para_list[1], para_list[2]
elements = self.find_elements(driver, [method, value]) elements = self.find_elements(driver, [method, value])
if len(elements): elements[int(index)].click()
for i in range(3):
try:
elements[int(index)].click()
except NoSuchElementException as e:
log.log().logger.info(e)
time.sleep(2) time.sleep(2)
def fill_on_date(self,driver,para_list): def fill_on_date(self,driver,para_list):
@ -222,4 +175,26 @@ class extend():
if len(str(t))==0: if len(str(t))==0:
t=2 t=2
else: else:
time.sleep(int(t)) time.sleep(int(t))
def move_to(self,driver,para_list):
if len(para_list)==2:
method, value = para_list[0],para_list[1]
for i in range(3):
try:
from selenium.webdriver.common.action_chains import ActionChains
ActionChains(driver).move_to_element(to_element=self.find_element(driver, [method, value])).perform()
break
except:
pass
time.sleep(2)
def fill(self,driver,para_list,text, is_displayed = True):
element = self.find_element(driver, para_list,is_displayed)
# element = driver.find_element(by=para_list[0],value=para_list[1])
element.clear()
element.send_keys(text)
def fill_file(self,driver,para_list,text, is_displayed = False):
self.fill(driver,para_list,text, is_displayed = False)

View File

@ -1,4 +1,4 @@
from app import useDB, config from app import useDB,config
from app.core import log,util from app.core import log,util
import json import json
class hubs(): class hubs():
@ -70,7 +70,7 @@ class hubs():
else: else:
self.updateHub(hub[0],hub[1],'0','0') self.updateHub(hub[0],hub[1],'0','0')
if len(hubs) == 0: if len(hubs) == 0:
log.log().logger.error('no hubs is availabe!') log.log().logger.debug('no hubs is availabe!')
return hubs return hubs
def checkHubs(self): def checkHubs(self):
@ -99,7 +99,7 @@ class hubs():
else: else:
sql = "select id, ip, port, androidConnect,status from test_hubs;" sql = "select id, ip, port, androidConnect,status from test_hubs;"
list = useDB.useDB().search(sql) list = useDB.useDB().search(sql)
log.log().logger.info('cases : %s' %list) log.log().logger.debug('cases : %s' %list)
results = [] results = []
for i in range(len(list)): for i in range(len(list)):
result = {} result = {}
@ -112,7 +112,7 @@ class hubs():
return results return results
def getDevices(self): def getDevices(self):
url = config.ATXHost + '/list' url = config.ATXHost+'/list'
response, content = util.util().send(url) response, content = util.util().send(url)
content = json.loads(content) content = json.loads(content)
deviceList = [] deviceList = []
@ -120,22 +120,21 @@ class hubs():
if device['present']: if device['present']:
deviceList.append(device['ip'] + ':7912') deviceList.append(device['ip'] + ':7912')
else: else:
# log.log().logger.info(device['ip'] + ' is not ready!') log.log().logger.debug(device['ip'] + ' is not ready!')
pass
return deviceList return deviceList
# 获取设备列表信息 #获取设备列表信息
def getDevicesList(self): def getDevicesList(self):
url = config.ATXHost + '/list' url = config.ATXHost+'/list'
response, content = util.util().send(url) response, content = util.util().send(url)
content = json.loads(content) content = json.loads(content)
deviceLists = [] deviceLists=[]
for device in content: for device in content:
deviceList = {} deviceList = {}
if device['present']: if device['present']:
deviceList["ip"] = device['ip'] + ':7912' deviceList["ip"]=device['ip'] + ':7912'
deviceList["model"] = device['model'] deviceList["model"]=device['model']
deviceLists.append(deviceList) deviceLists.append(deviceList)
else: else:
log.log().logger.info(device['ip'] + ' is not ready!') log.log().logger.debug(device['ip'] + ' is not ready!')
return deviceLists return deviceLists

View File

@ -3,10 +3,37 @@ import string
from app import useDB from app import useDB
class keywords(object): class keywords(object):
def __init__(self):
self.keywords = [
{'index': '前往', 'paraCount': 1, 'template': 'driver.get("http://$para1")'},
{'index': '点击', 'paraCount': 2, 'template':'driver.element_by_$para1("$para2").click()'},
{'index': '填写', 'paraCount': 3, 'template': 'driver.element_by_$para1("$para2").send_keys("$para3")'},
{'index': '等待', 'paraCount': 1, 'template': 'time.sleep($para1)'},
{'index': '点击菜单', 'paraCount': 1, 'template': 'driver.element_by_partial_link_text("$para1").click()'},
{'index': '点击文字', 'paraCount': 1, 'template': 'driver.element_by_partial_link_text("$para1").click()'},
{'index': '悬浮点击', 'paraCount': 2, 'template':'driver.element_by_$para1("$para2").click()'},
{'index': '选择', 'paraCount': 4, 'template': 'Select(driver.element_by_$para1("$para2")).select_by_$para3("$para4")'},
{'index': '验证', 'paraCount': 1, 'template': 'time.sleep(1)'},
{'index': '验证文字', 'paraCount': 3, 'template': 'result1=(driver.element_by_$para1("$para2").text)'},
{'index': '截图', 'paraCount': 1, 'template': 'driver.save_screenshot("%s" %$para1)'}
]
def getPara1(self, keyword):
# print(self.keywords)
keywords = self.keywords
result = 0
log.log().logger.info(keyword)
for list in keywords:
if list['index']==keyword:
result =1
return list['paraCount'], list['template']
break
if result ==0:
return '',''
def getPara(self, keyword): def getPara(self, keyword):
result = 0 result = 0
log.log().logger.info(keyword) # log.log().logger.info(keyword)
sql = string.Template( sql = string.Template(
"select paraCount, template, elementTemplate from `test_keyword` where `keyword`= '$index' limit 1;") "select paraCount, template, elementTemplate from `test_keyword` where `keyword`= '$index' limit 1;")
sql = sql.substitute(index=keyword) sql = sql.substitute(index=keyword)

View File

@ -2,13 +2,13 @@ import queue
import threading import threading
import time import time
from multiprocessing.dummy import Pool as ThreadPool from multiprocessing.dummy import Pool as ThreadPool
import urllib,http from retrying import retry
import requests
from selenium.common.exceptions import * from selenium.common.exceptions import *
from selenium.webdriver.support.select import Select from selenium.webdriver.support.select import Select
from app import useDB, config from selenium.webdriver.common.keys import Keys
from app.core import buildCase, log, hubs, coredriver, util, extend, atx_core from selenium.webdriver.common.action_chains import ActionChains
from app import config
from app.core import buildCase, log, hubs, coredriver, util, atx_core,extend
from app.db import test_task_manage,test_batch_manage from app.db import test_task_manage,test_batch_manage
isUseATX = config.isUseATX isUseATX = config.isUseATX
@ -23,9 +23,9 @@ class process():
browserType = case[2] browserType = case[2]
case = case[1] case = case[1]
screenFileList = [] screenFileList = []
devicename = '' package = ''
log.log().logger.info('current case is : %s' %case) log.log().logger.info('开始执行 id %-10d | 用例 %s ' %(id,case))
newstep = buildCase.buildCase().getCase(case) newstep,case = buildCase.buildCase().getCase(case)
if len(newstep): if len(newstep):
runType = newstep[0][0] runType = newstep[0][0]
if len(newstep[0][1]) == 1 : if len(newstep[0][1]) == 1 :
@ -33,13 +33,13 @@ class process():
package = newstep[0][1][0] package = newstep[0][1][0]
if runType == 'Android' and isUseATX: if runType == 'Android' and isUseATX:
# 使用 atx 执行 Android 用例 # 使用 atx 执行 Android 用例
result, stepN, screenFileList = atx_core.atx_core().run_case(case, id, deviceList=deviceList) result, stepN,screenFileList = atx_core.atx_core().run_case(case,id,deviceList=deviceList)
log.log().logger.info('%s, %s, %s' % (result, stepN, screenFileList)) log.log().logger.info('%s, %s, %s' %(result, stepN,screenFileList))
else: else:
# 使用 selenium 执行 web 用例 # 使用 selenium 执行 web 用例
if browserType !=''and devicename =='': if browserType !=''and package =='':
runType = browserType runType = browserType
driver = coredriver.coredriver().iniDriver(runType,devicename=devicename) driver = coredriver.coredriver().iniDriver(runType,devicename=package)
result = '2' result = '2'
stepN = 'init' stepN = 'init'
if driver == 0: # 没有可执行的节点,无法执行 if driver == 0: # 没有可执行的节点,无法执行
@ -52,84 +52,33 @@ class process():
stepN = 'no steps!' stepN = 'no steps!'
else: else:
for i in range(1,len(newstep)): # 开始逐个步骤执行 for i in range(1,len(newstep)): # 开始逐个步骤执行
steps = newstep[i] stepN = case[i].replace('"',"'")
keyword = steps[0] # result, stepN, screenFileList = self.do_step(driver, newstep[i], screenFileList)
comed , element = buildCase.buildCase().build_case(keyword, steps[1]) # 转换为可执行语句 try:
stepN = keyword result,stepN, screenFileList=self.do_step(driver,newstep[i],stepN,id,screenFileList)
tryTimes = 0 except:
result = '2' result = '2'
while 1: log.log().logger.error('id %-10d | 失败步骤:%s ' %(id,stepN))
if keyword=='验证文字': # ("id %-10d | 关键字: %-20s | 步骤:%-60s | 命令: %s" % (id, keyword, case, comed))
result = extend.extend().assert_element_text(driver,comed) if result == '2':
elif keyword=='验证文字非': trytime = 2
result = extend.extend().assert_element_text(driver,comed,isNot=True) while trytime:
elif keyword=='验证':
result = extend.extend().assert_text(driver,comed)
elif keyword=='截图':
result, screenFileList = extend.extend().screenshot(driver,id,screenFileList)
else:
try: try:
log.log().logger.info(comed) result1, screenFileList = extend.extend().screenshot(driver, id, screenFileList,
result = '1' True)
exec(comed) #执行语句 break
except requests.exceptions.HTTPError as e: except UnexpectedAlertPresentException as e:
log.log().logger.error(e) log.log().logger.info(e)
result = '2' time.sleep(5)
except urllib.error.URLError as e:
log.log().logger.error(e)
result = '2'
except NoSuchElementException as e:
log.log().logger.error(e)
try: try:
driver.accept_alert() driver.switch_to.alert.accept()
except : except:
log.log().logger.error('no alert') log.log().logger.error('no alert')
try: trytime += -1
driver.execute_script("window.scrollTo(0,document.body.scrollHeight)")
except :
log.log().logger.error('cannot scroll')
result = '2'
except AttributeError as e:
log.log().logger.info(e)
try:
driver.execute_script("window.scrollTo(0,document.body.scrollHeight)")
except :
log.log().logger.error('cannot scroll')
result = '2'
except WebDriverException as e:
log.log().logger.error(e)
try:
driver.accept_alert()
except :
log.log().logger.error('no alert')
result = '2'
except StaleElementReferenceException as e:
log.log().logger.info(e)
result = '2'
except http.client.RemoteDisconnected as e:
log.log().logger.info(e)
result = '2'
if result == '2' : # 步骤失败时重试3次
tryTimes +=1
time.sleep(2)
else:
break
log.log().logger.info('try times :%s, result is: %s' %(tryTimes, result))
if result == '2' and tryTimes ==3: # 失败3次自动截图
retryScreenshot = 2
while retryScreenshot:
try:
result, screenFileList =extend.extend().screenshot(driver,id,screenFileList,True)
break
except WebDriverException as e:
log.log().logger.error(e)
driver.switch_to_default_content()
retryScreenshot +=-1
# break
if result == '2' and tryTimes >=3: # 失败3次跳出
break
if result == '2' : # 失败3次跳出
break break
# for android driver ,the ending should by driver.quite(); for webdriver ,the ending is driver.close()
try: try:
driver.quit() driver.quit()
except WebDriverException as e: except WebDriverException as e:
@ -147,45 +96,75 @@ class process():
test_batch_manage.test_batch_manage().set_test_end(result, datetime.datetime.now(), stepN, screenFileList,id) test_batch_manage.test_batch_manage().set_test_end(result, datetime.datetime.now(), stepN, screenFileList,id)
return result return result
# multiple run, for webdriver , the thread could by more than 1.
@retry(stop_max_attempt_number=3,wait_fixed=2000)
def do_step(self,driver,steps,case,id,screenFileList):
keyword = steps[0]
stepN = keyword
comed, element = buildCase.buildCase().build_case(keyword, steps[1]) # 转换为可执行语句
result = '2'
log.log().logger.info("id %-10d | 关键字: %-20s | 步骤:%-60s | 命令: %s" %(id, keyword,case, comed))
try:
if comed != '':
if keyword == '截图':
result, screenFileList = extend.extend().screenshot(driver, id, screenFileList)
else:
exec(comed) # 执行语句
result = '1'
except UnexpectedAlertPresentException as e:
log.log().logger.info(e)
try:
driver.switch_to.alert.accept()
result = '1'
except :
log.log().logger.error('no alert')
else:
stepN = 'no comed to run!'
return result,stepN,screenFileList
# multiple run, for android , the tread is 1; for webdriver , the thread could by more than 1.
def multipleRun(self,caselist, threadNum): def multipleRun(self,caselist, threadNum):
pool = ThreadPool(threadNum) pool = ThreadPool(threadNum)
pool.map(self.main, caselist) pool.map(self.main, caselist)
pool.close() pool.close()
pool.join() pool.join()
def runmain(self, test_suite_id, threadNum, runType): def runmain(self,test_suite_id,threadNum, runType ):
if runType == 'Android' and isUseATX: if runType == 'Android' and isUseATX:
Hubs = hubs.hubs().getDevices() Hubs = hubs.hubs().getDevices()
log.log().logger.info('Run type is ATX and usable devices are %s' % Hubs) log.log().logger.debug('Run type is ATX and usable devices are %s' %Hubs)
else: else:
Hubs = hubs.hubs().showHubs(runType) Hubs = hubs.hubs().showHubs(runType)
if len(Hubs) == 0: if len(Hubs) ==0:
log.log().logger.error('cannot run for no available hubs!') log.log().logger.debug('cannot run for no available hubs!')
elif runType == 'Android' and isUseATX: elif runType == 'Android' and isUseATX:
self.atxMain() self.atxMain()
else: else:
self.multipleRun(util.util().getTeseCases(test_suite_id), threadNum) self.multipleRun(util.util().getTeseCases(test_suite_id),threadNum)
test_task_manage.test_task_manage().update_test_suite_check() test_task_manage.test_task_manage().update_test_suite_check()
def atxMain(self): def atxMain(self):
q = queue.Queue() q = queue.Queue()
Hubs = hubs.hubs().getDevices() Hubs = hubs.hubs().getDevices()
alllist = util.util().getTeseCasesATX(all=True) alllist = util.util().getTeseCasesATX(all=True)
if len(Hubs) and len(alllist): if len(Hubs) and len(alllist):
count = 0 count = 0
threads = [] threads = []
for i in range(len(Hubs)): for i in range(len(Hubs)):
j = Hubs[i] j = Hubs[i]
threads.append(MyThread(q, i, j)) threads.append(MyThread(q, i, j))
for mt in threads: for mt in threads:
mt.start() mt.start()
log.log().logger.info("start time: %s" % time.ctime()) log.log().logger.info("start time: %s" %time.ctime())
elif len(alllist): elif len(alllist):
log.log().logger.info('no device is avaible!') log.log().logger.debug('no device is avaible!')
else: else:
log.log().logger.info('no test case is needed!') log.log().logger.debug('no test case is needed!')
class MyThread(threading.Thread): class MyThread(threading.Thread):
@ -196,30 +175,30 @@ class MyThread(threading.Thread):
self.j = j self.j = j
def run(self): def run(self):
list0 = util.util().getTeseCasesATX(self.j, isRunning=True) list0 = util.util().getTeseCasesATX(self.j,isRunning=True)
if len(list0): if len(list0):
log.log().logger.info('case still running !') log.log().logger.info('case still running !')
else: else:
list = util.util().getTeseCasesATX(self.j) list = util.util().getTeseCasesATX(self.j)
if len(list): if len(list):
log.log().logger.info('start test by single devices :%s ' % list[0][0]) log.log().logger.info('start test by single devices :%s ' %list[0][0] )
self.q.put(u"打点我是第%d个线程ip: %s, 测试用例:%s" % (self.t, self.j, list[0][0])) self.q.put(u"打点我是第%d个线程ip: %s, 测试用例:%s" % (self.t, self.j,list[0][0]))
log.log().logger.info(u"打点我是第%d个线程ip: %s, 测试用例:%s" % (self.t, self.j, list[0][0])) log.log().logger.info(u"打点我是第%d个线程ip: %s, 测试用例:%s" % (self.t, self.j,list[0][0]))
process().main(list[0]) process().main(list[0])
time.sleep(2) time.sleep(2)
else: else:
list2 = util.util().getTeseCasesATX(all=True) list2 = util.util().getTeseCasesATX(all=True)
log.log().logger.info('current list lenth is: %s' % len(list2)) log.log().logger.info('current list lenth is: %s'%len(list2))
if len(list2): if len(list2):
try: try:
case = list2[self.t] case = list2[self.t]
except: except:
case = list2[0] case=list2[0]
log.log().logger.info('current ip is : %s' % str(self.j)) log.log().logger.info('current ip is : %s'%str(self.j))
log.log().logger.info('start test by multiple devices :%s ' % case[0]) log.log().logger.info('start test by multiple devices :%s ' % case[0])
self.q.put(u"我是第%d个线程ip: %s, 测试用例:%s" % (self.t, self.j, case[0])) self.q.put(u"我是第%d个线程ip: %s, 测试用例:%s" % (self.t, self.j, case[0]))
log.log().logger.info(u"打点我是第%d个线程ip: %s, 测试用例:%s" % (self.t, self.j, case[0])) log.log().logger.info(u"打点我是第%d个线程ip: %s, 测试用例:%s" % (self.t, self.j, case[0]))
process().main(case, deviceList=[str(self.j)]) process().main(case,deviceList=[str(self.j)])
time.sleep(3) time.sleep(3)
else: else:
log.log().logger.info('no case !') log.log().logger.info('no case !')

View File

@ -25,19 +25,19 @@ class util():
if platform.system() == 'Windows': if platform.system() == 'Windows':
screen_shot_path = config.screen_shot_path screen_shot_path = config.screen_shot_path
screen_shot_path1 = 'static\\screenshot\\' screen_shot_path1 = 'static\\screenshot\\'
log.log().logger.info(screen_shot_path) log.log().logger.debug(screen_shot_path)
normalfilename = screen_shot_path + 'normalScreenShot\\' + str(caseNo)+'_success' + nowTime + '.jpg' normalfilename = screen_shot_path + 'normalScreenShot\\' + str(caseNo)+'_success' + nowTime + '.png'
errorfilename = screen_shot_path + 'errorScreenShot\\' + str(caseNo)+'_error' + nowTime + '.jpg' errorfilename = screen_shot_path + 'errorScreenShot\\' + str(caseNo)+'_error' + nowTime + '.png'
normalfilename1 = '\\' +screen_shot_path1 + '\\' + 'normalScreenShot' + '\\' + str(caseNo)+'_success' + nowTime + '.jpg' normalfilename1 = '\\' +screen_shot_path1 + '\\' + 'normalScreenShot' + '\\' + str(caseNo)+'_success' + nowTime + '.png'
errorfilename1 = '\\' + screen_shot_path1 + '\\' + 'errorScreenShot' + '\\' + str(caseNo)+'_error' + nowTime + '.jpg' errorfilename1 = '\\' + screen_shot_path1 + '\\' + 'errorScreenShot' + '\\' + str(caseNo)+'_error' + nowTime + '.png'
else: else:
screen_shot_path = config.screen_shot_path screen_shot_path = config.screen_shot_path
screen_shot_path1 = 'static/screenshot' screen_shot_path1 = 'static/screenshot'
normalfilename = screen_shot_path + 'normalScreenShot' + '/'+ str(caseNo)+'_success' + nowTime + '.jpg' normalfilename = screen_shot_path + 'normalScreenShot' + '/'+ str(caseNo)+'_success' + nowTime + '.png'
errorfilename = screen_shot_path + 'errorScreenShot' + '/' + str(caseNo)+'_error' + nowTime + '.jpg' errorfilename = screen_shot_path + 'errorScreenShot' + '/' + str(caseNo)+'_error' + nowTime + '.png'
normalfilename1 = '/' + screen_shot_path1 + '/' + 'normalScreenShot' + '/' + str( normalfilename1 = '/' + screen_shot_path1 + '/' + 'normalScreenShot' + '/' + str(
caseNo) + '_success' + nowTime + '.jpg' caseNo) + '_success' + nowTime + '.png'
errorfilename1 = '/' + screen_shot_path1 + '/' + 'errorScreenShot' + '/' + str(caseNo) + '_error' + nowTime + '.jpg' errorfilename1 = '/' + screen_shot_path1 + '/' + 'errorScreenShot' + '/' + str(caseNo) + '_error' + nowTime + '.png'
if screenShotType == 'error': if screenShotType == 'error':
return errorfilename,errorfilename1 return errorfilename,errorfilename1
else: else:

View File

@ -53,12 +53,13 @@ class test_keyword_manage:
useDB.useDB().insert(sql) useDB.useDB().insert(sql)
return 1 return 1
def show_test_keywords_options(self): def show_test_keywords_options(self):
results = [] results = []
sql = 'select keyword from test_keyword where status = 1 ;' sql = 'select keyword from test_keyword where status = 1 order by keyword ;'
cases = useDB.useDB().search(sql) cases = useDB.useDB().search(sql)
print(cases) # print(cases)
log.log().logger.info('cases : %s' % cases) log.log().logger.info('cases : %s'%cases)
for i in range(len(cases)): for i in range(len(cases)):
results.append(cases[i][0]) results.append(cases[i][0])
return results return results

View File

@ -624,7 +624,7 @@ function changeValue(obj,order){
//setModule(obj.options[obj.selectedIndex].value); //setModule(obj.options[obj.selectedIndex].value);
var keyword = obj.options[obj.selectedIndex].value; var keyword = obj.options[obj.selectedIndex].value;
var method = document.getElementsByClassName('td_para_'+order)[0]; var method = document.getElementsByClassName('td_para_'+order)[0];
if(['点击','填写','选择','填写日期','选择全部','验证文字','验证文字非','点击索引'].indexOf(keyword)!= -1){ if(['点击','填写','选择','填写日期','填写文件','选择全部','验证文字','验证文字非','点击索引'].indexOf(keyword)!= -1){
// changeBy(keyword,order); // changeBy(keyword,order);
var methodSelect = method.getElementsByClassName('method'); var methodSelect = method.getElementsByClassName('method');
// alert(methodSelect.length); // alert(methodSelect.length);

View File

@ -52,14 +52,13 @@ class mysqlDB(object):
def connect(self): def connect(self):
# change root password to yours: # change root password to yours:
import mysql.connector import mysql.connector
conn = mysql.connector.connect(host=config.host,port=config.port, user=config.user, password=config.password, database=config.database) conn = mysql.connector.connect(host=config.host,port=config.port, user=config.user, password=config.password, database=config.database)
return conn return conn
def search(self, sql): def search(self, sql):
conn = self.connect() conn = self.connect()
cursor = conn.cursor() cursor = conn.cursor()
log.log().logger.info(sql) # log.log().logger.info(sql)
cursor.execute(sql) cursor.execute(sql)
values = cursor.fetchall() values = cursor.fetchall()
# log.log().logger.info('values1 :', values) # log.log().logger.info('values1 :', values)
@ -68,7 +67,7 @@ class mysqlDB(object):
return values return values
def insert(self, sql): def insert(self, sql):
log.log().logger.info(sql) log.log().logger.debug(sql)
conn = self.connect() conn = self.connect()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute(sql) cursor.execute(sql)
@ -76,11 +75,11 @@ class mysqlDB(object):
try: try:
conn.commit() conn.commit()
except : except :
log.log().logger.info('commit error') log.log().logger.error('commit error')
conn.close() conn.close()
def excutesql(self, sql): def excutesql(self, sql):
log.log().logger.info(sql) log.log().logger.debug(sql)
conn = self.connect() conn = self.connect()
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute(sql) cursor.execute(sql)
@ -88,7 +87,7 @@ class mysqlDB(object):
try: try:
conn.commit() conn.commit()
except: except:
log.log().logger.info('commit error') log.log().logger.error('commit error')
conn.close() conn.close()
# 测试查询防止sql注入 # 测试查询防止sql注入

View File

@ -172,8 +172,8 @@ insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplat
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('点击文字','1','extend.extend().click_menu(driver, \"$para1\")','driver.element_by_partial_link_text(\"$para1\")','点击文字|查询','通过文本内容,快速点击元素。 适用于文字链接、按钮等元素。','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('点击文字','1','extend.extend().click_menu(driver, \"$para1\")','driver.element_by_partial_link_text(\"$para1\")','点击文字|查询','通过文本内容,快速点击元素。 适用于文字链接、按钮等元素。','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('悬浮点击','2','driver.find_element_by_$para1(\"$para2\").click()','driver.element_by_$para1(\"$para2\")','悬浮点击|id@@searchBtn\r\ndriver.find_element_by_id(\"searchBtn\").click()','点击页面元素,可按 id、css、xpath 等方式定位元素。 同 点击','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('悬浮点击','2','driver.find_element_by_$para1(\"$para2\").click()','driver.element_by_$para1(\"$para2\")','悬浮点击|id@@searchBtn\r\ndriver.find_element_by_id(\"searchBtn\").click()','点击页面元素,可按 id、css、xpath 等方式定位元素。 同 点击','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('选择','4','extend.extend().select(driver,[\"$para1\",\"$para2\",\"$para3\",\"$para4\"])','driver.element_by_$para1(\"$para2\")','选择|id@@selectBox@@index@@1选择|id@@selectBox@@text@@中国','选择下拉框中指定的选项。 可按 index、value、 text完全匹配、text_part模糊匹配 等信息选择选项','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('选择','4','extend.extend().select(driver,[\"$para1\",\"$para2\",\"$para3\",\"$para4\"])','driver.element_by_$para1(\"$para2\")','选择|id@@selectBox@@index@@1选择|id@@selectBox@@text@@中国','选择下拉框中指定的选项。 可按 index、value、 text完全匹配、text_part模糊匹配 等信息选择选项','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('验证','1','$para1',NULL,'验证|成功','验证页面中是否包含预期文字','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('验证','1','extend.extend().assert_text(driver, \"$para1\")',NULL,'验证|成功','验证页面中是否包含预期文字','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('验证文字','3','$para1,$para2,$para3',NULL,'验证文字|id@@text@@成功','验证指定元素中是否包含预期的文字信息。 如验证提示是否包含“成功”','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('验证文字','3','extend.extend().assert_element_text(driver, [\"$para1\",\"$para2\",\"$para3\"])',NULL,'验证文字|id@@text@@成功','验证指定元素中是否包含预期的文字信息。 如验证提示是否包含“成功”','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('截图','1','extend.extend().screenshot(driver,id,screenFileList)',NULL,'截图','手动截图。 注:用例执行失败时会自动截图。','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('截图','1','extend.extend().screenshot(driver,id,screenFileList)',NULL,'截图','手动截图。 注:用例执行失败时会自动截图。','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('填写日期','3','extend.extend().fill_on_date(driver,[\"$para1\",\"$para2\",\"$para3\"])','driver.element_by_$para1(\"$para2\")','填写日期|id@@start_date@@2018-04-01','在指定元素中输入日期。 ps此方法会去除元素的 readonly 属性,达到跳过手动选择日期,快速输入的目的','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('填写日期','3','extend.extend().fill_on_date(driver,[\"$para1\",\"$para2\",\"$para3\"])','driver.element_by_$para1(\"$para2\")','填写日期|id@@start_date@@2018-04-01','在指定元素中输入日期。 ps此方法会去除元素的 readonly 属性,达到跳过手动选择日期,快速输入的目的','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('返回','1','driver.back()',NULL,'返回','浏览器后退。','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('返回','1','driver.back()',NULL,'返回','浏览器后退。','1');
@ -181,15 +181,16 @@ insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplat
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('切换','2','extend.extend().switchIframe(driver,[\"$para1\",\"$para2\"])',NULL,'切换|id@@iframe1','页面中包含iframe 时需要调用该方法进行切换否则无法定位其他iframe的元素。','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('切换','2','extend.extend().switchIframe(driver,[\"$para1\",\"$para2\"])',NULL,'切换|id@@iframe1','页面中包含iframe 时需要调用该方法进行切换否则无法定位其他iframe的元素。','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('滑动到底部','1','driver.execute_script(\"window.scrollTo(0,document.body.scrollHeight)\")',NULL,'滑动到底部','滑动页面到最底','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('滑动到底部','1','driver.execute_script(\"window.scrollTo(0,document.body.scrollHeight)\")',NULL,'滑动到底部','滑动页面到最底','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('反选','1','extend.extend().uncheck_checkbox(driver)',NULL,'反选','特别封装步骤,用于取消已选择的选项。(用于 mt-checkbox 类型的选择框)','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('反选','1','extend.extend().uncheck_checkbox(driver)',NULL,'反选','特别封装步骤,用于取消已选择的选项。(用于 mt-checkbox 类型的选择框)','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('验证文字非','3','$para1,$para2,$para3',NULL,'验证文字非|id@@text','验证指定元素中是否不包含预期的文字信息。 如验证提示是否不包含“成功”','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('验证文字非','3','extend.extend().assert_element_text(driver, [\"$para1\",\"$para2\",\"$para3\"], isNot=True)',NULL,'验证文字非|id@@text','验证指定元素中是否不包含预期的文字信息。 如验证提示是否不包含“成功”','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('切换主页','1','driver.switch_to_default_content()',NULL,'切换主页','从iframe中切换回主页面','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('切换主页','1','driver.switch_to_default_content()',NULL,'切换主页','从iframe中切换回主页面','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('刷新','1','driver.refresh()',NULL,'刷新','浏览器后退。','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('刷新','1','driver.refresh()',NULL,'刷新','浏览器后退。','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('填写1','3','driver.find_element_by_$para1(\"$para2\").send_keys(\"$para3\")','driver.element_by_$para1(\"$para2\")','填写|id@@input_box@@ghw','在指定元素中输入文字。不清除原已输入的值。','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('填写1','3','driver.find_element_by_$para1(\"$para2\").send_keys(\"$para3\")','driver.element_by_$para1(\"$para2\")','填写|id@@input_box@@ghw','在指定元素中输入文字。不清除原已输入的值。','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('尝试点击','2','extend.extend().try_click(driver,[\"$para1\",\"$para2\"])','driver.element_by_$para1(\"$para2\")','尝试点击|id@@searchBtn','尝试点击页面元素,如果点击失败,则跳过。','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('尝试点击','2','extend.extend().try_click(driver,[\"$para1\",\"$para2\"])','driver.element_by_$para1(\"$para2\")','尝试点击|id@@searchBtn','尝试点击页面元素,如果点击失败,则跳过。','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('填写','3','extend.extend().find_element(driver,[\"$para1\",\"$para2\"]).clear()\r\nextend.extend().find_element(driver,[\"$para1\",\"$para2\"]).send_keys(\"$para3\")','driver.element_by_$para1(\"$para2\")','填写|id@@input_box@@ghw','在指定元素中输入文字,可按 id、css、xpath、class、name、text 等方式定位元素','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('填写','3','extend.extend().fill(driver,[\"$para1\",\"$para2\"],\"$para3\")','driver.element_by_$para1(\"$para2\")','填写|id@@input_box@@ghw','在指定元素中输入文字,可按 id、css、xpath、class、name、text 等方式定位元素','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('选择全部','2','extend.extend().select_all(driver,[\"$para1\",\"$para2\"])','driver.element_by_$para1(\"$para2\")','选择全部|id@@select','对下拉框,选择所有选项','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('选择全部','2','extend.extend().select_all(driver,[\"$para1\",\"$para2\"])','driver.element_by_$para1(\"$para2\")','选择全部|id@@select','对下拉框,选择所有选项','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('点击全部','2','extend.extend().check_all(driver,[\"$para1\",\"$para2\"])','driver.element_by_$para1(\"$para2\")','点击全部|id@@CheckBox','对多选框,选择所有选项','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('点击全部','2','extend.extend().check_all(driver,[\"$para1\",\"$para2\"])','driver.element_by_$para1(\"$para2\")','点击全部|id@@CheckBox','对多选框,选择所有选项','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('切换窗口','1','extend.extend().switchWindow(driver)',NULL,'切换窗口','当浏览器弹出新的窗口时,切换到另一个窗口进行操作','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('切换窗口','1','extend.extend().switchWindow(driver)',NULL,'切换窗口','当浏览器弹出新的窗口时,切换到另一个窗口进行操作','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('验证标题','1','extend.extend().assert_title(driver,"$para1")',NULL,'验证标题|百度地图','验证页面的title中是否包含预期文字','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('验证标题','1','extend.extend().assert_title(driver,"$para1")',NULL,'验证标题|百度地图','验证页面的title中是否包含预期文字','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('Chrome','1','Chrome',NULL,'Chrome','初始化Chrome 浏览器','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('Chrome','1','Chrome',NULL,'Chrome','初始化Chrome 浏览器','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('公共方法','1',' $para1',NULL,'公共方法|游客登录','调用公共方法','1'); insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('公共方法','1',' $para1',NULL,'公共方法|游客登录','调用公共方法','1');
insert into `test_keyword` ( `keyword`, `paraCount`, `template`, `elementTemplate`, `example`, `description`, `status`) values('填写文件','3','extend.extend().fill_file(driver,[\"$para1\",\"$para2\"],\"$para3\")','driver.element_by_$para1(\"$para2\")','填写文件|id@@input_box@@ghw','在指定元素中输入文件路径,可按 id、css、xpath、class、name、text 等方式定位元素','1');