python-selenium并發(fā)執(zhí)行測試用例(方法一 各模塊每一條并發(fā)執(zhí)行)
我們提供的服務(wù)有:成都做網(wǎng)站、網(wǎng)站設(shè)計、微信公眾號開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、南海ssl等。為數(shù)千家企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的南海網(wǎng)站制作公司
總執(zhí)行代碼:
# coding=utf-8
import unittest,os,time
import HTMLTestRunner
import threading
import sys
sys.path.append('C:/Users/Dell/Desktop/CARE/program')#使用編輯器,要指定當(dāng)前目錄,不然無法執(zhí)行第20行代碼
def creatsuite():
casedir = []
list = os.listdir(os.path.dirname(os.getcwd()))#獲取當(dāng)前路徑的上一級目錄的所有文件夾,這里可以改成絕對路徑(要搜索的文件路徑)
for xx in list:
if "baidu" in xx:
casedir.append(xx)
suite =[]
for n in casedir:
testunit = unittest.TestSuite()
unittest.defaultTestLoader._top_level_dir = None
#(unittest.defaultTestLoader(): defaultTestLoader()類,通過該類下面的discover()方法可自動更具測試目錄start_dir匹配查找測試用例文件(test*.py),
并將查找到的測試用例組裝到測試套件,因此可以直接通過run()方法執(zhí)行discover)
discover = unittest.defaultTestLoader.discover(str(n),pattern='tet_*.py',top_level_dir=None)
for test_suite in discover:
for test_case in test_suite:
testunit.addTests(test_case)
suite.append(testunit)
return suite, casedir
def runcase(suite,casedir):
lastPath = os.path.dirname(os.getcwd())#獲取當(dāng)前路徑的上一級
resultDir = lastPath+"\\run\\report\\" #報告存放路徑
now = time.strftime("%Y-%m-%d %H.%M.%S",time.localtime())
filename = resultDir + now +" result.html"
fp = file(filename, 'wb')
proclist=[]
s=0
for i in suite:
runner = HTMLTestRunner.HTMLTestRunner(stream=fp,title=str(casedir[s])+u'測試報告',description=u'用例執(zhí)行情況:')
proc = threading.Thread(target=runner.run,args=(i,))
proclist.append(proc)
s=s+1
for proc in proclist:
proc.start()
for proc in proclist:
proc.join()
fp.close()
if __name__ == "__main__":
runtmp=creatsuite()
runcase(runtmp[0],runtmp[1])
在利用Python進(jìn)行系統(tǒng)管理的時候,特別是同時操作多個文件目錄,或者遠(yuǎn)程控制多臺主機(jī),并行操作可以節(jié)約大量的時間。多進(jìn)程是實(shí)現(xiàn)并發(fā)的手段之一,需要注意的問題是:
例如當(dāng)被操作對象數(shù)目不大時,可以直接利用multiprocessing中的Process動態(tài)成生多個進(jìn)程,十幾個還好,但如果是上百個,上千個。。。手動的去限制進(jìn)程數(shù)量卻又太過繁瑣,此時可以發(fā)揮進(jìn)程池的功效。
我們就可以通過維護(hù)一個進(jìn)程池來控制進(jìn)程數(shù)目,比如httpd的進(jìn)程模式,規(guī)定最小進(jìn)程數(shù)和最大進(jìn)程數(shù)..
ps: 對于遠(yuǎn)程過程調(diào)用的高級應(yīng)用程序而言,應(yīng)該使用進(jìn)程池,Pool可以提供指定數(shù)量的進(jìn)程,供用戶調(diào)用,當(dāng)有新的請求提交到pool中時,如果池還沒有滿,那么就會創(chuàng)建一個新的進(jìn)程用來執(zhí)行該請求;但如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請求就會等待,直到池中有進(jìn)程結(jié)束,就重用進(jìn)程池中的進(jìn)程。
創(chuàng)建進(jìn)程池的類:如果指定numprocess為3,則進(jìn)程池會從無到有創(chuàng)建三個進(jìn)程,然后自始至終使用這三個進(jìn)程去執(zhí)行所有任務(wù),不會開啟其他進(jìn)程
參數(shù)介紹:
方法介紹:
主要方法:
其他方法(了解部分)
應(yīng)用:
發(fā)現(xiàn):并發(fā)開啟多個客戶端,服務(wù)端同一時間只有3個不同的pid,干掉一個客戶端,另外一個客戶端才會進(jìn)來,被3個進(jìn)程之一處理
回調(diào)函數(shù):
需要回調(diào)函數(shù)的場景:進(jìn)程池中任何一個任務(wù)一旦處理完了,就立即告知主進(jìn)程:我好了額,你可以處理我的結(jié)果了。主進(jìn)程則調(diào)用一個函數(shù)去處理該結(jié)果,該函數(shù)即回調(diào)函數(shù)
我們可以把耗時間(阻塞)的任務(wù)放到進(jìn)程池中,然后指定回調(diào)函數(shù)(主進(jìn)程負(fù)責(zé)執(zhí)行),這樣主進(jìn)程在執(zhí)行回調(diào)函數(shù)時就省去了I/O的過程,直接拿到的是任務(wù)的結(jié)果。
如果在主進(jìn)程中等待進(jìn)程池中所有任務(wù)都執(zhí)行完畢后,再統(tǒng)一處理結(jié)果,則無需回調(diào)函數(shù)
并行和并發(fā)
無論是并行還是并發(fā),在用戶看來都是'同時'運(yùn)行的,不管是進(jìn)程還是線程,都只是一個任務(wù)而已,真是干活的是cpu,cpu來做這些任務(wù),而一個cpu同一時刻只能執(zhí)行一個任務(wù)。
并發(fā)是偽并行,即看起來是同時運(yùn)行。單個cpu+多道技術(shù)就可以實(shí)現(xiàn)并發(fā),(并行也屬于并發(fā)),簡單的可以理解為快速在多個線程來回切換,感覺好像同時在做多個事情。
只有具備多個cpu才能實(shí)現(xiàn)并行,單核下,可以利用多道技術(shù),多個核,每個核也都可以利用多道技術(shù)(多道技術(shù)是針對單核而言的)。? 有四個核,六個任務(wù),這樣同一時間有四個任務(wù)被執(zhí)行,假設(shè)分別被分配給了cpu1,cpu2,cpu3,cpu4,一旦任務(wù)1遇到I/O就被迫中斷執(zhí)行,此時任務(wù)5就拿到cpu1的時間片去執(zhí)行,這就是單核下的多道技術(shù) ,而一旦任務(wù)1的I/O結(jié)束了,操作系統(tǒng)會重新調(diào)用它(需知進(jìn)程的調(diào)度、分配給哪個cpu運(yùn)行,由操作系統(tǒng)說了算),可能被分配給四個cpu中的任意一個去執(zhí)行。
相關(guān)推薦:《Python視頻教程》
多道技術(shù):內(nèi)存中同時存入多道(多個)程序,cpu從一個進(jìn)程快速切換到另外一個,使每個進(jìn)程各自運(yùn)行幾十或幾百毫秒,這樣,雖然在某一個瞬間,一個cpu只能執(zhí)行一個任務(wù),但在1秒內(nèi),cpu卻可以運(yùn)行多個進(jìn)程,這就給人產(chǎn)生了并行的錯覺,即偽并發(fā),以此來區(qū)分多處理器操作系統(tǒng)的真正硬件并行(多個cpu共享同一個物理內(nèi)存)。
同步執(zhí)行:一個進(jìn)程在執(zhí)行某個任務(wù)時,另外一個進(jìn)程必須等待其執(zhí)行完畢,才能繼續(xù)執(zhí)行。
異步執(zhí)行:一個進(jìn)程在執(zhí)行某個任務(wù)時,另外一個進(jìn)程無需等待其執(zhí)行完畢,就可以繼續(xù)執(zhí)行,當(dāng)有消息返回時,系統(tǒng)會通知后者進(jìn)行處理,這樣可以提高執(zhí)行效率。
舉個例子,打電話時就是同步通信,發(fā)短息時就是異步通信。
相關(guān)推薦:
Python如何實(shí)現(xiàn)線程間同步
多進(jìn)程/多線程+Queue
一般來說,在Python中編寫并發(fā)程序的經(jīng)驗(yàn)是:計算密集型任務(wù)使用多進(jìn)程,IO密集型任務(wù)使用多進(jìn)程或者多線程.另外,因?yàn)樯婕暗劫Y源共享,所以需要同步鎖等一系列麻煩的步驟,代碼編寫不直觀.另外一種好的思路是利用多進(jìn)程/多線程+Queue的方法,可以避免加鎖這樣麻煩低效的方式.
現(xiàn)在在Python2中利用Queue+多進(jìn)程的方法來處理一個IO密集型任務(wù).
假設(shè)現(xiàn)在需要下載多個網(wǎng)頁內(nèi)容并進(jìn)行解析,單進(jìn)程的方式效率很低,所以使用多進(jìn)程/多線程勢在必行.
我們可以先初始化一個tasks隊(duì)列,里面將要存儲的是一系列dest_url,同時開啟4個進(jìn)程向tasks中取任務(wù)然后執(zhí)行,處理結(jié)果存儲在一個results隊(duì)列中,最后對results中的結(jié)果進(jìn)行解析.最后關(guān)閉兩個隊(duì)列.
下面是一些主要的邏輯代碼.
# -*- coding:utf-8 -*-
#IO密集型任務(wù)
#多個進(jìn)程同時下載多個網(wǎng)頁
#利用Queue+多進(jìn)程
#由于是IO密集型,所以同樣可以利用threading模塊
import multiprocessing
def main():
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
cpu_count = multiprocessing.cpu_count() #進(jìn)程數(shù)目==CPU核數(shù)目
create_process(tasks, results, cpu_count) #主進(jìn)程馬上創(chuàng)建一系列進(jìn)程,但是由于阻塞隊(duì)列tasks開始為空,副進(jìn)程全部被阻塞
add_tasks(tasks) #開始往tasks中添加任務(wù)
parse(tasks, results) #最后主進(jìn)程等待其他線程處理完成結(jié)果
def create_process(tasks, results, cpu_count):
for _ in range(cpu_count):
p = multiprocessing.Process(target=_worker, args=(tasks, results)) #根據(jù)_worker創(chuàng)建對應(yīng)的進(jìn)程
p.daemon = True #讓所有進(jìn)程可以隨主進(jìn)程結(jié)束而結(jié)束
p.start() #啟動
def _worker(tasks, results):
while True: #因?yàn)榍懊嫠芯€程都設(shè)置了daemon=True,故不會無限循環(huán)
try:
task = tasks.get() #如果tasks中沒有任務(wù),則阻塞
result = _download(task)
results.put(result) #some exceptions do not handled
finally:
tasks.task_done()
def add_tasks(tasks):
for url in get_urls(): #get_urls() return a urls_list
tasks.put(url)
def parse(tasks, results):
try:
tasks.join()
except KeyboardInterrupt as err:
print "Tasks has been stopped!"
print err
while not results.empty():
_parse(results)
if __name__ == '__main__':
main()
利用Python3中的concurrent.futures包
在Python3中可以利用concurrent.futures包,編寫更加簡單易用的多線程/多進(jìn)程代碼.其使用感覺和Java的concurrent框架很相似(借鑒?)
比如下面的簡單代碼示例
def handler():
futures = set()
with concurrent.futures.ProcessPoolExecutor(max_workers=cpu_count) as executor:
for task in get_task(tasks):
future = executor.submit(task)
futures.add(future)
def wait_for(futures):
try:
for future in concurrent.futures.as_completed(futures):
err = futures.exception()
if not err:
result = future.result()
else:
raise err
except KeyboardInterrupt as e:
for future in futures:
future.cancel()
print "Task has been canceled!"
print e
return result
總結(jié)
要是一些大型Python項(xiàng)目也這般編寫,那么效率也太低了.在Python中有許多已有的框架使用,使用它們起來更加高效.
但是自己的一些"小打小鬧"的程序這樣來編寫還是不錯的.:)
python線程有兩種,類或者函數(shù)
后者很簡單,就跟pthread一樣用啊。
不要打印就好了,或者你自己維護(hù)一個print加個自旋鎖
第三個從但是開始就沒看懂
既然要加print那一定要加鎖了,否則是沒辦法
既然你想搞一個控制線程,那就由它來打印咯
某個時間段內(nèi),數(shù)據(jù)涌來,這就是并發(fā)。如果數(shù)據(jù)量很大,就是高并發(fā)
高并發(fā)的解決方法:
1、隊(duì)列、緩沖區(qū)
假設(shè)只有一個窗口,陸續(xù)涌入食堂的人,排隊(duì)打菜是比較好的方式
所以,排隊(duì)(隊(duì)列)是一種天然解決并發(fā)的辦法
排隊(duì)就是把人排成 隊(duì)列,先進(jìn)先出,解決了資源使用的問題
排成的隊(duì)列,其實(shí)就是一個緩沖地帶,就是 緩沖區(qū)
假設(shè)女生優(yōu)先,每次都從這個隊(duì)伍中優(yōu)先選出女生出來先打飯,這就是 優(yōu)先隊(duì)列
例如queue模塊的類Queue、LifoQueue、PriorityQueue(小頂堆實(shí)現(xiàn))
2、爭搶
只開一個窗口,有可能沒有秩序,也就是誰擠進(jìn)去就給誰打飯
擠到窗口的人占據(jù)窗口,直到打到飯菜離開
其他人繼續(xù)爭搶,會有一個人占據(jù)著窗口,可以視為鎖定窗口,窗口就不能為其他人提供服務(wù)了。
這是一種鎖機(jī)制
誰搶到資源就上鎖,排他性的鎖,其他人只能等候
爭搶也是一種高并發(fā)解決方案,但是,這樣可能不好,因?yàn)橛锌赡苡腥撕荛L時間搶不到
3、預(yù)處理
如果排長隊(duì)的原因,是由于每個人打菜等候時間長,因?yàn)橐缘牟藳]有,需要現(xiàn)做,沒打著飯不走開,鎖定著窗口
食堂可以提前統(tǒng)計大多數(shù)人最愛吃的菜品,將最愛吃的80%的熱門菜,提前做好,保證供應(yīng),20%的冷門菜,現(xiàn)做
這樣大多數(shù)人,就算鎖定窗口,也很快打到飯菜走了,快速釋放窗口
一種提前加載用戶需要的數(shù)據(jù)的思路,預(yù)處理 思想,緩存常用
更多Python知識,請關(guān)注:Python自學(xué)網(wǎng)!!