RPC 框架

RPC 全称 Remote Procedure Call——远程过程调用。

  • RPC技术简单说就是为了解决远程调用服务的一种技术,使得调用者像调用本地服务一样方便透明。
  • RPC是一种通过网络从远程计算机程序上请求服务,不需要了解底层网络技术的协议。

集群和分布式

集群:集群(cluster)是指在多台不同的服务器中部署相同应用或服务模块,构成一个集群,通过负载均衡设备对外提供服务。在不同的服务器中部署相同的功能。

分布式:指在多台不同的服务器中部署不同的服务模块,通过远程调用协同工作,对外提供服务。不同服务器中部署不同的功能,通过网络连接起来,组成一个完整的系统。

分布式是以缩短单个任务的执行时间来提升效率的,而集群则是通过提高单位时间内执行的任务数来提升效率。

为什么要有RPC?

服务化:微服务化,跨平台的服务之间远程调用;
分布式系统架构:分布式服务跨机器进行远程调用;
服务可重用:开发一个公共能力服务,供多个服务远程调用。
系统间交互调用:两台服务器A、B,服务器 A 上的应用 a 需要调用服务器 B 上的应用 b 提供的方法,而应用 a 和应用 b 不在一个内存空间,不能直接调用,此时,需要通过网络传输来表达需要调用的语义及传输调用的数据。

使用场景

  • 大型网站:内部涉及多个子系统,服务、接口较多。
  • 注册发现机制:如Nacos、Dubbo等,一般都有注册中心,服务有多个实例,调用方调用的哪个实例无感知。
  • 安全性:不暴露资源。
  • 服务化治理:微服务架构、分布式架构。

常用RPC技术或框架

  • 应用级的服务框架:阿里的 Dubbo/Dubbox、Google gRPC、Spring Boot/Spring Cloud。
  • 远程通信协议:RMI、Socket、SOAP(HTTP XML)、REST(HTTP JSON)。
  • 通信框架:MINA 和 Netty

RPC 原理

在这里插入图片描述
RPC 是指计算机 A 上的进程,调用另外一台计算机 B 上的进程,其中 A 上的调用进程被挂起,而 B 上的被调用进程开始执行,当值返回给 A 时,A 进程继续执行。调用方可以通过使用参数将信息传送给被调用方,而后可以通过传回的结果得到信息。而这一过程,对于开发人员来说是透明的。

远程过程调用采用客户机/服务器(C/S)模式。请求程序就是一个客户机,而服务提供程序就是一台服务器。和常规或本地过程调用一样,远程过程调用是同步操作,在远程过程结果返回之前,需要暂时中止请求程序。使用相同地址空间的低权进程或低权线程允许同时运行多个远程过程调用。

在这里插入图片描述

RPC五大模块及交互关系

在这里插入图片描述

  • user(客户端)
  • user-stub(客户端存根)
  • RPCRuntime(RPC通信包)
  • server-stub(服务端存根)
  • server(服务端)

用户端:当用户希望进行远程调用时,实际上是调用的本地 user-stub 中相应的代码。user-stub 负责将调用的规范和参数打包成一个或多个包,通过 RPCRuntime(RPC通信包)传输到被调用机器。
服务端:服务端接收到这些数据包后,对应的 RPCRuntime(RPC通信包)将它们传递给 server-stub。然后 server-stub 将它们解包,并调用对应的本地实现。同时用户端的调用进程挂起,等待服务端返回结果包。当服务端调用完成时,返回到 server-stub,并通过服务端的RPCRuntime 将结果传回用户端对应的 RPCRuntime(RPC通信包)挂起的进程中。然后通过 user-stub 解包,最后将它们返回给用户。

如果把用户端和服务端代码放在一台机器上,直接绑定在一起,不使用 user-stub 和 server-stub,程序仍然可以工作。RPCRuntime(RPC通信包)是Cedar系统的一个标准部分,因此不用程序员编写通信相关代码,但是 user-stub 和 server-stub 是由一个叫做 Lupine 的程序自动生成的,也不需要程序员编写对应包处理层面的代码。

RPC 业务实现

Callee 对外提供远端可调用方法 LoginRegister,要在 user.proto 中进行注册(service UserServiceRpc)。在Callee中的Login方法接受 LoginRequest message,执行完逻辑后返回LoginResponse message 给 Caller。

Caller 可以调用 UserServiceRpc_Stub::Login发起远端调用,而 Callee 则继承UserServiceRpc类并重写UserServiceRpc::Login函数,实现Login函数的处理逻辑。这是 protobuf 提供的接口,需要服务方法提供者重写这个 Login 函数。

class UserService : public fixbug::UserServiceRpc{  //使用在rpc服务发布端(rpc服务提供者)
    public:
    bool Login(std::string name, std::string pwd){
        std::cout << "doing local service : LOGIN " << std::endl;
        std::cout << "name: " << name << " pwd: "<< pwd << std::endl;
        return true;
    }

    //新增的测试方法
    bool Register(uint32_t id,std::string name,std::string pwd)
    {
        std::cout << "doing local service: Register" << std::endl;
        std::cout << "id:" << id <<" name:" << name << " pwd:" << pwd << std::endl;
        return true;
    }

