1.RPC基本原理

文章目录

  • RPC
    • 1.定义
    • 2.概念
    • 3.优缺点
    • 4.RPC结构
    • 5.RPC消息协议
      • 5.1 消息边界
      • 5.2 内容
      • 5.3 压缩
    • 6.RPC的实现
      • 6.1 divide_protocol.py
      • 6.2 server.py
      • 6.3 client.py

RPC

1.定义

远程过程调用(remote procedure call)

2.概念

广义:所有通过网络进行通讯,的调用统称为RPC调用

狭义:不采用http协议的方式,采用自定义格式的二进制方式

3.优缺点

  • 优点
    • 效率高
    • 发起rpc调用的一方,可以忽略RPC的具体实现,如同编写本地函数调用
  • 缺点
    • 通用性不高

4.RPC结构

  • client(caller):调用者
  • client stub(bundle args/unbundle ret vals):客户端存根
  • client network service
  • server network service
  • server stub(bundle ret vals/unbundle args)

5.RPC消息协议

5.1 消息边界

  • 分隔符(\r\n)
  • 长度声明法(例如HTTP中 Content-Length)

5.2 内容

  • 二进制
  • 文本内容

5.3 压缩

  • 压缩处理是一把双刃剑,减少数据量减轻带宽压力同时,额外增加了压缩和解压缩的时间

6.RPC的实现

6.1 divide_protocol.py

import struct
from io import BytesIO


class InvalidOperation(Exception):
    ...


class DivideProtocol(object):
    """
    float divide(1:int num1, 2:int num2=1)
    """

    def _read_all(self, size):
        """
        读取指定长度的字节
        :param size: 长度
        :return: 读取出的二进制数据
        """
        if isinstance(self.conn, BytesIO):
            # BytesIO类型,用于演示
            buff = b''
            have = 0
            while have < size:
                chunk = self.conn.read(size - have)
                have += len(chunk)
                buff += chunk
            return buff

        else:
            # socket类型
            buff = b''
            have = 0
            while have < size:
                chunk = self.conn.recv(size - have)
                have += len(chunk)
                buff += chunk
                # 客户端关闭了连接
                if len(chunk) == 0:
                    raise EOFError()
            return buff

    def args_encode(self, num1, num2=1):
        """
        对调用参数进行编码
        :param num1: int
        :param num2: int
        :return: 编码后的二进制数据
        """
        # 处理参数num1, 4字节整型
        buff = struct.pack('!B', 1)
        buff += struct.pack('!i', num1)

        # 处理参数num2, 4字节整型,如为默认值1,则不再放到消息中
        if num2 != 1:
            buff += struct.pack('!B', 2)
            buff += struct.pack('!i', num2)

        # 处理消息总长度,4字节无符号整型
        length = len(buff)

        # 处理方法名,字符串类型
        name = 'divide'
        # 字符串长度,4字节无符号整型
        msg = struct.pack('!I', len(name))
        msg += name.encode()

        msg += struct.pack('!I', length) + buff

        return msg

    def args_decode(self, connection):
        """
        获取调用参数并进行解码
        :param connection: 传输工具对象,如socket对象或者BytesIO对象,从中可以读取消息数据
        :return: 解码后的参数字典
        """
        # 保存到当前对象中,供_read_all方式使用
        self.conn = connection
        param_name_map = {
            1: 'num1',
            2: 'num2'
        }
        param_len_map = {
            1: 4,
            2: 4
        }
        # 用于保存解码后的参数字典
        args = dict()

        # 读取消息总长度,4字无节符号整数
        buff = self._read_all(4)
        length = struct.unpack('!I', buff)[0]

        # 记录已读取的长度
        have = 0

        # 读取第一个参数,4字节整型
        buff = self._read_all(1)
        have += 1
        param_seq = struct.unpack('!B', buff)[0]
        param_len = param_len_map[param_seq]
        buff = self._read_all(param_len)
        have += param_len
        args[param_name_map[param_seq]] = struct.unpack('!i', buff)[0]

        if have >= length:
            return args

        # 读取第二个参数,4字节整型
        buff = self._read_all(1)
        have += 1
        param_seq = struct.unpack('!B', buff)[0]
        param_len = param_len_map[param_seq]
        buff = self._read_all(param_len)
        have += param_len
        args[param_name_map[param_seq]] = struct.unpack('!i', buff)[0]

        return args

    def result_encode(self, result):
        """
        对调用的结果进行编码
        :param result: float 或 InvalidOperation对象
        :return: 编码后的二进制数据
        """
        if isinstance(result, float):
            # 没有异常,正常执行
            # 处理结果类型,1字节无符号整数
            buff = struct.pack('!B', 1)

            # 处理结果值, 4字节float
            buff += struct.pack('!f', result)
        else:
            # 发生了InvalidOperation异常
            # 处理结果类型,1字节无符号整数
            buff = struct.pack('!B', 2)

            # 处理异常结果值, 字符串
            # 处理字符串长度, 4字节无符号整数
            buff += struct.pack('!I', len(result.message))
            # 处理字符串内容
            buff += result.message.encode()

        return buff

    def result_decode(self, connection):
        """
        对调用结果进行解码
        :param connection: 传输工具对象,如socket对象或者BytesIO对象,从中可以读取消息数据
        :return: 结果数据
        """
        self.conn = connection

        # 取出结果类型, 1字节无符号整数
        buff = self._read_all(1)
        result_type = struct.unpack('!B', buff)[0]
        if result_type == 1:
            # float的结果值, 4字节float
            buff = self._read_all(4)
            result = struct.unpack('!f', buff)[0]
            return result
        else:
            # InvalidOperation对象
            # 取出字符串长度, 4字节无符号整数
            buff = self._read_all(4)
            str_len = struct.unpack('!I', buff)[0]
            buff = self._read_all(str_len)
            message = buff.decode()
            return InvalidOperation(message)


