Python 利用selectors模塊實(shí)現(xiàn)非阻塞式編程?很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。
東乃ssl適用于網(wǎng)站、小程序/APP、API接口等需要進(jìn)行數(shù)據(jù)傳輸應(yīng)用場(chǎng)景,ssl證書未來(lái)市場(chǎng)廣闊!成為成都創(chuàng)新互聯(lián)的ssl證書銷售渠道,可以享受市場(chǎng)價(jià)格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:13518219792(備注:SSL證書合作)期待與您的合作!
前面介紹的 socket 都是采用阻塞方式進(jìn)行通信的,當(dāng)程序調(diào)用 recv() 方法從 socket 中讀取數(shù)據(jù)時(shí),如果沒(méi)有讀取到有效的數(shù)據(jù),當(dāng)前線程就會(huì)被阻塞。為了解決這個(gè)問(wèn)題,上面程序采用了多線程并發(fā)編程,即服務(wù)器端為每個(gè)客戶端連接都啟動(dòng)一個(gè)單獨(dú)的線程,不同的線程負(fù)責(zé)對(duì)應(yīng)的 socket 的通信工作。
通過(guò) selectors 模塊允許 socket 以非阻塞方式進(jìn)行通信,selectors 相當(dāng)于一個(gè)事件注冊(cè)中心,程序只要將 socket 的所有事件注冊(cè)給 selectors 管理,當(dāng) selectors 檢測(cè)到 socket 中的特定事件之后,程序就調(diào)用相應(yīng)的監(jiān)聽(tīng)方法進(jìn)行處理。
selectors 主要支持兩種事件:
selectors.EVENT_READ:當(dāng) socket 有數(shù)據(jù)可讀時(shí)觸發(fā)該事件。當(dāng)有客戶端連接進(jìn)來(lái)時(shí)也會(huì)觸發(fā)該事件。
selectors.EVENT_WRITE:當(dāng) socket 將要寫數(shù)據(jù)時(shí)觸發(fā)該事件。
使用 selectors 實(shí)現(xiàn)非阻塞式編程的步驟大致如下:
創(chuàng)建 selectors 對(duì)象。
通過(guò) selectors 對(duì)象為 socket 的 selectors.EVENT_READ 或 selectors.EVENT_WRITE 事件注冊(cè)監(jiān)聽(tīng)器函數(shù)。每當(dāng) socket 有數(shù)據(jù)需要讀寫時(shí),系統(tǒng)負(fù)責(zé)觸發(fā)所注冊(cè)的監(jiān)昕器函數(shù)。
在監(jiān)聽(tīng)器函數(shù)中處理 socket 通信。
下面程序使用 selectors 模塊實(shí)現(xiàn)非阻塞式通信的服務(wù)器端:
import selectors, socket # 創(chuàng)建默認(rèn)的selectors對(duì)象 sel = selectors.DefaultSelector() # 負(fù)責(zé)監(jiān)聽(tīng)“有數(shù)據(jù)可讀”事件的函數(shù) def read(skt, mask): try: # 讀取數(shù)據(jù) data = skt.recv(1024) if data: # 將讀取的數(shù)據(jù)采用循環(huán)向每個(gè)socket發(fā)送一次 for s in socket_list: s.send(data) # Hope it won't block else: # 如果該socket已被對(duì)方關(guān)閉,關(guān)閉該socket, # 并從socket_list列表中刪除 print('關(guān)閉', skt) sel.unregister(skt) skt.close() socket_list.remove(skt) # 如果捕捉到異常, 將該socket關(guān)閉,并從socket_list列表中刪除 except: print('關(guān)閉', skt) sel.unregister(skt) skt.close() socket_list.remove(skt) socket_list = [] # 負(fù)責(zé)監(jiān)聽(tīng)“客戶端連接進(jìn)來(lái)”事件的函數(shù) def accept(sock, mask): conn, addr = sock.accept() # 使用socket_list保存代表客戶端的socket socket_list.append(conn) conn.setblocking(False) # 使用sel為conn的EVENT_READ事件注冊(cè)read監(jiān)聽(tīng)函數(shù) sel.register(conn, selectors.EVENT_READ, read) #② sock = socket.socket() sock.bind(('192.168.1.88', 30000)) sock.listen() # 設(shè)置該socket是非阻塞的 sock.setblocking(False) # 使用sel為sock的EVENT_READ事件注冊(cè)accept監(jiān)聽(tīng)函數(shù) sel.register(sock, selectors.EVENT_READ, accept) #① # 采用死循環(huán)不斷提取sel的事件 while True: events = sel.select() for key, mask in events: # key的data屬性獲取為該事件注冊(cè)的監(jiān)聽(tīng)函數(shù) callback = key.data # 調(diào)用監(jiān)聽(tīng)函數(shù), key的fileobj屬性獲取被監(jiān)聽(tīng)的socket對(duì)象 callback(key.fileobj, mask)
上面程序中定義了兩個(gè)監(jiān)聽(tīng)器函數(shù) accept() 和 read(),其中 accept() 函數(shù)作為“有客戶端連接進(jìn)來(lái)”事件的監(jiān)聽(tīng)函數(shù),主程序中的 ① 號(hào)代碼負(fù)責(zé)為 socket 的 selectors.EVENT_READ 事件注冊(cè)該函數(shù);read() 函數(shù)則作為“有數(shù)據(jù)可讀”事件的監(jiān)聽(tīng)函數(shù),如 accept() 函數(shù)中的 ② 號(hào)代碼所示。
通過(guò)上面這種方式,程序避免了采用死循環(huán)不斷地調(diào)用 socket 的 accept() 方法來(lái)接受客戶端連接,也避免了采用死循環(huán)不斷地調(diào)用 socket 的 recv() 方法來(lái)接收數(shù)據(jù)。socket 的 accept()、recv() 方法調(diào)用都是寫在事件監(jiān)聽(tīng)函數(shù)中的,只有當(dāng)事件(如“有客戶端連接進(jìn)來(lái)”事件、“有數(shù)據(jù)可讀”事件)發(fā)生時(shí),accept() 和 recv() 方法才會(huì)被調(diào)用,這樣就避免了阻塞式編程。
為了不斷地提取 selectors 中的事件,程序最后使用一個(gè)死循環(huán)不斷地調(diào)用 selectors 的 select() 方法“監(jiān)測(cè)”事件,每當(dāng)監(jiān)測(cè)到相應(yīng)的事件之后,程序就會(huì)調(diào)用對(duì)應(yīng)的事件監(jiān)聽(tīng)函數(shù)。
下面是該示例的客戶端程序。該客戶端程序更加簡(jiǎn)單,客戶端程序只需要讀取 socket 中的數(shù)據(jù),因此只要使用 selectors 為 socket 注冊(cè)“有數(shù)據(jù)可讀”事件的監(jiān)聽(tīng)函數(shù)即可。
import selectors, socket, threading # 創(chuàng)建默認(rèn)的selectors對(duì)象 sel = selectors.DefaultSelector() # 負(fù)責(zé)監(jiān)聽(tīng)“有數(shù)據(jù)可讀”事件的函數(shù) def read(conn, mask): data = conn.recv(1024) # Should be ready if data: print(data.decode('utf-8')) else: print('closing', conn) sel.unregister(conn) conn.close() # 創(chuàng)建socket對(duì)象 s = socket.socket() # 連接遠(yuǎn)程主機(jī) s.connect(('192.168.1.88', 30000)) # 設(shè)置該socket是非阻塞的 s.setblocking(False) # 使用sel為s的EVENT_READ事件注冊(cè)read監(jiān)聽(tīng)函數(shù) sel.register(s, selectors.EVENT_READ, read) # ① # 定義不斷讀取用戶鍵盤輸入的函數(shù) def keyboard_input(s): while True: line = input('') if line is None or line == 'exit': break # 將用戶的鍵盤輸入內(nèi)容寫入socket s.send(line.encode('utf-8')) # 采用線程不斷讀取用戶的鍵盤輸入 threading.Thread(target=keyboard_input, args=(s, )).start() while True: # 獲取事件 events = sel.select() for key, mask in events: # key的data屬性獲取為該事件注冊(cè)的監(jiān)聽(tīng)函數(shù) callback = key.data # 調(diào)用監(jiān)聽(tīng)函數(shù), key的fileobj屬性獲取被監(jiān)聽(tīng)的socket對(duì)象 callback(key.fileobj, mask)
上面程序中的 ① 號(hào)代碼為 socket 的 EVENT_READ 事件注冊(cè)了 read() 監(jiān)聽(tīng)函數(shù),這樣每當(dāng) socket 中有數(shù)據(jù)可讀時(shí),程序就會(huì)觸發(fā) read() 函數(shù)來(lái)讀取 socket 中的數(shù)據(jù)。
程序最后也采用死循環(huán)不斷地調(diào)用 selectors 的 select() 方法“監(jiān)測(cè)”事件,每當(dāng)監(jiān)測(cè)到相應(yīng)的事件之后,程序就會(huì)調(diào)用對(duì)應(yīng)的事件監(jiān)聽(tīng)函數(shù)。
先運(yùn)行上面的服務(wù)器端程序,該程序運(yùn)行后只是作為服務(wù)器,看不到任何輸出信息。再運(yùn)行多個(gè)客戶端程序(相當(dāng)于啟動(dòng)多個(gè)聊天室客戶端登錄該服務(wù)器)。接下來(lái)可以在任何一個(gè)客戶端通過(guò)鍵盤輸入一些內(nèi)容,然后按回車鍵,即可在所有客戶端(包括自己)的控制臺(tái)上接收到剛剛輸入的內(nèi)容。這也是一個(gè)粗略的 C/S 結(jié)構(gòu)的聊天室應(yīng)用。
看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對(duì)創(chuàng)新互聯(lián)的支持。