    // 重写基类UserServiceRpc的虚函数 下面这些方法都是框架直接调用的
    // 1. caller --> Login(LoginRequest) --> muduo --> callee
    // 2. callee --> Login(LoginRequest) --> 交到下面重写的这个Login方法上了
    void Login(::google::protobuf::RpcController* controller,
                       const ::fixbug::LoginRequest* request,
                       ::fixbug::LoginResponse* response,
                       ::google::protobuf::Closure* done)
    {
        //框架给业务.上报了请求参数LoginRequest,应用获取相应数据做本地业务
        std::string name = request->name();
        std::string pwd = request->pwd();
    
        // 做本地业务
        bool login_result = Login(name, pwd); 

        // 把响应写入包括错误码、 错误消息、返回值
        fixbug::ResultCode *code = response->mutable_result();
        code->set_errcode(0);
        code->set_errmsg("");
        response->set_success(login_result);

        //执行回调操作执行, 响应对象数据的序列化和网络发送 (都是由框架来完成的)
        done->Run();
    }

    void Register(::google::protobuf::RpcController* controller,
                       const ::fixbug::RegisterRequest* request,
                       ::fixbug::RegisterResponse* response,
                       ::google::protobuf::Closure* done)
    {
        uint32_t id = request->id();
        std::string name = request->name();
        std::string pwd = request->pwd();

        //开始做本地业务
        bool ret = Register(id, name, pwd);

        //填充回调结果
        response->mutable_result()->set_errcode(0);
        response->mutable_result()->set_errmsg("");
        response->set_success(ret);

        done->Run();
    }
};

RPC 服务提供

  1. RpcProvider 是一个服务器,接收来自 rpc 客户端的请求,且能在一定程度上承载高并发的需求(考虑多个 rpcClient 给当前 rpcProvider 发送 rpc 调用请求)。
  2. 一个 rpcclient 发送请求过来调用一个远程方法,那么 rpcProvider 收到这个请求之后,能根据请求所携带的数据自动调用发布的 rpc 方法,那么请求必须包含服务名、方法名、以及参数,这样 rpcProvider 才知道怎么调用。即 buffer = service_name + method_name + args。
//框架提供的专门负责发布rpc服务的网络对象类
class RpcProvider{
public:
    //这里是框架提供给外部使用的,可以发布rpc方法的函数接口
    //此处应该使用Service类,而不是指定某个方法
    void NotifyService(google::protobuf::Service *service);

    //启动rpc服务节点,开始提供rpc远程网络调用服务
    void Run();

private:
    //组合 EventLoop
    muduo::net::EventLoop m_eventLoop;

    //service服务类型信息
    struct ServiceInfo
    {
        google::protobuf::Service *m_service;//保存服务对象
        std::unordered_map<std::string,const google::protobuf::MethodDescriptor*> m_methodMap;//保存服务方法

    };

    //存储注册成功的服务对象和其服务方法的所有信息
    std::unordered_map<std::string,ServiceInfo> m_serviceMap;

    // 新的 socket 连接时的回调
    void OnConnection(const muduo::net::TcpConnectionPtr &conn);
    // 已建立连接用户的读写事件回调;当远程有调用 rpc 服务的请求时,OnMessage 方法就会响应
    void OnMessage(const muduo::net::TcpConnectionPtr &conn,
                                    muduo::net::Buffer *buffer,
                                    muduo::Timestamp);
    //Closure的回调操作,用于序列化RPC的响应和网络发送
    void SendRpcResponse(const muduo::net::TcpConnectionPtr&,google::protobuf::Message* );
}; 
int main(int argc, char *argv[])
{
    //先调用框架的初始化操作 provider -i config.conf,从init方法读取配置服务,比如IP地址和端口号
    MprpcApplication::Init(argc,argv);

    //项目提供者,让我们可以发布该服务
    RpcProvider provider;
    //把UserService对象发布到rpc节点上
    provider.NotifyService(new UserService());
    
    //启动一个rpc服务发布节点,run以后,进程进入阻塞状态,等待远程的rpc请求
    provider.Run();

    return 0;
}

NotifyService 函数可以将UserService服务对象及其提供的方法进行预备发布。发布完服务对象后再调用Run()就将预备发布的服务对象及方法注册到ZooKeeper上并开启了对远端调用的网络监听。

Muduo提供的网络模块监听到连接事件并处理完连接逻辑后会调用OnConnection函数,监听到已建立的连接发生可读事件后会调用OnMessage函数

RpcProvider::NotifyService() 实现

Service_Info结构体内定义了一个服务对象,以及这个服务对象内提供的方法们(以std::unordered_map形式存储)

将传入进来的服务对象 service 进行预备发布。其实说直白点就是将这个 service 服务对象及其提供的方法的 Descriptor 描述类,存储在RpcProvider::m_serviceMap中。

/*
service_name <=> service 描述  => service* 记录服务对象
                              => method_name => method 方法对象
json protobuf
*/