class MethodProtocol(object):
    def __init__(self, connection):
        self.conn = connection

    def _read_all(self, size):
        """
        读取指定长度的字节
        :param size: 长度
        :return: 读取出的二进制数据
        """
        if isinstance(self.conn, BytesIO):
            # BytesIO类型,用于演示
            buff = b''
            have = 0
            while have < size:
                chunk = self.conn.read(size - have)
                have += len(chunk)
                buff += chunk

            return buff

        else:
            # socket类型
            buff = b''
            have = 0
            while have < size:
                print('have=%d size=%d' % (have, size))
                chunk = self.conn.recv(size - have)
                have += len(chunk)
                buff += chunk

                if len(chunk) == 0:
                    raise EOFError()

            return buff

    def get_method_name(self):
        # 获取方法名
        # 读取字符串长度,4字节无符号整型
        buff = self._read_all(4)
        str_len = struct.unpack('!I', buff)[0]

        # 读取字符串
        buff = self._read_all(str_len)
        name = buff.decode()
        return name

6.2 server.py

import socket
import threading

from customize_rpc.divide_protocol import DivideProtocol, MethodProtocol, InvalidOperation


class Handlers:
    @staticmethod
    def divide(num1, num2=1):
        """
        除法
        :param num1:
        :param num2:
        :return:
        """
        if num2 == 0:
            raise InvalidOperation()
        val = num1 / num2
        return val


class ServerStub(object):
    def __init__(self, connection, handlers):
        """
        服务器存根
        :param connection: 与客户端的socket连接
        :param handlers: 存放被调用的方法
        """
        self._process_map = {
            'divide': self._process_divide,
        }
        self.conn = connection
        self.method_proto = MethodProtocol(self.conn)
        self.handlers = handlers

    def process(self):
        """
        被服务器调用的入口,服务器收到请求后调用该方法
        """
        # 获取解析调用请求的方法名
        name = self.method_proto.get_method_name()

        # 调用对应的处理方法
        self._process_map[name]()

    def _process_divide(self):
        """
        执行divide本地调用,并将结果返回给客户端
        """
        # 接收调用参数
        proto = DivideProtocol()
        args = proto.args_decode(self.conn)

        # 进行本地divide调用
        try:
            result = self.handlers.divide(**args)
        except InvalidOperation as e:
            result = e

        # 构造返回值消息并返回
        result = proto.result_encode(result)
        self.conn.sendall(result)


class Server(object):
    def __init__(self, host, port, handlers):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.host = host
        self.port = port
        self.sock.bind((host, port))
        self.handlers = handlers

    def serve(self):
        """
        开始服务
        """
        self.sock.listen(128)
        print("开始监听")
        while True:
            conn, addr = self.sock.accept()
            print("建立链接%s" % str(addr))
            stub = ServerStub(conn, self.handlers)
            try:
                while True:
                    stub.process()
            except EOFError:
                print("客户端关闭连接")
            # 关闭服务端连接
            conn.close()


class ThreadServer(object):
    def __init__(self, host, port, handlers):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.host = host
        self.port = port
        self.sock.bind((host, port))
        self.handlers = handlers

    def serve(self):
        """
        开始服务
        """
        self.sock.listen(128)
        print("开始监听")
        while True:
            conn, addr = self.sock.accept()
            print("建立链接%s" % str(addr))
            t = threading.Thread(target=self.handle, args=(conn,))
            t.start()

    def handle(self, client):
        stub = ServerStub(client, self.handlers)
        try:
            while True:
                stub.process()
        except EOFError:
            print("客户端关闭连接")

        client.close()


