倘若不使用RPC遠(yuǎn)端調(diào)用的情況下,代碼如下:
local.py
# coding:utf-8
# 本地調(diào)用除法運(yùn)算的形式
class InvalidOperation(Exception):
def __init__(self, message = None):
self.message = message or 'involid operation'
def divide(num1, num2 = 1):
if num2 == 0:
raise InvalidOperation
res = num1 / num2
return res
try:
val = divide(200, 100)
except InvalidOperation as e:
print(e.message)
else:
print(val)
接下來(lái)將使用RPC二進(jìn)制的形式,遠(yuǎn)程過(guò)程調(diào)用代碼如下。
service.py 中自定義需要實(shí)現(xiàn)消息協(xié)議、傳輸控制,并且實(shí)現(xiàn)客戶(hù)端存根clientStub和服務(wù)器端存根serverStub,服務(wù)器定義以及channel的定義。
import struct
from io import BytesIO
import socket
class InvalidOperation(BaseException):
def __init__(self, message = None):
self.message = message or 'involid operation'
class MethodProtocol(object):
''''
解讀方法名
'''
def __init__(self, connection):
self.conn = connection
def _read_all(self, size):
"""
幫助我們讀取二進(jìn)制數(shù)據(jù)
:param size: 想要讀取的二進(jìn)制數(shù)據(jù)大小
:return: 二進(jìn)制數(shù)據(jù)bytes
"""
# self.conn
if isinstance(self.conn, BytesIO):
buff = self.conn.read(size)
return buff
else:
# 有時(shí)候長(zhǎng)度大于每次讀取的長(zhǎng)度
have = 0
buff = b''
while have < size:
chunk = self.conn.recv(size - have)
buff += chunk
l = len(chunk)
have += l
if l == 0:
# 表示客戶(hù)端已經(jīng)關(guān)閉了
raise EOFError
return buff
def get_method_name(self):
# 讀取字符串長(zhǎng)度
buff = self._read_all(4)
length = struct.unpack('!I',buff)[0]
# 讀取字符串
buff = self._read_all(length)
name = buff.decode()
return name
class DivideProtocol(object):
"""
divide過(guò)程消息協(xié)議轉(zhuǎn)換工具
"""
def args_encode(self, num1, num2=1):
"""
將原始調(diào)用的請(qǐng)求參數(shù)轉(zhuǎn)換打包成二進(jìn)制消息數(shù)據(jù)
:param num1: int
:param num2: int
:return: bytes 二進(jìn)制消息數(shù)據(jù)
"""
name = 'divide'
# 處理函數(shù)名
buff = struct.pack('!I', 6) # 無(wú)符號(hào)int
buff += name.encode()
# 處理參數(shù)1
buff2 = struct.pack('!B', 1) # 無(wú)符號(hào)byte
buff2 += struct.pack('!i', num1)
# 處理參數(shù)2
if num2 != 1:
# 沒(méi)有傳參的時(shí)候
buff2 += struct.pack('!B', 2)
buff2 += struct.pack('!i', num2)
# 處理參數(shù)邊界和組合成完整數(shù)據(jù)
buff += struct.pack('!I',len(buff2))
buff += buff2
return buff
def _read_all(self, size):
"""
幫助我們讀取二進(jìn)制數(shù)據(jù)
:param size: 想要讀取的二進(jìn)制數(shù)據(jù)大小
:return: 二進(jìn)制數(shù)據(jù)bytes
"""
# self.conn
if isinstance(self.conn, BytesIO):
buff = self.conn.read(size)
return buff
else:
# 有時(shí)候長(zhǎng)度大于每次讀取的長(zhǎng)度
have = 0
buff = b''
while have < size:
chunk = self.conn.recv(size - have)
buff += chunk
l = len(chunk)
have += l
if l == 0:
# 表示客戶(hù)端已經(jīng)關(guān)閉了
raise EOFError
return buff
def args_decode(self, connection):
"""
接受調(diào)用請(qǐng)求數(shù)據(jù)病進(jìn)行解析
:param connection: 鏈接請(qǐng)求數(shù)據(jù) socket BytesIO
:return: 因?yàn)橛卸鄠€(gè)參數(shù),定義為字典
"""
param_len_map = {
1:4,
2:4,
}
param_fmt_map = {
1:'!i',
2:'!i',
}
param_name_map = {
1: 'num1',
2: 'num2',
}
# 保存用來(lái)返回的參數(shù)字典
args = {}
self.conn = connection
# 處理方法的名字,已經(jīng)提前被處理,稍后處理
# 處理消息邊界
# 1) 讀取二進(jìn)制數(shù)據(jù)----read , ------ByteIO.read
# 2) 將二進(jìn)制數(shù)據(jù)轉(zhuǎn)換為python的數(shù)據(jù)類(lèi)型
buff = self._read_all(4)
length = struct.unpack('!I',buff)[0]
# 記錄已經(jīng)讀取的長(zhǎng)度值
have = 0
# 處理第一個(gè)參數(shù)
# 解析參數(shù)序號(hào)
buff = self._read_all(1)
have += 1
param_seq = struct.unpack('!B', buff)[0]
# 解析參數(shù)值
param_len = param_len_map[param_seq]
buff = self._read_all(param_len)
have += param_len
param_fmt = param_fmt_map[param_seq]
param = struct.unpack(param_fmt,buff)[0]
# 設(shè)置解析后的字典
param_name = param_name_map[param_seq]
args[param_name] = param
if have >= length:
return args
# 處理第二個(gè)參數(shù)
# 解析參數(shù)序號(hào)
buff = self._read_all(1)
param_seq = struct.unpack('!B', buff)[0]
# 解析參數(shù)值
param_len = param_len_map[param_seq]
buff = self._read_all(param_len)
param_fmt = param_fmt_map[param_seq]
param = struct.unpack(param_fmt, buff)[0]
# 設(shè)置解析后的字典
param_name = param_name_map[param_seq]
args[param_name] = param
return args
def result_encode(self, result):
"""
將原始結(jié)果數(shù)據(jù)轉(zhuǎn)換為消息協(xié)議二進(jìn)制數(shù)據(jù)
:param result:
:return:
"""
if isinstance(result,float):
# 處理返回值類(lèi)型
buff = struct.pack('!B', 1)
buff += struct.pack('!f', result)
return buff
else:
buff = struct.pack('!B', 2)
# 處理返回值
length = len(result.message)
# 處理字符串長(zhǎng)度
buff += struct.pack('!I', length)
buff += result.message.encode()
return buff
def result_decode(self, connection):
"""
將返回值消息數(shù)據(jù)轉(zhuǎn)換為原始返回值
:param connection: socket BytesIo
:return: float InvalidOperation對(duì)象
"""
self.conn = connection
# 處理返回值類(lèi)型
buff = self._read_all(1)
result_type = struct.unpack('!B', buff)[0]
if result_type == 1:
#正常情況
buff = self._read_all(4)
val = struct.unpack('!f', buff)[0]
return val
else:
buff = self._read_all(4)
length = struct.unpack('!I', buff)[0]
# 讀取字符串
buff = self._read_all(length)
message = buff.decode(buff)
return InvalidOperation(message)
class Channel(object):
"""
用于客戶(hù)端建立網(wǎng)絡(luò)鏈接
"""
def __init__(self, host, port):
self.host = host
self.port = port
def get_connection(self):
"""
獲取鏈接對(duì)象
:return: 與服務(wù)器通訊的socket
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
return sock
class Server(object):
"""
RPC服務(wù)器
"""
def __init__(self, host, port, handlers):
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 地址復(fù)用
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.host = host
self.port = port
# 綁定地址
sock.bind((self.host, self.port))
# 因?yàn)樵趩?dòng)的方法中才開(kāi)啟監(jiān)聽(tīng),所以不在此處開(kāi)啟
# sock.listen(128)
self.sock = sock
self.handlers = handlers
def serve(self):
"""
開(kāi)啟服務(wù)器運(yùn)行,提供RPC服務(wù)
:return:
"""
# 開(kāi)啟服務(wù)器的監(jiān)聽(tīng),等待客戶(hù)端的鏈接請(qǐng)求
self.sock.listen(128)
print("服務(wù)器開(kāi)啟監(jiān)聽(tīng),ip地址為%s,port為%d..." % (self.host,self.port))
while True:
# 不斷的接收客戶(hù)端的鏈接請(qǐng)求
client_sock, client_addr = self.sock.accept()
print("與客戶(hù)端%s建立連接" % str(client_addr))
# 交個(gè)ServerStub,完成客戶(hù)端的具體的RPC的調(diào)用請(qǐng)求
stub = ServerStub(client_sock, self.handlers)
try:
while True:
# 不斷的接收
stub.process()
except EOFError:
# 表示客戶(hù)端關(guān)閉了連接
print('客戶(hù)端關(guān)閉了連接')
client_sock.close()
class ClientStub(object):
"""
用來(lái)幫助客戶(hù)端完成遠(yuǎn)程過(guò)程調(diào)用 RPC調(diào)用
stub = ClientStub()
stub.divide(200, 100)
"""
def __init__(self, channel):
self.channel = channel
self.conn = self.channel.get_connection()
def divide(self, num1, num2 = 1):
# 將調(diào)用的參數(shù)打包成消息協(xié)議的數(shù)據(jù)
proto = DivideProtocol()
args = proto.args_encode(num1, num2)
# 將消息數(shù)據(jù)通過(guò)網(wǎng)絡(luò)發(fā)送給服務(wù)器
self.conn.sendall(args)
# 接受服務(wù)器返回的消息數(shù)據(jù),并進(jìn)行解析
result = proto.result_decode(self.conn)
# 將結(jié)果之(正常float 或 異常InvalidOperation)返回給客戶(hù)端
if isinstance(result,float):
return result
else:
raise result
class ServerStub(object):
"""
服務(wù)端存根
幫助服務(wù)端完成遠(yuǎn)端過(guò)程調(diào)用
"""
def __init__(self, connection, handlers):
"""
:param connection: 與客戶(hù)端的鏈接
:param handlers: 真正的本地函數(shù)路由
此處不以map的形式處理,實(shí)現(xiàn)類(lèi)的形式
class Handler:
@staticmethod
def divide():
pass
@staticmethod
def add():
pass
"""
self.conn = connection
self.method_proto = MethodProtocol(self.conn)
self.process_map = {
'divide': self._process_divide,
'add': self._process_add
}
self.handlers = handlers
def process(self):
"""
當(dāng)服務(wù)端接受了客戶(hù)的鏈接,建立好鏈接后,完成遠(yuǎn)端調(diào)用的處理
:return:
"""
# 接收消息數(shù)據(jù),并解析方法的名字
name = self.method_proto.get_method_name()
# 根據(jù)解析獲得的方法名,調(diào)用相應(yīng)的過(guò)程協(xié)議,接收并解析消息數(shù)據(jù)
self.process_map[name]()
def _process_divide(self):
"""
處理除法過(guò)程調(diào)用
:return:
"""
proto = DivideProtocol()
args = proto.args_decode(self.conn)
# args = {'num1':xxx, 'num2':xxx}
# 除法過(guò)程的本地調(diào)用------------------->>>>>>>>>
# 將本地調(diào)用過(guò)程的返回值(包括可能的異常)打包成消息協(xié)議的數(shù)據(jù),通過(guò)網(wǎng)絡(luò)返回給客戶(hù)端
try:
val = self.handlers.divide(**args)
except InvalidOperation as e:
ret_message = proto.result_encode(e)
else:
ret_message = proto.result_encode(val)
self.conn.sendall(ret_message)
def _process_add(self):
"""
處理加法過(guò)程調(diào)用
此方法暫時(shí)不識(shí)閑
:return:
"""
pass
if __name__ == '__main__':
# 目的:消息協(xié)議測(cè)試,模擬網(wǎng)絡(luò)傳輸
# 構(gòu)造消息數(shù)據(jù)
proto = DivideProtocol()
# 測(cè)試一
# divide(200,100)
# message = proto.args_encode(200,100)
# 測(cè)試二
message = proto.args_encode(200)
conn = BytesIO()
conn.write(message)
conn.seek(0)
# 解析消息數(shù)據(jù)
method_proto = MethodProtocal(conn)
name = method_proto.get_method_name()
print(name)
args = proto.args_decode(conn)
print(args)
接下來(lái),只需要?jiǎng)?chuàng)建服務(wù)器實(shí)例和使用客戶(hù)端發(fā)起請(qǐng)求
server.py
from services import InvalidOperation
from services import Server
class Handlers:
@staticmethod
def divide(num1, num2 = 1):
if num2 == 0:
raise InvalidOperation('ck_god_err')
val = num1/num2
return val
if __name__ == '__main__':
# 開(kāi)啟服務(wù)器
_server = Server('127.0.0.1', 8000, Handlers)
_server.serve()
client.py
ffrom services import ClientStub
from services import Channel
from services import InvalidOperation
# 創(chuàng)建與服務(wù)器的連接
channel = Channel('127.0.0.1', 8000)
# 創(chuàng)建用于rpc調(diào)用的工具
stub = ClientStub(channel)
# 進(jìn)行調(diào)用
for i in range(5):
try:
# val = stub.divide(i * 100,100)
# val = stub.divide(i * 100)
val = stub.divide( 100, 0)
except InvalidOperation as e:
print(e.message)
else:
print(val)
當(dāng)然如果有必要的話(huà),可以在services.py添加如下代碼,改為多線(xiàn)程的方式,自己再重寫(xiě)創(chuàng)建實(shí)例就可以調(diào)用了。
class ThreadServer(object):
"""
多線(xiàn)成RPC服務(wù)器
"""
def __init__(self, host, port, handlers):
sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 地址復(fù)用
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.host = host
self.port = port
# 綁定地址
sock.bind((self.host, self.port))
# 因?yàn)樵趩?dòng)的方法中才開(kāi)啟監(jiān)聽(tīng),所以不在此處開(kāi)啟
# sock.listen(128)
self.sock = sock
self.handlers = handlers
def serve(self):
"""
開(kāi)啟服務(wù)器運(yùn)行,提供RPC服務(wù)
:return:
"""
# 開(kāi)啟服務(wù)器的監(jiān)聽(tīng),等待客戶(hù)端的鏈接請(qǐng)求
self.sock.listen(128)
print("服務(wù)器開(kāi)啟監(jiān)聽(tīng),ip地址為%s,port為%d..." % (self.host,self.port))
while True:
# 不斷的接收客戶(hù)端的鏈接請(qǐng)求
client_sock, client_addr = self.sock.accept()
print("與客戶(hù)端%s建立連接" % str(client_addr))
t = threading.Thread(target= self.handle, args=(client_sock,))
t.start()
# 子線(xiàn)程函數(shù)
def handle(self,client_sock):
"""
子線(xiàn)程調(diào)用的方法,用來(lái)處理一個(gè)客戶(hù)段的請(qǐng)求
:return:
"""
# 交個(gè)ServerStub,完成客戶(hù)端的具體的RPC的調(diào)用請(qǐng)求
stub = ServerStub(client_sock, self.handlers)
try:
while True:
# 不斷的接收
stub.process()
except EOFError:
# 表示客戶(hù)端關(guān)閉了連接
print('客戶(hù)端關(guān)閉了連接')
client_sock.close()
網(wǎng)頁(yè)標(biāo)題:自定義RPC的完整實(shí)現(xiàn)---深入理解rpc內(nèi)部原理
標(biāo)題鏈接:
http://weahome.cn/article/pcjiic.html