//这里是框架提供给外部使用的,可以发布rpc方法的函数接口
//此处应该使用Service类,而不是指定某个方法
void RpcProvider::NotifyService(google::protobuf::Service *service){
    //服务表
    ServiceInfo service_info;//服务表

    //获取了服务对象的描述信息
    const google::protobuf::ServiceDescriptor *pserviceDesc = service->GetDescriptor();
    //获取服务的名字
    std::string service_name = pserviceDesc->name();
    //获取服务对象service的方法数量
    int methodCnt= pserviceDesc->method_count();

    std::cout<<"service name:"<<service_name<<std::endl;    // 添加日志信息后更改

    for(int i=0; i<methodCnt; ++i){
        //获取了服务对象指定下标的服务方法的描述(抽象描述)
        const google::protobuf::MethodDescriptor* pmethodDesc = pserviceDesc->method(i);

        std::string method_name = pmethodDesc->name();
        //插入服务
        service_info.m_methodMap.insert({method_name, pmethodDesc});

        printf("method_name:%s \n",method_name.c_str());
    }
    //可以使用该表来调用方法
    service_info.m_service = service;
    m_serviceMap.insert({service_name, service_info});

}

RpcProvider::Run() 实现

将待发布的服务对象及其方法发布到ZooKeeper上,同时利用Muduo库提供的网络模块开启对RpcServer的(IP, Port)的监听。

// 启动rpc服务节点,开始提供rpc远程网络调用服务
void RpcProvider::Run(){
    // 获取配置文件中的 ip 和端口号初始化结构体
    std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
    uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());
    muduo::net::InetAddress address(ip, port);

    // 为了方便用户使用框架,在 Run 方法中封装 muduo
    // 创建 TcpServer 对象
    muduo::net::TcpServer tcpServer_(&m_eventLoop, address, "MprpcProvider");

     // 绑定连接回调和消息读写回调方法
    tcpServer_.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1));
    tcpServer_.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1, 
                                    std::placeholders::_2, std::placeholders::_3));
    
    // 设置 muduo 库的线程数
    tcpServer_.setThreadNum(4);

    //把当前rpc节点上要发布的服务全部注册在zk上,让rpc client可以从zk上发现服务
    //session的timeout默认为30s,zkclient的网络I/O线程1/3的timeout内不发送心跳则丢弃此节点
    ZkClient zkCli;
    zkCli.Start();//链接zkserver
    for(auto &sp:m_serviceMap){
        //service_name
        std::string service_path ="/"+sp.first;//拼接路径
        zkCli.Create(service_path.c_str(),nullptr,0);//创建临时性节点
        for(auto &mp:sp.second.m_methodMap){
            //service_name/method_name
            std::string method_path=service_path+"/"+mp.first;//拼接服务器路径和方法路径
            char method_path_data[128]={0};
            sprintf(method_path_data,"%s:%d",ip.c_str(),port);//向data中写入路径

            //创建节点,ZOO_EPHEMERAL表示临时节点
            zkCli.Create(method_path.c_str(),method_path_data,strlen(method_path_data),ZOO_EPHEMERAL);
        }
    }

    std::cout << "MprpcProvider start service at: " << ip << ':' << port << '\n';

    // 启动网络服务
    tcpServer_.start();
    m_eventLoop.loop();
    
}

RpcProvider::OnConnection() 实现

// 新的 socket 连接时的回调
void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr &conn){
    if(!conn->connected()){
        //和rpcclient的链接断开了
        conn->shutdown();
    }
}

RpcProvider::OnMessage() 实现

Caller 端发起远程调用的时候, 会对callee的rpcserver发起tcp连接,rpcserver接受连接后,开启对客户端连接描述符的可读事件监听。caller将请求的服务方法及参数发给callee的rpcserver,此时rpcserver上的muduo网络模块监听到该连接的可读事件,然后就会执行OnMessage(…)函数逻辑。

该方法表示已建立连接用户的读写事件操作,如果有一个远程 RPC 服务的调用请求,那么OnMessage方法就会响应。

  1. 首先要从网络上接收的远程rpc调用请求的字符流;
  2. 从字符流中读取前4个字节的内容,将头部的大小转换成二进制存到这四字节里,不可能会超出范围;
  3. 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息;
  4. 获取rpc方法参数的字符流数据,略过recv_buf的前面的头部信息(header_size和header_str),4字节加header_size即为开始的位置;
  5. 获取service对象和method对象;
  6. 生成rpc方法调用的请求request和响应response参数;
  7. 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法。
/*
在框架内部需要提前协商好通信使用的protobuf数据类型:比如发送过来的数据类型为:service_name,method_name,args
需要定义proto的message类型,进行数据头的序列化和反序列化,为防止TCP的粘包,需要对各个参数进行参数的长度明确

定义header_size(4字节) + header_str + args_str

已建立连接的用户的读写事件回调,网络上如果有一个远程的rpc服务请求,则onmessge方法就会响应
*/

