目錄
成都創(chuàng)新互聯(lián)公司是一家企業(yè)級云計算解決方案提供商,超15年IDC數(shù)據(jù)中心運(yùn)營經(jīng)驗。主營GPU顯卡服務(wù)器,站群服務(wù)器,成都移動機(jī)房,海外高防服務(wù)器,機(jī)柜大帶寬、租用·托管,動態(tài)撥號VPS,海外云手機(jī),海外云服務(wù)器,海外服務(wù)器租用托管等。socketserver模塊:...1
編程接口:...2
總結(jié),創(chuàng)建服務(wù)器步驟:...4
例,實(shí)現(xiàn)EchoServer:...4
例,改寫ChatServer:...5
socket過于底層,編程雖有套路,但想要寫出健壯的代碼比較困難,所以很多語言都對socket底層API進(jìn)行封裝,py的封裝就是socketserver模塊,網(wǎng)絡(luò)服務(wù)編程框架,全球企業(yè)級快速開發(fā);
socketserver簡化了網(wǎng)絡(luò)服務(wù)器的編寫;
+------------+
| BaseServer |
+------------+
|
v
+-----------+??????? +------------------+
| TCPServer |------->| UnixStreamServer |
+-----------+??????? +------------------+
?????????|
v
+-----------+??????? +--------------------+
| UDPServer |------->| UnixDatagramServer |
+-----------+??????? +--------------------+
4個sync同步類:
TCPServer、UDPServer、UnixStreamServer、UnixDatagramServer;
很少用;
2個mixin類:
ForkingMixIn、ThreadingMixIn;
4個async異步類,生產(chǎn)中常用:
ForkingTCPServer(ForkingMixIn,TCPServer)、ForkingUDPServer(ForkingMixIn,UDPServer)?? #創(chuàng)建多進(jìn)程
ThreadingTCPServer(ThreadingMixIn,TCPServer)、ThreadingUDPServer(ThreadingMixIn,UDPServer)??#創(chuàng)建多線程
注:
一般ThreadingTCPServer夠用;
如果并發(fā)很高可考慮用ForkingTCPServer;
ThreadingUDPServer甚至也很少用,盡管在LAN中,如果忙起來時接收到的包的順序是亂的;
class BaseServer:
def __init__(self, server_address, RequestHandlerClass):?? #服務(wù)器綁定的地址信息;用于處理請求,該類必須是BaseRequestHandler類的子類
def finish_request(self, request, client_address):?? #處理請求的方法
"""Finish one request by instantiating RequestHandlerClass."""
self.RequestHandlerClass(request, client_address, self)?? #實(shí)例化,RequesthandlerClass的構(gòu)造
查看源碼,寫框架的思想:
class BaseRequestHandler:?? #和用戶連接的用戶請求處理類,server實(shí)例接收用戶請求后,最后會實(shí)例化這個類;它會一次調(diào)用三個函數(shù)setup()(每一個連接初始化)、handler()(每一次請求處理,必須覆蓋)、finish()(每一個連接清理),子類可覆蓋
def __init__(self, request, client_address, server):?? #初始化時送入3個構(gòu)造參數(shù),request、client_address、server(TCPServer),以后可在BaseRequestHandler類的實(shí)例上使用self.request(和client連接的socket對象)、self.cleint_address(是客戶端地址)、self.server(是TCPServer本身)
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
def setup(self):?? #每一個連接初始化,初始化工作,如ChatServer中維護(hù)的數(shù)據(jù)結(jié)構(gòu)放到此段;實(shí)現(xiàn)了這三個方法,只不過是空操作,而raise NotImplementedError稱為抽象,不實(shí)現(xiàn)
pass
def handle(self):?? #每一次請求處理,必須覆蓋;handle()和sock.accept()對應(yīng),用戶連接請求過來后,建立連接并生成一個socket對象(保存在self.request中)和客戶端地址(保存在self.client_address中),之后的操作就和socket編程一樣了
pass
def finish(self):?? #每一個連接清理,清理工作
pass
注:
setup()和finish()只執(zhí)行一次;
handler()在不加鎖情況下,也是執(zhí)行一次;
例:
class MyHandler(socketserver.BaseRequestHandler):?? #右鍵MyHandler,Generate-->Overwrite Methods,可快速生成要覆蓋的方法
def handle(self):
super().handle()?? #此句可不寫,因為父類中的handler()為空操作;但如果是StreamRequestHandler則必須要寫,該類中實(shí)現(xiàn)了handler()方法
print(self.request, self.client_address, self.server)
print('{} handler'.format(self.__class__))
print(self.__dict__)
print(type(self).__dict__)
print(self.__class__.__bases__[0].__dict__)
print(threading.enumerate(), threading.current_thread())
# pass?? #TODO? ?#提醒自己還沒寫完
print('come')
for i in range(3):?? #client和server端長時間連接,在handler里循環(huán);分布式服務(wù)之間需傳遞心跳包(傳遞事務(wù)、節(jié)點(diǎn)信息等),服務(wù)之間要長連接,不能斷;數(shù)據(jù)庫連接池不應(yīng)用長連接,傳完數(shù)據(jù)就可斷開,有很多連接等著連DB
data = self.request.recv(1024)
print(data)
addr = ('127.0.0.1', 9998)
server = socketserver.ThreadingTCPServer(addr, MyHandler)?? #用多client連接測
# server = socketserver.TCPServer(addr, MyHandler)?? #同步,等前一個連接斷開后,才能接收并處理下一個連接的請求
server.serve_forever()?? #啟動大循環(huán),類似while
server.shutdown()
server.server_close()?? #建議關(guān)閉連接前先server.shutdown()
輸出:
{'request':
{'__doc__': None, '__module__': '__main__', 'handle':
(
{'setup':
[<_MainThread(MainThread, started 4136)>,
come
1、class MyHandler(socketserver.BaseRequestHandler):,通過對BaseRequestHandler類進(jìn)行子類化并覆蓋其handle()方法,來創(chuàng)建請求處理程序類,此方法處理傳入請求;
2、server=socketserver.ThreadingTCPServer(addr,MyHandler),必須實(shí)例化一個服務(wù)器類,并向其傳入服務(wù)器的地址和請求處理程序類;
3、server.serve_forever()或server.handle_request(),調(diào)用服務(wù)器對象的serve_forever()(一直啟動)或server.handle_request()(一次性的)方法;
4、server.shutdown()、server.close(),調(diào)用server.close()(關(guān)閉套接字)前先server.shutdown()等待停止server.serve_forever();
為每一個連接提供RequestHandlerClass類實(shí)例,一次調(diào)用setup()、handler()、finish()方法,且使用了try...finally結(jié)構(gòu)(查看BaseRequestHandler源碼)保證finish()方法一定能被調(diào)用,這些方法一次執(zhí)行完成;
如果想維持這個連接與客戶端通信,需要在handler()中使用循環(huán);
socketserver模塊提供不同的類,但編程接口是一樣的,即使是多進(jìn)程、多線程的類也是一樣,大大減少了編程的難度;
client發(fā)來什么,就返回什么消息;
class EchoHandler(socketserver.BaseRequestHandler):
def setup(self):
super().setup()
self.event = threading.Event()
def handle(self):
super().handle()
while not self.event.is_set():
data = self.request.recv(1024)
data = data.decode()
msg = 'ack: {} {}'.format(self.client_address, data)
msg = msg.encode()
self.request.send(msg)
print('end')
def finish(self):
super().finish()
self.event.set()
addr = ('127.0.0.1', 9998)
server = socketserver.ThreadingTCPServer(addr, EchoHandler)
# server.serve_forever()
server_thread = threading.Thread(target=server.serve_forever, daemon=True)
server_thread.start()
# server.shutdown()
# server.server_close()
try:
while True:
cmd = input('>>> ')
if cmd.strip() == 'quit':?? #只有在client都斷開,與server端沒有連接時才正常退出
break
except Exception as e:
print(e)
except KeyboardInterrupt:
print('exit')
finally:
server.shutdown()
server.server_close()
如果使用文件處理,使用StreamRequestHandler;
可用心跳機(jī)制;
class ChatHandler(socketserver.BaseRequestHandler):
clients = {}
def setup(self):
super().setup()
self.event = threading.Event()
print(self.client_address, threading.current_thread(), self.clients)
def handle(self):
super().handle()
while not self.event.is_set():
try:?? #緩沖區(qū)異常、連接異常最好自己捕獲到,雖然父類中有try,但最好自己捕獲
data = self.request.recv(1024).decode().strip()
if len(data) == 0:?? #同if not data,解決client主動斷開后產(chǎn)生的異常,20180901追加尚未測試
raise BrokenPipeError('client broken')
except Exception as e:
logging.info(e)
data = 'quit'?? #技巧,某個連接一旦有問題,會有各種異常,此處直接斷開
logging.info(data)
if data == 'quit':
break
????????self.clients[self.client_address] = self.request
msg = 'ack: {}'.format(data)
for c in self.clients.values():
c.send(msg.encode())
def finish(self):
super().finish()
self.clients.pop(self.client_address)
self.event.set()
addr = ('127.0.0.1', 9998)
server = socketserver.ThreadingTCPServer(addr, ChatHandler)
server_thread = threading.Thread(target=server.serve_forever, daemon=True)
server_thread.start()
myutils.show_threads()?? #在主線程中就可,沒必要放到工作線程中
try:
while True:
cmd = input('>>> ').strip()
if cmd == 'quit':
break
except Exception as e:
print(e)
except KeyboardInterrupt:
print('exit')
finally:
server.shutdown()
server.server_close()
輸出:
>>> [
('127.0.0.1', 8000)
[
('127.0.0.1', 8003)
[
[
2018-08-24-09:33:36?????? Thread info: 9456 Thread-3 test
[
2018-08-24-09:33:41?????? Thread info: 4008 Thread-2 test
[
2018-08-24-09:33:48?????? Thread info: 9456 Thread-3 test2
[
2018-08-24-09:33:51?????? Thread info: 4008 Thread-2 test1
[
2018-08-24-09:33:53?????? Thread info: 4008 Thread-2
2018-08-24-09:33:53?????? Thread info: 4008 Thread-2 [WinError 10053] 您的主機(jī)中的軟件中止了一個已建立的連接。
2018-08-24-09:33:53?????? Thread info: 4008 Thread-2 quit
2018-08-24-09:33:55?????? Thread info: 9456 Thread-3
2018-08-24-09:33:55?????? Thread info: 9456 Thread-3 [WinError 10053] 您的主機(jī)中的軟件中止了一個已建立的連接。
2018-08-24-09:33:55?????? Thread info: 9456 Thread-3 quit
[
quit
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。