if __name__ == '__main__':
    server = Server('127.0.0.1', 8000, Handlers)
    server.serve()    

6.3 client.py

import time
import socket

from customize_rpc.divide_protocol import DivideProtocol, InvalidOperation


class Channel(object):
    """
    连接通道
    """

    def __init__(self, host, port):
        self.host = host
        self.port = port

    def get_connection(self):
        """
        获取一个tcp连接
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((self.host, self.port))
        return sock


class ClientStub(object):
    """
    客户端存根
    """

    def __init__(self, channel: Channel):
        self.channel = channel
        self.conn = self.channel.get_connection()

    def divide(self, num1, num2=1):
        # 构造
        proto = DivideProtocol()
        args = proto.args_encode(num1, num2)
        self.conn.sendall(args)
        result = proto.result_decode(self.conn)
        if isinstance(result, InvalidOperation):
            raise result
        else:
            return result


if __name__ == '__main__':
    channel = Channel('127.0.0.1', 8000)
    stub = ClientStub(channel)

    for i in range(5):
        try:
            val = stub.divide(i * 100, 10)
        except InvalidOperation as e:
            print(e.message)
        else:
            print(val)
        time.sleep(1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/944259.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Windows下播放文件作为麦克风声源的一种方式

近期测试一种外语的ASR识别成功率&#xff0c;样本素材是懂这门语言的同事录制的mp3文件。测试client端原本是从麦克风拾音生成媒体流的。 这样&#xff0c;就需要想办法把mp3文件转换为测试client的输入声音。物理方式上&#xff0c;可以用一根音频线&#xff0c;把电…

如何在网页端使用 IDE 高效地阅读 GitHub 源码?

如何在网页端使用 IDE 高效地阅读 GitHub 源码&#xff1f; 前言什么是 GitHub1s&#xff1f;使用 GitHub1s 阅读 browser-use 项目源码步骤 1: 打开 GitHub 项目页面步骤 2: 修改 URL 使用 GitHub1s步骤 3: 浏览文件结构步骤 4: 使用代码高亮和智能补全功能步骤 5: 快速跳转和…

Microsoft word@【标题样式】应用不生效(主要表现为在导航窗格不显示)

背景 随笔。Microsoft word 2013基础使用&#xff0c;仅做参考和积累。 问题 Microsoft word 2013&#xff0c;对段落标题文字应用【标题样式】不生效&#xff08;主要表现为在导航窗格不显示&#xff09;。 图1 图2 观察图1和图2&#xff0c;发现图1的文字在应用【标题一】样…

2021.12.28基于UDP同信的相关流程

作业 1、将TCP的CS模型再敲一遍 服务器 #include <myhead.h> #define PORT 8888 #define IP "192.168.124.123" int main(int argc, const char *argv[]) {//创建套接字//绑定本机IP和端口号//监听客户端请求//接收客户端连接请求//收发消息//创建套接字int…

OpenCV和PyQt的应用

1.创建一个 PyQt 应用程序&#xff0c;该应用程序能够&#xff1a; 使用 OpenCV 加载一张图像。在 PyQt 的窗口中显示这张图像。提供四个按钮&#xff08;QPushButton&#xff09;&#xff1a; 一个用于将图像转换为灰度图一个用于将图像恢复为原始彩色图一个用于将图像进行翻…

kibana启动报错:Invalid character in header content [“kbn-name“]

启动时候kibana报错&#xff1a; 打开 kibana配置文件&#xff0c;config/kibana.yml&#xff0c;配置上server.name即可&#xff0c;如下&#xff1a;

Pandas08

Pandas01 Pandas02 Pandas03 Pandas04 Pandas05 Pandas06 Pandas07 文章目录 内容回顾同期群分析1.1 同期群分析概念1.2 案例代码 数据分析报告数据分析工作内容数据分析简历说明用户生命周期标签1 什么是生命周期标签2 如何计算生命周期标签 内容回顾 TGI 偏好分析 TGI 目标…

网页数据的解析提取之Beautiful Soup

前面博客介绍了正则表达式的相关用法&#xff0c;只是一旦正则表达式写得有问题&#xff0c;得到的结果就可能不是我们想要的了。而且每一个网页都有一定的特殊结构和层级关系&#xff0c;很多节点都用id或 class 作区分所以借助它们的结构和属性来提取不也可以吗? 本篇博客我…

电脑缺失sxs.dll文件要怎么解决?

一、文件丢失问题&#xff1a;以sxs.dll文件缺失为例 当你在运行某个程序时&#xff0c;如果系统提示“找不到sxs.dll文件”&#xff0c;这意味着你的系统中缺少了一个名为sxs.dll的动态链接库文件。sxs.dll文件通常与Microsoft的.NET Framework相关&#xff0c;是许多应用程序…

进军AI大模型-环境配置

语言环境配置 合法上网工具&#xff1a; 这个T子试试&#xff0c;一直稳定。走我链接免费用5天: https://wibnm.com/s/ywtc01/pvijpzy python版本&#xff1a; python3.12 Langchain: Introduction | &#x1f99c;️&#x1f517; LangChain v0.3 9月16日升级的版本 pip3…

WebStorm的下载安装指南

下载 打开网站https://www.jetbrains.com/webstorm/download/#sectionwindows 或者直接网盘下载 通过网盘分享的文件&#xff1a;WebStorm-2024.3.1.1.exe 链接: https://pan.baidu.com/s/16JRZjleFYshLbVvZB49-FA?pwdn5hc 提取码: n5hc –来自百度网盘超级会员v6的分享 安…

Vue使用pages构建多页应用

经过上一篇文章&#xff0c;大家对单页应用配置的都有了一定的了解。相信大家应该对如何构建一个 Vue 单页应用项目已经有所收获和体会&#xff0c;在大部分实际场景中&#xff0c;我们都可以构建单页应用来进行项目的开发和迭代&#xff0c;然而对于项目复杂度过高或者页面模块…

springboot506基于Springboot的小区疫情购物系统录(论文+源码)_kaic

摘 要 信息数据从传统到当代&#xff0c;是一直在变革当中&#xff0c;突如其来的互联网让传统的信息管理看到了革命性的曙光&#xff0c;因为传统信息管理从时效性&#xff0c;还是安全性&#xff0c;还是可操作性等各个方面来讲&#xff0c;遇到了互联网时代才发现能补上自古…

复习打卡大数据篇——Hadoop MapReduce

目录 1. MapReduce基本介绍 2. MapReduce原理 1. MapReduce基本介绍 什么是MapReduce MapReduce是一个分布式运算程序的编程框架&#xff0c;核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序&#xff0c;并发运行在Hadoop集群上。 MapRed…

RK3506开发板:智能硬件领域的新选择,带来卓越性能与低功耗

在现代智能硬件开发中&#xff0c;选择一款性能稳定、功耗低的开发板是确保产品成功的关键。Rockchip最新推出的RK3506芯片&#xff0c;凭借其卓越的能效比、多功能扩展性和优秀的实时性能&#xff0c;已经成为智能家电、工业控制、手持终端等领域的热门选择。而基于RK3506的Ar…

Python学习(2):注释、数字、文本、列表

1 关于注释 Python 使用井号#作为单行注释的符号&#xff0c; 使用三个连续的单引号’’或者三个连续的双引号"""注释多行内容。 2 数字 2.1 基本运算 解释器像一个简单的计算器&#xff1a;你可以输入一个表达式&#xff0c;它将给出结果值。 表达式语法很直观…