// 已建立连接用户的读写事件回调;当远程有调用 rpc 服务的请求时,OnMessage 方法就会响应
void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buffer, muduo::Timestamp){
    //获取到数据,即网络上接受的远程rpc调用请求的字符流, Login和args
    std::string recv_buf= buffer->retrieveAllAsString();

    //读取header_size,此时的整数若按照字符串格式发送,读取时会出现问题,所以需要直接按二进制发送
    //从字符流中读取前四个字节的内容
    uint32_t header_size=0;
    recv_buf.copy((char*)&header_size,4,0);

    //根据header_size读取数据头的原始字符流,反序列化数据,得到rpc的详细请求数据
    std::string rpc_header_str=recv_buf.substr(4,header_size);//substr从4开始读读取header_size个字节的数据
    mprpc::RpcHeader rpcHeader;

    std::string service_name;//用于存储反序列化成功的服务名字
    std::string method_name;//用于存储反序列化成功的服务方法
    uint32_t args_size;//用于存储反序列化成功的参数个数

    //开始反序列化,参数接受类型为引用,返回值为bool型
    if(rpcHeader.ParseFromString(rpc_header_str)){
        //数据头反序列化成功
        service_name=rpcHeader.service_name();
        method_name=rpcHeader.method_name();
        args_size=rpcHeader.args_size();
    }else {
        //数据头反序列化失败
        std::cout<<"rpc_header_str: "<<rpc_header_str<<"parse error!"<<std::endl;
        return;
    }

    //获取rpc参数方法的字符流数据
    std::string args_str=recv_buf.substr(4+header_size,args_size);

    //打印调试信息
    std::cout << "======================================" << std::endl;
    std::cout << "header_size: " << header_size << std::endl;
    std::cout << "rpc_header_str" << rpc_header_str << std::endl;
    std::cout << "service_name: " << service_name << std::endl;
    std::cout << "method_name: " << method_name << std::endl;
    std::cout << "args_str: " << args_str << std::endl;
    std::cout << "======================================" << std::endl;

    //获取service对象和method对象
    auto it = m_serviceMap.find(service_name);
    if(it == m_serviceMap.end()){
        //如果方法不存在
        std::cout << service_name << "is not exist!" << std::endl;
        return;
    }

    auto mit = it->second.m_methodMap.find(method_name);
    if(mit == it->second.m_methodMap.end()){
        //如果服务提供的方法不存在
        std::cout << service_name << ":" << method_name << "is not exists!" << std::endl;
        return;
    }

    google::protobuf::Service *service=it->second.m_service;    // 获取service对象,对应Userservice
    const google::protobuf::MethodDescriptor *method=mit->second;   // 获取method对象,对应Login方法

    //生成rpc方法调用的请求request和相应response参数
    google::protobuf::Message *request = service->GetRequestPrototype(method).New();//生成一个新对象
    if(!request->ParseFromString(args_str)){
        std::cout << "request parse error, content:" << args_str << std::endl;
        return;
    }

    google::protobuf::Message *response = service->GetResponsePrototype(method).New();//生成一个新对象

    //给下面的method方法的调用,绑定一个Closure的回调函数,因为模板的实参推演失败,所以需要指定类型
    google::protobuf::Closure *done = google::protobuf::NewCallback
    <RpcProvider,const muduo::net::TcpConnectionPtr&,google::protobuf::Message*>
    (this, &RpcProvider::SendRpcResponse, conn,response);

    //在框架上根据远端rpc请求,调用当前rpc节点上发布的方法
    //相当于UserService调用了Login方法
    service->CallMethod(method, nullptr, request, response, done);
}

NewCallback函数会返回一个google::protobuf::Closure类的对象,该Closure类其实相当于一个闭包。这个闭包捕获了一个成员对象的成员函数,以及这个成员函数需要的参数。然后闭包类提供了一个方法Run(),当执行这个闭包对象的Run()函数时,他就会执行捕获到的成员对象的成员函数,也就是相当于执行void RpcProvider::SendRpcResponse(conn, response);,这个函数可以将reponse消息体发送给Tcp连接的另一端,即caller

CallMethod 将服务名方法名进行组装,并用protobuf提供的序列化方法序列化,然后通过服务名方法名查找ZooKeeper服务器上提供该服务方法的RpcServer的地址信息,然后返回。接着再将请求的服务方法及其参数组装并序列化,向RpcServer发起tcp连接请求,连接建立后将序列化的数据发送给RpcServer,然后再等待接收来自RpcServer的返回消息体。

RpcProvider::SendRpcResponse() 实现

//Closure的回调操作,用于序列化RPC的响应和网络发送
void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn,google::protobuf::Message* response){
    std::string response_str;
    //response进行序列化
    if(response->SerializeToString(&response_str)){
        //序列化成功后,通过网络把rpc方法执行的结果发送回rpc的调用方
        conn->send(response_str);
    } else {
        std::cout<<"Serialize response error!"<<std::endl;
    }
    //模拟http的短链接服务,由rpcprovider主动断开连接
    conn->shutdown();
}

RPC 服务调用

调用方需要利用到的是 Stub 类。Stub 类需要提供一个带参数的构造函数,需要重写这个实参 RpcChannel。

class MprpcChannel:public google::protobuf::RpcChannel
{
public:
    //所有通过stub代理对象调用的rpc方法都从这里处理,统一做方法调用的数据序列化和网络发送
    void CallMethod(const google::protobuf::MethodDescriptor* method,
                          google::protobuf::RpcController* controller, const google::protobuf::Message* request,
                          google::protobuf::Message* response, google::protobuf::Closure* done);
};

提供方调用函数的方法:MprpcChannel::CallMethod,调用方的框架逻辑就是将访问的对象,函数,参数序列化,socket连接到zookeeper,获取对应的 response。

