threading模塊
讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來(lái)自于我們對(duì)這個(gè)行業(yè)的熱愛(ài)。我們立志把好的技術(shù)通過(guò)有效、簡(jiǎn)單的方式提供給客戶,將通過(guò)不懈努力成為客戶在信息化領(lǐng)域值得信任、有價(jià)值的長(zhǎng)期合作伙伴,公司提供的服務(wù)項(xiàng)目有:國(guó)際域名空間、網(wǎng)頁(yè)空間、營(yíng)銷軟件、網(wǎng)站建設(shè)、泰來(lái)網(wǎng)站維護(hù)、網(wǎng)站推廣。
局域網(wǎng)IP掃描實(shí)例
# 單線程:
import subprocess,time,threading a = time.clock() with open("check_ping.txt",'w') as f: for i in range(1,20): my_ip = ".".join(["192.163.37",str(i)]) try: subprocess.check_call('ping -n 1 -w 1 %s'%(my_ip),shell=True) except subprocess.CalledProcessError: pass else: f.write("%s 可以ping通\n"%my_ip) b = time.clock() print(b) # 總共花費(fèi)8s多
# 多線程(一)創(chuàng)建 Thread 的實(shí)例,傳給它一個(gè)函數(shù)
a = time.clock() def check_ping(IP,obj): try: subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True) except subprocess.CalledProcessError: pass else: obj.write("%s 可以ping通\n" % IP) def main(): threads = [] with open("check_ping_1.txt", 'w') as f: for i in range(1, 20): my_ip = ".".join(["192.163.37", str(i)]) t = threading.Thread(target=check_ping,args=(my_ip,f)) #Thread方法:實(shí)例化一個(gè)線程對(duì)象,把函數(shù)(target)和參數(shù)(args)傳進(jìn)去,然后返回Thread實(shí)例,這里并沒(méi)有執(zhí)行。 threads.append(t) num = range(len(threads)) for i in num: threads[i].start() #執(zhí)行線程的start方法,線程開(kāi)始執(zhí)行 for i in num: threads[i].join() #這行線程的join方法,等待線程結(jié)束,如果主進(jìn)程不需要等待線程結(jié)束,可以不需要調(diào)用join方法。 if __name__ == '__main__': main() b = time.clock() print(b) # 總共花費(fèi)0.9s
# 多線程(二)創(chuàng)建 Thread 的實(shí)例,傳給它一個(gè)可調(diào)用的類實(shí)例
a = time.clock() class Thread_func: def __init__(self,func,args): self.func = func self.args = args def __call__(self): #__call__方法:將類模擬成函數(shù),實(shí)例化后的類再次實(shí)例化相當(dāng)于執(zhí)行了__call__方法。 self.func(*self.args) def check_ping(IP,obj): try: subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True) except subprocess.CalledProcessError: pass else: obj.write("%s 可以ping通\n" % IP) def main(): threads = [] with open("check_ping_2.txt", 'w') as f: for i in range(1, 20): my_ip = ".".join(["192.163.37", str(i)]) t = threading.Thread(target=Thread_func(check_ping,(my_ip,f))) #Thread_func實(shí)例化時(shí)已經(jīng)傳入了參數(shù),所以Thread方法中就不用args來(lái)傳參了。 threads.append(t) num = range(len(threads)) for i in num: threads[i].start() for i in num: threads[i].join() if __name__ == '__main__': main() b = time.clock() print(b) # 總共花費(fèi)1
# 多線程(三)派生 Thread 的子類,并創(chuàng)建子類的實(shí)例
a = time.clock() class Thread_func(threading.Thread): #繼承Thread,調(diào)用更靈活。 def __init__(self,func,args): threading.Thread.__init__(self) self.func = func self.args = args def run(self): #這里必須使用run方法,當(dāng)線程開(kāi)啟后執(zhí)行。 self.func(*self.args) def check_ping(IP,obj): try: subprocess.check_call('ping -n 1 -w 1 %s' % IP, shell=True) except subprocess.CalledProcessError: pass else: obj.write("%s 可以ping通\n" % IP) def main(): threads = [] with open("check_ping_3.txt", 'w') as f: for i in range(1, 20): my_ip = ".".join(["192.163.37", str(i)]) t = Thread_func(check_ping,(my_ip,f)) threads.append(t) num = range(len(threads)) for i in num: threads[i].start() for i in num: threads[i].join() if __name__ == '__main__': main() b = time.clock() print(b) # 花費(fèi)0.7