文章目录
- RPC
- 1.定义
- 2.概念
- 3.优缺点
- 4.RPC结构
- 5.RPC消息协议
- 5.1 消息边界
- 5.2 内容
- 5.3 压缩
- 6.RPC的实现
- 6.1 divide_protocol.py
- 6.2 server.py
- 6.3 client.py
RPC
1.定义
远程过程调用(remote procedure call)
2.概念
广义:所有通过网络进行通讯,的调用统称为RPC调用
狭义:不采用http协议的方式,采用自定义格式的二进制方式
3.优缺点
- 优点
- 效率高
- 发起rpc调用的一方,可以忽略RPC的具体实现,如同编写本地函数调用
- 缺点
- 通用性不高
4.RPC结构
- client(caller):调用者
- client stub(bundle args/unbundle ret vals):客户端存根
- client network service
- server network service
- server stub(bundle ret vals/unbundle args)
5.RPC消息协议
5.1 消息边界
- 分隔符(\r\n)
- 长度声明法(例如HTTP中 Content-Length)
5.2 内容
- 二进制
- 文本内容
5.3 压缩
- 压缩处理是一把双刃剑,减少数据量减轻带宽压力同时,额外增加了压缩和解压缩的时间
6.RPC的实现
6.1 divide_protocol.py
import struct
from io import BytesIO
class InvalidOperation(Exception):
...
class DivideProtocol(object):
"""
float divide(1:int num1, 2:int num2=1)
"""
def _read_all(self, size):
"""
读取指定长度的字节
:param size: 长度
:return: 读取出的二进制数据
"""
if isinstance(self.conn, BytesIO):
# BytesIO类型,用于演示
buff = b''
have = 0
while have < size:
chunk = self.conn.read(size - have)
have += len(chunk)
buff += chunk
return buff
else:
# socket类型
buff = b''
have = 0
while have < size:
chunk = self.conn.recv(size - have)
have += len(chunk)
buff += chunk
# 客户端关闭了连接
if len(chunk) == 0:
raise EOFError()
return buff
def args_encode(self, num1, num2=1):
"""
对调用参数进行编码
:param num1: int
:param num2: int
:return: 编码后的二进制数据
"""
# 处理参数num1, 4字节整型
buff = struct.pack('!B', 1)
buff += struct.pack('!i', num1)
# 处理参数num2, 4字节整型,如为默认值1,则不再放到消息中
if num2 != 1:
buff += struct.pack('!B', 2)
buff += struct.pack('!i', num2)
# 处理消息总长度,4字节无符号整型
length = len(buff)
# 处理方法名,字符串类型
name = 'divide'
# 字符串长度,4字节无符号整型
msg = struct.pack('!I', len(name))
msg += name.encode()
msg += struct.pack('!I', length) + buff
return msg
def args_decode(self, connection):
"""
获取调用参数并进行解码
:param connection: 传输工具对象,如socket对象或者BytesIO对象,从中可以读取消息数据
:return: 解码后的参数字典
"""
# 保存到当前对象中,供_read_all方式使用
self.conn = connection
param_name_map = {
1: 'num1',
2: 'num2'
}
param_len_map = {
1: 4,
2: 4
}
# 用于保存解码后的参数字典
args = dict()
# 读取消息总长度,4字无节符号整数
buff = self._read_all(4)
length = struct.unpack('!I', buff)[0]
# 记录已读取的长度
have = 0
# 读取第一个参数,4字节整型
buff = self._read_all(1)
have += 1
param_seq = struct.unpack('!B', buff)[0]
param_len = param_len_map[param_seq]
buff = self._read_all(param_len)
have += param_len
args[param_name_map[param_seq]] = struct.unpack('!i', buff)[0]
if have >= length:
return args
# 读取第二个参数,4字节整型
buff = self._read_all(1)
have += 1
param_seq = struct.unpack('!B', buff)[0]
param_len = param_len_map[param_seq]
buff = self._read_all(param_len)
have += param_len
args[param_name_map[param_seq]] = struct.unpack('!i', buff)[0]
return args
def result_encode(self, result):
"""
对调用的结果进行编码
:param result: float 或 InvalidOperation对象
:return: 编码后的二进制数据
"""
if isinstance(result, float):
# 没有异常,正常执行
# 处理结果类型,1字节无符号整数
buff = struct.pack('!B', 1)
# 处理结果值, 4字节float
buff += struct.pack('!f', result)
else:
# 发生了InvalidOperation异常
# 处理结果类型,1字节无符号整数
buff = struct.pack('!B', 2)
# 处理异常结果值, 字符串
# 处理字符串长度, 4字节无符号整数
buff += struct.pack('!I', len(result.message))
# 处理字符串内容
buff += result.message.encode()
return buff
def result_decode(self, connection):
"""
对调用结果进行解码
:param connection: 传输工具对象,如socket对象或者BytesIO对象,从中可以读取消息数据
:return: 结果数据
"""
self.conn = connection
# 取出结果类型, 1字节无符号整数
buff = self._read_all(1)
result_type = struct.unpack('!B', buff)[0]
if result_type == 1:
# float的结果值, 4字节float
buff = self._read_all(4)
result = struct.unpack('!f', buff)[0]
return result
else:
# InvalidOperation对象
# 取出字符串长度, 4字节无符号整数
buff = self._read_all(4)
str_len = struct.unpack('!I', buff)[0]
buff = self._read_all(str_len)
message = buff.decode()
return InvalidOperation(message)
class MethodProtocol(object):
def __init__(self, connection):
self.conn = connection
def _read_all(self, size):
"""
读取指定长度的字节
:param size: 长度
:return: 读取出的二进制数据
"""
if isinstance(self.conn, BytesIO):
# BytesIO类型,用于演示
buff = b''
have = 0
while have < size:
chunk = self.conn.read(size - have)
have += len(chunk)
buff += chunk
return buff
else:
# socket类型
buff = b''
have = 0
while have < size:
print('have=%d size=%d' % (have, size))
chunk = self.conn.recv(size - have)
have += len(chunk)
buff += chunk
if len(chunk) == 0:
raise EOFError()
return buff
def get_method_name(self):
# 获取方法名
# 读取字符串长度,4字节无符号整型
buff = self._read_all(4)
str_len = struct.unpack('!I', buff)[0]
# 读取字符串
buff = self._read_all(str_len)
name = buff.decode()
return name
6.2 server.py
import socket
import threading
from customize_rpc.divide_protocol import DivideProtocol, MethodProtocol, InvalidOperation
class Handlers:
@staticmethod
def divide(num1, num2=1):
"""
除法
:param num1:
:param num2:
:return:
"""
if num2 == 0:
raise InvalidOperation()
val = num1 / num2
return val
class ServerStub(object):
def __init__(self, connection, handlers):
"""
服务器存根
:param connection: 与客户端的socket连接
:param handlers: 存放被调用的方法
"""
self._process_map = {
'divide': self._process_divide,
}
self.conn = connection
self.method_proto = MethodProtocol(self.conn)
self.handlers = handlers
def process(self):
"""
被服务器调用的入口,服务器收到请求后调用该方法
"""
# 获取解析调用请求的方法名
name = self.method_proto.get_method_name()
# 调用对应的处理方法
self._process_map[name]()
def _process_divide(self):
"""
执行divide本地调用,并将结果返回给客户端
"""
# 接收调用参数
proto = DivideProtocol()
args = proto.args_decode(self.conn)
# 进行本地divide调用
try:
result = self.handlers.divide(**args)
except InvalidOperation as e:
result = e
# 构造返回值消息并返回
result = proto.result_encode(result)
self.conn.sendall(result)
class Server(object):
def __init__(self, host, port, handlers):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.host = host
self.port = port
self.sock.bind((host, port))
self.handlers = handlers
def serve(self):
"""
开始服务
"""
self.sock.listen(128)
print("开始监听")
while True:
conn, addr = self.sock.accept()
print("建立链接%s" % str(addr))
stub = ServerStub(conn, self.handlers)
try:
while True:
stub.process()
except EOFError:
print("客户端关闭连接")
# 关闭服务端连接
conn.close()
class ThreadServer(object):
def __init__(self, host, port, handlers):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.host = host
self.port = port
self.sock.bind((host, port))
self.handlers = handlers
def serve(self):
"""
开始服务
"""
self.sock.listen(128)
print("开始监听")
while True:
conn, addr = self.sock.accept()
print("建立链接%s" % str(addr))
t = threading.Thread(target=self.handle, args=(conn,))
t.start()
def handle(self, client):
stub = ServerStub(client, self.handlers)
try:
while True:
stub.process()
except EOFError:
print("客户端关闭连接")
client.close()
if __name__ == '__main__':
server = Server('127.0.0.1', 8000, Handlers)
server.serve()
6.3 client.py
import time
import socket
from customize_rpc.divide_protocol import DivideProtocol, InvalidOperation
class Channel(object):
"""
连接通道
"""
def __init__(self, host, port):
self.host = host
self.port = port
def get_connection(self):
"""
获取一个tcp连接
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
return sock
class ClientStub(object):
"""
客户端存根
"""
def __init__(self, channel: Channel):
self.channel = channel
self.conn = self.channel.get_connection()
def divide(self, num1, num2=1):
# 构造
proto = DivideProtocol()
args = proto.args_encode(num1, num2)
self.conn.sendall(args)
result = proto.result_decode(self.conn)
if isinstance(result, InvalidOperation):
raise result
else:
return result
if __name__ == '__main__':
channel = Channel('127.0.0.1', 8000)
stub = ClientStub(channel)
for i in range(5):
try:
val = stub.divide(i * 100, 10)
except InvalidOperation as e:
print(e.message)
else:
print(val)
time.sleep(1)