//所有通过stub代理对象调用的rpc方法都从这里处理,统一做方法调用的数据序列化和网络发送
void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
                          google::protobuf::RpcController* controller, 
                          const google::protobuf::Message* request,
                          google::protobuf::Message* response, 
                          google::protobuf::Closure* done)
{
    const google::protobuf::ServiceDescriptor* sd=method->service();
    std::string service_name=sd->name();    //service name
    std::string method_name=method->name(); //method name

    //获取参数的序列化字符串长度 args_size
    uint32_t args_size = 0;
    std::string args_str;
    if(request->SerializeToString(&args_str)){
        //序列化成功
        args_size=args_str.size();
    } else {
         controller->SetFailed("serialize request error!");//保存错误信息
        // std::cout <<"serialize request error!"<< std::endl;
        return;
    }

    //定义rpc的请求header
    mprpc::RpcHeader rpcHeader;
    rpcHeader.set_service_name(service_name);
    rpcHeader.set_method_name(method_name);
    rpcHeader.set_args_size(args_size);

    uint32_t header_size = 0;
    std::string rpc_header_str;
    if(rpcHeader.SerializeToString(&rpc_header_str)){   // response进行序列化   
        header_size = rpc_header_str.size();
    } else {
        // std::cout <<"serialize rpc header error!"<< std::endl;    // 优化
        controller->SetFailed("serialize rpc header error!");
        return;
    }

    //组织待发送的rpc请求的字符串
    std::string send_rpc_str;
    send_rpc_str.insert(0, std::string((char *)&header_size, 4));   // header_size
    send_rpc_str += rpc_header_str; // rpcheader
    send_rpc_str += args_str;   // args

    std::cout<<"======================================"<<std::endl;
    std::cout<<"header_size: "<<header_size<<std::endl;
    std::cout<<"rpc_header_str"<<rpc_header_str<<std::endl;
    std::cout<<"service_name: "<<service_name<<std::endl;
    std::cout<<"method_name: "<<method_name<<std::endl;
    std::cout<<"args_str: "<<args_str<<std::endl;
    std::cout<<"======================================"<<std::endl;

    //使用TCP编程,完成rpc方法的远程调用
    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
    if(-1 == clientfd)
    {
        // std::cout << "create socket error! errno: "<< errno << std::endl;    //改用 controller 记录错误信息
        // exit(EXIT_FAILURE);
        char errtxt[512]={0};
        sprintf(errtxt,"create socket error! errno: %d",errno);
        controller->SetFailed(errtxt);
        return;
    }

    // std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip");
    // uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str());

    /*
    rpc调用方向调用service_name服务,需要查询zk上该服务所在的host信息
    */
    ZkClient zkCli;
    zkCli.Start();
    std::string method_path="/"+service_name+"/"+method_name;

    //获取ip地址和端口号
    std::string host_data=zkCli.GetData(method_path.c_str());
    if(host_data=="")
    {
        controller->SetFailed(method_path+" is not exist!");
        return;
    }
    int idx=host_data.find(":");//分割符
    if(idx==-1)
    {
        controller->SetFailed(method_path+" address is invalid!");
        return;
    }
    std::string ip=host_data.substr(0,idx); //从字符串中返回一个指定的子串
    uint32_t port=atoi(host_data.substr(idx+1,host_data.size()-idx).c_str());   //把参数 str 所指向的字符串转换为一个整数


    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);
    server_addr.sin_addr.s_addr = inet_addr(ip.c_str());

    //链接rpc服务节点
    if(-1 == connect(clientfd,(struct sockaddr*)&server_addr,sizeof(server_addr)))
    {

        // std::cout<<"connect error!errno: "<<errno<<std::endl;
        // close(clientfd);
        // exit(EXIT_FAILURE);
        close(clientfd);
        char errtxt[512]={0};
        sprintf(errtxt,"connect error! errno: %d",errno);
        controller->SetFailed(errtxt);
        return;
    }

    //发送rpc请求
    if(-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0))
    {
        // std::cout<<"send error!errno: "<<errno<<std::endl;
        // close(clientfd);
        // return;
        close(clientfd);
        char errtxt[512]={0};
        sprintf(errtxt,"send error! errno: %d",errno);
        controller->SetFailed(errtxt);
        return;
    }

    //接受rpc请求的响应值
    char recv_buf[1024]={0};
    int recv_size = 0;
    if(-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0)))
    {
        close(clientfd);
        char errtxt[512]={0};
        sprintf(errtxt,"recv error! errno: %d",errno);
        controller->SetFailed(errtxt);
        return;
    }

    //反序列化rpc调用的响应数据
    
    std::string response_str(recv_buf, 0, recv_size);   //bug点:recv_buf遇到\0后的数据不再读取,导致反序列化失败
    //解决方案:使用string转换时会遇到\0,由于字符串特性导致不再读取,因为protobuf支持从数组转换,所以换方法直接从Array反序列化
    // if(!response->ParseFromString(response_str)){
    if(!response->ParsePartialFromArray(recv_buf,recv_size)){
        // std::cout<<"parse error! response_str:"<<response_str<<std::endl;
        // close(clientfd);
        // return;
        close(clientfd);
        char errtxt[512]={0};
        sprintf(errtxt,"arse error!! response_str: %s",response_str.c_str());
        controller->SetFailed(errtxt);
        return;
    }

    close(clientfd);
}

zookeeper

ZooKeeper 在这里作为服务方法的管理配置中心,负责管理服务方法提供者对外提供的服务方法。
Callee提前将本端对外提供的服务方法名及自己的通信地址信息(IP:Port)注册到ZooKeeper。
当Caller发起远端调用时,会先拿着自己想要调用的服务方法名询问 ZooKeeper,ZooKeeper 告知Caller想要调用的服务方法在哪台服务器上(ZooKeeper返回目标服务器的IP:Port给Caller),Caller便向目标服务器Callee请求服务方法调用。Callle在本地执行相应服务方法后将结果返回给Caller。