加载Tokenizer和基础模型的解析及文件介绍:from_pretrained到底加载了什么?

加载Tokenizer和基础模型的解析及文件介绍 在使用Hugging Face的transformers库加载Tokenizer和基础模型时&#xff0c;涉及到许多文件的调用和解析。这篇博客将详细介绍这些文件的功能和它们在加载过程中的作用&#xff0c;同时结合代码片段进行解析。 下图是我本地下载好模…

Excel批量设置行高,Excel表格设置自动换行后打印显示不全,Excel表格设置最合适的行高后打印显示不全,完美解决方案!!!

文章目录 说个问题&#xff08;很严重&#xff01;&#xff01;&#xff01;&#xff09;写个方案会Python看这里Python环境搭建不存在多行合并存在多行合并 不会Python看这里 说个问题&#xff08;很严重&#xff01;&#xff01;&#xff01;&#xff09; 平时处理Excel表格…

goview——vue3+vite——数据大屏配置系统

低代码数据大屏配置系统&#xff1a; 数据来源是可以动态api配置的&#xff1a; 配置上面的api接口后&#xff0c;在数据过滤中进行数据格式的转化。 以上内容&#xff0c;来源于https://gitee.com/dromara/go-view/tree/master-fetch/ 后端代码如下&#xff0c;需要更改…

ADC相关算法以及热敏电阻测温

目录 前言 一、平均值滤波算法 二、快速排序算法的使用 三、中位值滤波算法 四、二分查找法 4.1 二分查找法查找某个元素是否存在 4.2 二分查找法查找接近目标数值的元素的下标 五、NTC热敏电阻实现测温 5.1 分层设计 5.2 软件流程图 ​编辑 5.3 API接口及数据结构 5…