优化步骤执行逻辑

This commit is contained in:
jerrylizilong 2018-11-07 14:13:51 +08:00
parent e697df54dd
commit 02d8c16cd3
4 changed files with 117 additions and 142 deletions

View File

@ -1,80 +1,47 @@
from app.core import log, buildCase, atx_steps from app.core import log, atx_steps
from app.db import test_batch_manage from app.db import test_batch_manage
from retrying import retry
class atx_core(): class atx_core():
# run a signle case. # run a signle case.
# the main method # the main method
def run_case(self,steps,caseNo,deviceList = []): def run_case(self,id,package,newstep,case,screenFileList,deviceList = []):
search_result = test_batch_manage.test_batch_manage().search_test_batch_detail1(caseNo, ['ip']) result = '2'
stepN = 'init'
search_result = test_batch_manage.test_batch_manage().search_test_batch_detail1(id, ['ip'])
if len(search_result): if len(search_result):
log.log().logger.info(search_result) log.log().logger.info(search_result)
log.log().logger.info(deviceList) log.log().logger.info(deviceList)
if search_result['ip']: if search_result['ip']:
deviceList = [search_result['ip']] deviceList = [search_result['ip']]
log.log().logger.info(deviceList) log.log().logger.info(deviceList)
step_name = ''
steps = steps.split(',') isConnected,device0,u = atx_steps.atx_driver().connectDevice(package,deviceList)
steps = buildCase.buildCase().readPublic(steps) log.log().logger.info(' is %s connected? %s' %(device0, isConnected))
step0 = steps[0].split('|') if isConnected:
if len(step0)!=2: for i in range(len(newstep)):
log.log().logger.error('android init is not defined!') stepN = case[i].replace('"', "'")
return 2, 'init', [] u, result, screenFileList = self.do_step(u,newstep[i], stepN,id,screenFileList)
elif step0[0]!='Android': if result=='2':
log.log().logger.error('android init is not defined!') break
return 2, 'init', [] return result,stepN,screenFileList
else: else:
package = step0[1] log.log().logger.info('package is not found in any device!')
isConnected,device0,u = atx_steps.atx_driver().connectDevice(package,deviceList) return '2', 'package not found', []
log.log().logger.info(' is %s connected? %s' %(device0, isConnected))
if isConnected:
log.log().logger.info('start runnning test on %s' %device0)
screenFileList = []
result = 1
for step in steps:
log.log().logger.info('current step is: %s' %step)
if len(step)==0:
pass
else:
step = step.split('|')
# log().logger.info(step)
if len(step) >= 1:
step_name = step[0]
log.log().logger.info(step_name)
if len(step) > 1:
detail = step[1].split('@@')
log.log().logger.info(detail)
else:
detail = []
trytime = 3
while trytime:
log.log().logger.info('try time: %s' % (4 - trytime))
u,result,screenFileList = atx_steps.atx_driver().run_step(u, step_name, detail,caseNo,screenFileList)
if result == 2:
log.log().logger.error('failed at %s : %s, try again!' % (step_name, detail))
trytime += -1
else:
trytime=0
log.log().logger.info('finish step %s : %s.' % (step_name, detail))
if trytime==0 and result == 2:
log.log().logger.error('failed at %s : %s after trying 3 times!' % (step_name, detail))
break
else:
pass
return result,step_name,screenFileList
else:
log.log().logger.info('package is not found in any device!')
return 2, 'package not found', []
@retry(stop_max_attempt_number=3,wait_fixed=5000)
def do_step(self,u, steps,case,id,screenFileList):
keyword = steps[0]
log.log().logger.info("id %-10d | 关键字: %-20s | 步骤:%-60s | 命令: %s" % (id, keyword, case, steps[1]))
if keyword == '截图':
result,screenFileList = atx_steps.atx_driver().take_screenshot(u,'normal', id, screenFileList)
else:
u, result, screenFileList=atx_steps.atx_driver().run_step(u, keyword, steps[1], id, screenFileList)
try:
assert result == '1'
except AssertionError as e:
print(e)
return u,result, screenFileList
# deviceList = ['172.16.131.102']
# steps = 'Android|com.ghw.sdk2,点击|id@@com.ghw.sdk2:id/btn_login,等待|5,点击|name@@FACEBOOK登录,等待|10'
# for i in range(200):
# steps += ',点击|name@@FACEBOOK登录,等待|10'
#
# while 1:
# try:
# atx_core().run_case(steps,1000,deviceList)
# except FileNotFoundError as e:
# print(e)