安装java环境

在这里插入图片描述

1.sudo apt-get install openjdk-8-jdk
2. 配置环境变量,编辑如下文件:vim ~/.bashrc
在最后一行加:

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

3.测试jdk是否安装成功:java -version
在这里插入图片描述

Ubuntu安装JDK

Zookeeper分布式协调服务

下载 zookeeper.tar.gz,解压后
1.cd conf,将 zoo_sample.cfg 改名为 zoo.cfgmv zoo_sample.cfg zoo.cfg
2.进入bin目录,启动zkServer, ./zkServer.sh start
3.可以通过netstat查看zkServer的端口,在bin目录启动zkClient.sh链接zkServer,熟悉zookeeper怎么组织节点

在这里插入图片描述
在这里插入图片描述

zk的原生开发API(c/c++接口)

1.sudo ./configure
2.sudo make
在这里插入图片描述
zookeeper 源码编译生成C函数接口,在 ./configure 后生成的 Makefile 文件中,默认是将警告当成错误的,因此导致上图中的警告,总是以错误形式展现,编译失败

进入到生成的 Makefile 中,修改第548行,将AM_CFLAGS -Wall -Werror 改为 AM_CFLAGS -Wall上述问题

Linux安装zookeeper原生C API接口出现的make编译错误

3.make install

zookeeper 项目应用

ZooKeeper相当于是一个特殊的文件系统,不过和普通文件系统不同的是,这些节点都可以设置关联的数据,而文件系统中只有文件节点可以存放数据,目录节点不行。ZooKeeper内部为了保持高吞吐和低延迟,再内存中维护了一个树状的目录结构,这种特性使ZooKeeper不能存放大量数据,每个节点存放数据的上线为1M。

服务对象名在ZooKeeper中以永久性节点的形式存在,当RpcServer与ZooKeeper断开连接后,整个节点还是会存在。方法对象名则以临时性节点存在,RpcServer与ZooKeeper断开后临时节点被删除。临时节点上带着节点数据,在本项目中,节点数据就是提供该服务方法的RpcServer的通信地址(IP+Port)

//封装的zk客户端类
class ZkClient
{
public:
    ZkClient(); 
    ~ZkClient();
    //zkclinet启动链接zkserver
    void Start();
    //在zkserver上根据指定的path创建znode节点
    void Create(const char *path,const char *data,int datalen,int state=0);
    //传入参数指定的znode节点路径,或者znode节点的值
    std::string GetData(const char *path);

private:
    //zk的客户端句柄
    zhandle_t *m_zhandle;
};
#include"zookeeperutil.h"
#include"mprpcapplication.h"
#include<semaphore.h>
#include<iostream>

// 全局的 watcher 观察器  zkserver 给 zkclient 的通知
// 参数 type 和 state 分别是 ZooKeeper 服务端返回的事件类型和连接状态
void global_watcher(zhandle_t *zh,int type,int state,const char *path,void *watcherCtx)
{
    if(type==ZOO_SESSION_EVENT) //回调的消息类型是和会话相关的消息类型
    {
        if(state==ZOO_CONNECTED_STATE)  //zkclient和zkserver链接成功
        {
            sem_t *sem=(sem_t*)zoo_get_context(zh);
            sem_post(sem);  //信号量资源加一
        }
    }
}

ZkClient::ZkClient():m_zhandle(nullptr)
{}

ZkClient::~ZkClient()
{
    if(m_zhandle!=nullptr)
    {
        zookeeper_close(m_zhandle);//关闭句柄释放资源
    }
}

//zkclinet启动链接zkserver
void ZkClient::Start()
{
    //加载zk的IP和端口号,默认为2181
    std::string host=MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");
    std::string port=MprpcApplication::GetInstance().GetConfig().Load("zookeeperport");
    std::string connstr=host+":"+port;

    //调用原生API,端口与IP,回调函数,会话超时时间
    /*
    zookeeper_mt:多线程版本
    zookeeper的API客户端程序提供了三个线程
    API调用线程
    网络I/O线程:专门在一个线程里处理网络I/O
    watcher回调线程
    */
    m_zhandle=zookeeper_init(connstr.c_str(), global_watcher, 30000, nullptr, nullptr, 0);
    // 仅仅通过判断接口返回的句柄是否为NULL,并不能表示句柄是可用的。
    // 因为,会话的建立过程是异步的,必须等到会话状态变成ZOO_CONNECTED_STATE才表示句柄可用。
    if(nullptr==m_zhandle)
    {
        std::cout<<"zookeeper_init error!"<<std::endl;
        exit(EXIT_FAILURE);
    }

    sem_t sem;
    sem_init(&sem,0,0); //初始化资源为0,用于多线程间的同步
    // 将刚才定义的同步信号量sem通过这个 zoo_set_context 函数可以传递给 m_zhandle 进行保存。
    // 在global_watcher中可以将这个sem从m_zhandle取出来使用。
    zoo_set_context(m_zhandle,&sem);    //设置上下文,添加额外信息

    sem_wait(&sem); // 阻塞结束后才连接成功!!!
    std::cout<<"zookeeper_init success!"<<std::endl;

}
//在zkserver上根据指定的path创建znode节点
void ZkClient::Create(const char *path,const char *data,int datalen,int state)
{
    char path_buffer[128];
    int bufferlen=sizeof(path_buffer);
    int flag;

    //检查该节点是否存在
    flag=zoo_exists(m_zhandle,path,0,nullptr);
    if(ZNONODE==flag)//该节点并不存在
    {
        //创建指定path的znode节点
        flag=zoo_create(m_zhandle,path,data,datalen,&ZOO_OPEN_ACL_UNSAFE,state,path_buffer,bufferlen);
        if(flag==ZOK)
        {
            std::cout<<"znode create success... path:"<<path<<std::endl;
        }
        else
        {
            std::cout<<"flag:"<<flag<<std::endl;
            std::cout<<"znode create error... path:"<<path<<std::endl;
            exit(EXIT_FAILURE);
        }
    }
}
//传入参数指定的znode节点路径,获取znode节点的值
std::string ZkClient::GetData(const char *path)
{
    char buffer[64];
    int bufferlen=sizeof(buffer);
    int flag=zoo_get(m_zhandle,path,0,buffer,&bufferlen,nullptr);//获取信息与返回值
    if(flag!=ZOK)//如果获取失败
    {
        std::cout<<"get znode error... path:"<<path<<std::endl;
        return "";
    }
    else
    {
        //获取成功
        return buffer;
    }
    
}

watcher 机制就是ZooKeeper客户端对某个 znode 建立一个watcher事件,当该znode发生变化时,这些ZK客户端会收到ZK服务端的通知,然后ZK客户端根据znode的变化来做出业务上的改变。

ZooKeeper服务端收到来自客户端 callee 的连接请求后,服务端为节点创建会话(此时这个节点状态发生改变),服务端会返回给客户端callee一个事件通知,然后触发watcher回调(执行global_watcher函数).

总结

深入浅出RPC服务(一)RPC来源-论文解读
深入浅出RPC服务(二)不同层的网络协议
RPC 详解
RPC——RPC协议介绍及原理详解
C++实现轻量级RPC分布式网络通信框架
https://blog.csdn.net/weixin_52344401/article/details/131343863

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/652793.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

疑惑点:动作监听时this的含义:可以理解为接口的多态

全部代码&#xff1a; package test;import javax.swing.*; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; import java.util.Random;public class test3 extends JFrame implements ActionListener {JButton jb1 new JButton("你点我啊&am…

谢宁DOE培训适合哪些人?

近年来&#xff0c;谢宁DOE培训以其专业、系统的课程内容&#xff0c;受到了众多学习者的青睐。那么&#xff0c;这个培训究竟适合哪些人呢&#xff1f;深圳天行健企业管理咨询公司解析如下&#xff1a; 首先&#xff0c;谢宁DOE培训适合质量管理部门的专业人员。质量总监、质量…

Softing线上研讨会 | 使用Softing smartLink SW-HT将AB PLC下的HART设备连接到艾默生AMS设备管理器

| (免费) 线上研讨会时间&#xff1a;2024年6月25日 14:00~14:45 / 22:30~23:15 艾默生AMS设备管理器凭借其全面功能、优秀诊断能力、兼容性以及远程监控和管理功能&#xff0c;在过程工业中被证明是一款先进的工厂资产管理工具&#xff0c;可用于设备配置、诊断和监控、仪表校…

【Pandas】深入解析`pd.read_pickle()`函数

【Pandas】深入解析pd.read_pickle()函数 &#x1f308; 欢迎莅临我的个人主页&#x1f448;这里是我深耕Python编程、机器学习和自然语言处理&#xff08;NLP&#xff09;领域&#xff0c;并乐于分享知识与经验的小天地&#xff01;&#x1f387; &#x1f393; 博主简介&…

TCP:传输控制协议

文章目录 TCP的服务TCP首部TCP连接的建立与终止建立连接协议连接终止协议TCP状态迁移图2MSL等待状态FIN_WAIT_2状态 流量控制快的发送方和慢的接收方滑动窗口 拥塞控制慢开始和拥塞避免快重传和快恢复 TCP的服务 应用数据被分割成TCP认为最适合发送的数据块当TCP发送一个段后&…

交流负载箱:电力系统的稳定利器

交流负载箱是电力系统中的一种重要设备&#xff0c;主要用于模拟电网中的负载情况&#xff0c;以便对电力系统进行各种性能测试和分析。它是电力系统的稳定利器&#xff0c;对于保障电力系统的稳定运行起着至关重要的作用。 交流负载箱可以模拟电网中的负载情况&#xff0c;为电…

西安航空学院电子工程学院领导莅临泰迪智能科技参观交流

5月26日&#xff0c;西安航空学院电子工程学院院长杨亚萍、专业教师刘坤莅临广东泰迪智能科技股份有限公司产教融合实训基地参观交流。泰迪智能科技董事长张良均、副总经理施兴、产品中心负责周东平、校企合作经理吴桂锋与泰迪智能科技韩伟进行热情了接待。双方就专业建设、协同…

Unity 之 Android 【获取设备的序列号 (Serial Number)/Android_ID】功能的简单封装

Unity 之 Android 【获取设备的序列号 (Serial Number)/Android_ID】功能的简单封装 目录 Unity 之 Android 【获取设备的序列号 (Serial Number)/Android_ID】功能的简单封装 一、简单介绍 二、获取设备的序列号 (Serial Number) 实现原理 1、Android 2、 Unity 三、注意…