View File

@ -80,10 +80,10 @@ class atx_driver():
time.sleep(2) time.sleep(2)
u1, result1 =self.click(u,'name','允许') u1, result1 =self.click(u,'name','允许')
time.sleep(1) time.sleep(1)
if result1==2: if result1=='2':
u1, result1 = self.click(u, 'name', '始终允许') u1, result1 = self.click(u, 'name', '始终允许')
time.sleep(1) time.sleep(1)
if result1==2: if result1=='2':
u1, result1 = self.click(u, 'name', 'ALLOW') u1, result1 = self.click(u, 'name', 'ALLOW')
time.sleep(1) time.sleep(1)
return u return u
@ -96,7 +96,8 @@ class atx_driver():
image = u.screenshot() image = u.screenshot()
image.save(fileName) image.save(fileName)
screenFileList.append(fileName1) screenFileList.append(fileName1)
return screenFileList result = '1'
return result,screenFileList
# type text into the target element. # type text into the target element.
# method : id, name the method used to locate the target element. # method : id, name the method used to locate the target element.
@ -106,39 +107,39 @@ class atx_driver():
if u(resourceId=resource_id).exists: if u(resourceId=resource_id).exists:
u(resourceId=resource_id).send_keys(text) u(resourceId=resource_id).send_keys(text)
time.sleep(1) time.sleep(1)
result = 1 result = '1'
u.adb_shell('input', 'keyevent', 'BACK') u.adb_shell('input', 'keyevent', 'BACK')
time.sleep(1) time.sleep(1)
else: else:
log.log().logger.error(u"出错了,没有找到元素! by %s , %s" %(method,resource_id)) log.log().logger.error(u"出错了,没有找到元素! by %s , %s" %(method,resource_id))
result = 2 result = '2'
elif method== 'name': elif method== 'name':
if u(text=resource_id).exists: if u(text=resource_id).exists:
u(text=resource_id).send_keys(text) u(text=resource_id).send_keys(text)
time.sleep(1) time.sleep(1)
result = 1 result = '1'
u.adb_shell('input', 'keyevent', 'BACK') u.adb_shell('input', 'keyevent', 'BACK')
time.sleep(1) time.sleep(1)
else: else:
log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % (method, resource_id)) log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % (method, resource_id))
result = 2 result = '2'
elif method== 'class': elif method== 'class':
if u(className=resource_id).exists: if u(className=resource_id).exists:
u(className=resource_id).send_keys(text) u(className=resource_id).send_keys(text)
time.sleep(1) time.sleep(1)
result = 1 result = '1'
u.adb_shell('input', 'keyevent', 'BACK') u.adb_shell('input', 'keyevent', 'BACK')
time.sleep(1) time.sleep(1)
else: else:
log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % (method, resource_id)) log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % (method, resource_id))
result = 2 result = '2'
else: else:
log.log().logger.error(u"元素方法未定义! %s" %method) log.log().logger.error(u"元素方法未定义! %s" %method)
result = 2 result = '2'
return u,result return u,result
def click(self,u,by,text): def click(self,u,by,text):
result = 2 result = '2'
if by == 'name': if by == 'name':
u, result = self.click_text(u, text) u, result = self.click_text(u, text)
elif by == 'id': elif by == 'id':
@ -154,10 +155,10 @@ class atx_driver():
if u(text=text).exists: if u(text=text).exists:
u(text=text).click() u(text=text).click()
time.sleep(1) time.sleep(1)
result = 1 result = '1'
else: else:
log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % ('text', text)) log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % ('text', text))
result = 2 result = '2'
return u,result return u,result
# click an element locating by it's resourceId. # click an element locating by it's resourceId.
@ -165,10 +166,10 @@ class atx_driver():
if u(resourceId=id).exists: if u(resourceId=id).exists:
u(resourceId=id).click() u(resourceId=id).click()
time.sleep(1) time.sleep(1)
result = 1 result = '1'
else: else:
log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % ('id', id)) log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % ('id', id))
result = 2 result = '2'
return u, result return u, result
# click an element locating by it's description. # click an element locating by it's description.
@ -176,10 +177,10 @@ class atx_driver():
if u(description=id).exists: if u(description=id).exists:
u(description=id).click() u(description=id).click()
time.sleep(1) time.sleep(1)
result = 1 result = '1'
else: else:
log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % ('description', id)) log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % ('description', id))
result = 2 result = '2'
return u, result return u, result
# click an element locating by it's className. # click an element locating by it's className.
@ -187,17 +188,17 @@ class atx_driver():
if u(className=id).exists: if u(className=id).exists:
u(className=id).click() u(className=id).click()
time.sleep(1) time.sleep(1)
result = 1 result = '1'
else: else:
log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % ('className', id)) log.log().logger.error(u"出错了,没有找到元素! by %s , %s" % ('className', id))
result = 2 result = '2'
return u, result return u, result
# Send event to data collection. used to test sending data in SDK demo. # Send event to data collection. used to test sending data in SDK demo.
def sendData(self, u,name): def sendData(self, u,name):
u, result=self.click_text(u,name) u, result=self.click_text(u,name)
time.sleep(2) time.sleep(2)
if result==1: if result=='1':
if u(text='发送').exists: if u(text='发送').exists:
u, result =self.click_text(u,'发送') u, result =self.click_text(u,'发送')
else: else:
@ -209,7 +210,8 @@ class atx_driver():
# run step. # run step.
# if a new step type is defined, it should be added to the step_name list. # if a new step type is defined, it should be added to the step_name list.
def run_step(self,u, step_name, detail,caseNo,screenFileList): def run_step(self,u, step_name, detail,caseNo,screenFileList):
result = 1 # print(u, step_name, detail,caseNo,screenFileList)
result = '1'
if step_name == 'Android': if step_name == 'Android':
time.sleep(1) time.sleep(1)
elif step_name == '点击': elif step_name == '点击':
@ -228,10 +230,10 @@ class atx_driver():
elif step_name == '截图': elif step_name == '截图':
screenFileList=self.take_screenshot(u, 'normal',caseNo,screenFileList) screenFileList=self.take_screenshot(u, 'normal',caseNo,screenFileList)
else: else:
result=2 result='2'
log.log().logger.error('method is not defined : %s' %step_name) log.log().logger.error('method is not defined : %s' %step_name)
if result == 0: if result == '0':
log.log().logger.error('package is not fould!') log.log().logger.error('package is not fould!')
elif result == 2: elif result == '2':
screenFileList =self.take_screenshot(u, 'fail', caseNo, screenFileList) screenFileList =self.take_screenshot(u, 'fail', caseNo, screenFileList)
return u,result,screenFileList return u,result,screenFileList