【leetcode2765--最长交替子数组】

要求&#xff1a;给定一个数组&#xff0c;找出符合【x, x1,x,x-1】这样循环的最大交替数组长度。 思路&#xff1a;用两层while循环&#xff0c;第一个while用来找到符合这个循环的开头位置&#xff0c;第二个用来找到该循环的结束位置&#xff0c;并比较一下max进行记录。 …

C++模板使用

文章目录 目录 文章目录 前言 一、交换函数(泛型编程) 二、函数模板 2.1 函数模板概念 2.2函数模板格式 2.3使用方法 2.4 函数模板的原理 2.4.1库中的swap 2.5 函数模板的实例化 2.6 模板参数的匹配原则 三、类模板 3.1 类模板的定义格式 3.2类模板声明和定义分离 前言 C语言阶…

Springboot事务控制中A方法调用B方法@Transactional生效与不生效情况实战总结

介绍 本篇对Springboot事务控制中A方法调用B方法Transactional生效与不生效情况进行实战总结&#xff0c;让容易忘记或者困扰初学者甚至老鸟的开发者&#xff0c;只需要看这一篇文章即可立马找到解决方案&#xff0c;这就是干货的价值。喜欢的朋友别忘记来个一键三连哈&#x…

抖音本地生活服务商入驻指南分享!

当前&#xff0c;各大平台的团购外卖业务持续火爆&#xff0c;并逐渐成为众多创业赛道中的大热门。其中&#xff0c;本地生活服务更是在短时间内杀出重围&#xff0c;成为创业者们的首选。 根据抖音生活服务近日发布的《2023年度数据报告》&#xff0c;2023年&#xff0c;抖音生…

微信小程序图片懒加载如何实现?

微信小程序开发时&#xff0c;对于有图片的列表在加载时&#xff0c;为了用户体验更好&#xff0c;必需要对图片做懒加载。 如下图所示&#xff0c;页面在打开时&#xff0c;图片会按需加载&#xff0c;这样用户体验没有那么生硬。 以下将介绍图片懒加载的步骤&#xff1a; 1.…

R18 NTN中的RACH-less HO

在看R18 38.300时,发现NTN场景 增加了如下黄色字体的内容,R18 NTN支持了RACH-less HO,索性就简单看了看。 NTN RACH less HO相关的描述主要在38.331,38.213和38.321中。38.300中的描述显示:网络侧会通过RRCReconfiguration消息将RACH-less HO相关的配置下发给UE, 其中会包…

Java语言ADR药物不良反应系统源码Java+IntelliJ+IDEA+MySQL一款先进的药物警戒系统

Java语言ADR药物不良反应系统源码JavaIntelliJIDEAMySQL一款先进的药物警戒系统源码 ADR药物不良反应监测系统是一个综合性的监测平台&#xff0c;旨在收集、报告、分析和评价药品在使用过程中可能出现的不良反应&#xff0c;以确保药品的安全性和有效性。 以下是对该系统的详细…

Java 面向对象编程(OOP)

面向对象编程&#xff08;Object-Oriented Programming&#xff0c;OOP&#xff09;是Java编程语言的核心思想之一。通过OOP&#xff0c;Java提供了一种结构化的编程方式&#xff0c;使代码更易于维护和扩展。 一、类和对象 1. 类的定义 类是对象的蓝图或模板&#xff0c;定…

【qt】一次性学会所有对话框

对话框 一.前言二.文件对话框1.选择一个文件2.选择多个文件3.选择目录4.保存文件 三.颜色对话框1.获取颜色 四.字体对话框1.获取字体 五.输入对话框1.输入文本2.输入整数3.输入小数4.输入条目 六.消息对话框1.问题框2.信息框3.警告框4.危机框5.关于框6.关于qt框七.总结 一.前言…

CSS学习笔记:动画——使用animation添加动画效果

过渡和动画 啥是过渡? 例如transition: all 0.5s; -> 拥有该属性的标签&#xff0c;在样式改变时&#xff0c;将在设定的时间内逐渐过渡到另一个样式 啥是动画&#xff1f; 和过渡有点类似&#xff0c;只不过常常用于实现多个状态间的变化过程&#xff0c;动画过程可控…

基于PHP+MySQL组合开发的720VR全景小程序源码系统 一键生成三维实景 前后端分离带网站的安装代码包以及搭建教程

系统概述 这款源码系统是专门为实现 720VR 全景展示而设计的。它结合了先进的技术和创新的理念&#xff0c;能够将真实场景以全景的形式呈现给用户&#xff0c;让用户仿佛身临其境。该系统采用 PHP 进行后端开发&#xff0c;MySQL 作为数据库管理系统&#xff0c;确保了系统的…

【JAVA |Object类重写实例】Cloneable 接口、Comparable接口、比较器

✨✨谢谢大家捧场&#xff0c;祝屏幕前的小伙伴们每天都有好运相伴左右&#xff0c;一定要天天开心哦&#xff01;✨✨ &#x1f388;&#x1f388;作者主页&#xff1a; &#x1f388;丠丠64-CSDN博客&#x1f388; ✨✨ 帅哥美女们&#xff0c;我们共同加油&#xff01;一起…