View File

@ -57,7 +57,8 @@ class extend():
log.log().logger.debug(fileName) log.log().logger.debug(fileName)
driver.save_screenshot(fileName) driver.save_screenshot(fileName)
screenFileList.append(fileName1) screenFileList.append(fileName1)
result = '1' if not isError:
result = '1'
return result, screenFileList return result, screenFileList
def assert_text(self,driver,text): def assert_text(self,driver,text):

View File

@ -26,67 +26,21 @@ class process():
package = '' package = ''
log.log().logger.info('开始执行 id %-10d | 用例 %s ' %(id,case)) log.log().logger.info('开始执行 id %-10d | 用例 %s ' %(id,case))
newstep,case = buildCase.buildCase().getCase(case) newstep,case = buildCase.buildCase().getCase(case)
# print(newstep,case)
if len(newstep): if len(newstep):
runType = newstep[0][0] runType = newstep[0][0]
if len(newstep[0][1]) == 1 : if len(newstep[0][1]) == 1 :
if newstep[0][1][0]!='1': if newstep[0][1][0]!='1':
package = newstep[0][1][0] package = newstep[0][1][0]
newstep.remove(newstep[0])
case.remove(case[0])
if runType == 'Android' and isUseATX: if runType == 'Android' and isUseATX:
# 使用 atx 执行 Android 用例 # 使用 atx 执行 Android 用例
result, stepN,screenFileList = atx_core.atx_core().run_case(case,id,deviceList=deviceList) result, stepN,screenFileList = atx_core.atx_core().run_case(id,package,newstep,case,screenFileList,deviceList=deviceList)
log.log().logger.info('%s, %s, %s' %(result, stepN,screenFileList)) log.log().logger.info('%s, %s, %s' %(result, stepN,screenFileList))
else: else:
# 使用 selenium 执行 web 用例 # 使用 selenium 执行 web 用例
if browserType !=''and package =='': result,stepN,screenFileList = self.run_selenium(id,runType,package,newstep,case,screenFileList)
runType = browserType
driver = coredriver.coredriver().iniDriver(runType,devicename=package)
result = '2'
stepN = 'init'
if driver == 0: # 没有可执行的节点,无法执行
log.log().logger.info('cannot run without available hubs!')
result = '3'
stepN = 'init'
else:
if len(newstep)<2: # 用例中没有执行步骤,无法执行
result = '3'
stepN = 'no steps!'
else:
for i in range(1,len(newstep)): # 开始逐个步骤执行
stepN = case[i].replace('"',"'")
# result, stepN, screenFileList = self.do_step(driver, newstep[i], screenFileList)
try:
result,stepN, screenFileList=self.do_step(driver,newstep[i],stepN,id,screenFileList)
except:
result = '2'
log.log().logger.error('id %-10d | 失败步骤:%s ' %(id,stepN))
# ("id %-10d | 关键字: %-20s | 步骤:%-60s | 命令: %s" % (id, keyword, case, comed))
if result == '2':
trytime = 2
while trytime:
try:
result1, screenFileList = extend.extend().screenshot(driver, id, screenFileList,
True)
break
except UnexpectedAlertPresentException as e:
log.log().logger.info(e)
time.sleep(5)
try:
driver.switch_to.alert.accept()
except:
log.log().logger.error('no alert')
trytime += -1
break
# for android driver ,the ending should by driver.quite(); for webdriver ,the ending is driver.close()
try:
driver.quit()
except WebDriverException as e:
log.log().logger.error(e)
try:
driver.close()
except WebDriverException as e:
log.log().logger.error(e)
else: else:
result = '3' result = '3'
stepN = '公共方法不存在!' stepN = '公共方法不存在!'
@ -97,7 +51,58 @@ class process():
return result return result
@retry(stop_max_attempt_number=3,wait_fixed=2000) def run_selenium(self,id,runType,package,newstep,case,screenFileList):
# print(id,runType,package,newstep,case,screenFileList)
result = '2'
stepN = 'init'
if runType != '' and package == '':
driver = coredriver.coredriver().iniDriver(runType, devicename=package)
if driver == 0: # 没有可执行的节点,无法执行
log.log().logger.info('cannot run without available hubs!')
result = '3'
else:
if len(newstep) < 2: # 用例中没有执行步骤,无法执行
result = '3'
stepN = 'no steps!'
else:
for i in range(len(newstep)): # 开始逐个步骤执行
stepN = case[i].replace('"', "'")
try:
result, stepN, screenFileList = self.do_step(driver, newstep[i], stepN, id, screenFileList)
except:
log.log().logger.error('id %-10d | 失败步骤:%s ' % (id, stepN))
result ='2'
if result == '2':
trytime = 2
while trytime:
try:
result, screenFileList = extend.extend().screenshot(driver, id, screenFileList,
True)
break
except UnexpectedAlertPresentException as e:
log.log().logger.info(e)
time.sleep(5)
try:
driver.switch_to.alert.accept()
except:
log.log().logger.error('no alert')
trytime += -1
break
# for android driver ,the ending should by driver.quite(); for webdriver ,the ending is driver.close()
try:
driver.quit()
except WebDriverException as e:
log.log().logger.error(e)
try:
driver.close()
except WebDriverException as e:
log.log().logger.error(e)
return result,stepN,screenFileList
@retry(stop_max_attempt_number=3,wait_fixed=5000)
def do_step(self,driver,steps,case,id,screenFileList): def do_step(self,driver,steps,case,id,screenFileList):
keyword = steps[0] keyword = steps[0]
stepN = keyword stepN = keyword