文章目录
- rpc是什么?又如何实现服务通信?
- 理解rpc
- RPC的通信过程
- 通信协议的选择
- 小结
- RPC VS Restful
- net_rpc实践案例
- net/rpc包介绍
- 创建服务端
- 创建client
- 看看net_rpc的通信调度实现的内部原理
- 明确目标
- 基于自己实现的角度分析我会怎么做
- 代码分析
- grpc介绍与下载安装
- 前言与背景
- grpc针对上面问题的解决方案简述
- grpc下载
- grpc框架实践案例
- 分析
- 流程
- 项目结构
- 编写描述文件 user.protoc
- 编译生成.pb.go文件
- 实现服务端接口
- 实现客户端调用
- 小结
- protobuf的协议原理
- protobuf介绍
- 最初的背景
- protobuf的工作过程
- protobuf的语法
- protobuf序列化
- 小结
- 理解grpc的请求协议http2
- 了解http1及问题
- 理解http2的设计
- 服务注册
- 服务初始化
- 拦截器
- 监听与调度
- grpc的客户端请求发送实现原理分析
- 初步分析
- 客户端初始化
- 请求调度
- 获取连接与关闭
- 小结
rpc是什么?又如何实现服务通信?
理解rpc
RPC在大众的定义上称为:远程过程调用,下面举个栗子给大家分析。
如:从订单服务中获取用户的信息
在单体服务下(new UserService).GetUser(uid)
我们可以这样的方式调用获取,但是当服务拆分开后代码也拆分为两份,这个时候就无法如单体服务那样new
的方式调度,需通过rpc进行调度。
在每个微服务中会开启一个rpc服务供其他服务访问,请求方这时会创建一个rpc的客户端,然后通过这个客户端向目标服务发送rpc请求,服务端收到请求后,根据请求信息就可以调用GetUser(uid)返回结果。
再来理解rpc的定义【远程调度过程】,
- 这里的远程就是指因为系统根据服务拆分了,原本的调度过程是通过new的方式,现在要改为从当前服务器去另一台服务器中调度。
- 那调度过程又是指什么呢?指的是rpc_client请求rpc_server的过程,而此处的过程在细节上是调度的传输协议、连接方式、序列化及数据格式等。
即该定义【RPC:远程调度过程】,就是对在微服务中服务与服务之间相互调度的方式过程的统称。
RPC的通信过程
了解到RPC的定义后那RPC又是如何通信的呢:我们通过如下几个问题来理解。
问题1:我们应该如何找到目标服务
通过ip和port既可以确定目标服务器和程序。
问题2:请求的目标是什么?
根据上面的例子是调用的userServer中的GetUser方法,及以用户服务中的业务方法作为目标。
问题3:调度传输的数据结构如何设计?
针对该问题可能存在的几个情况
- 首先需要固定一个特定格式作为rpc_client与rpc_server的通信格式标准
- 请求的目标服务可能存在多个方法,因此我们在请求的时候需要告服务,我们要请求的方法
- 请求的目标服务还需要考虑传递参数
- 再细节点可能因网络关系会存在消息接收延迟,可能同时接收了多个消息,因此还需考虑消息的区分问题。
因此可以约定如下格式
type Request struct {
Method string `json:"method"`
Params interface{} `json:"params"`
Id string `json:"id"`
}
type Response struct {
Id string `json:"id"`
Result interface{} `json:"result"`
Error error `json:"error"`
}
问题4:数据用传输协议?
rpc的通信中传输协议可以用json/xml/binary等
问题5:rpc服务客户端基于什么网络协议实现?
rpc并没有固定采用特定的网络协议来实现,在目前业界中以tcp与http2协议为主实现,也可以通过http甚至是udp也是可以的。
最后我们来看看RPC的通信过程示意图:
在示意图中
- 客户端与服务端之间会先约定统一的编码/数据格式,一般都是以服务端定义为主。
- 客户端在本地代码中发起调度
- client stub(客户端存根):实际就是对rpc请求的封装包代码,主要事情是将接收到的调度请求,进行组装为对应的数据结构/编码/并将消息通过网络发送给目标服务器
- server stub(服务端存根):实际就是对rpc请求的封装包代码,主要是将接收到的消息按照定义好的编码拆包,获取请求方法与参数,并根据方法名和参数进行本地调用。
- 服务端处理完请求,将结果通过server stub处理返回给调用的客户端。
- 调用的客户端中基于client stub获取到请求后解析,最终得到本次rpc调用的结果
通信协议的选择
为什么RPC会大都以tcp协议为主实现呢?而不是采用http。
RPC通信采用TCP为主而不是HTTP的原因有以下几点:
- 效率:TCP是一种面向连接的可靠传输协议,相对于HTTP来说,TCP协议的通信效率更高。TCP在传输数据时,采用了流式传输的方式,可以保证数据的完整性和可靠性,适合于需要高效传输大量数据的场景。
- 传输量:RPC通常用于传输大量的数据,而HTTP协议对于传输大量数据的支持相对较弱。HTTP协议在传输数据时,会对数据进行分块传输,增加了数据的传输量和传输时间。
- 灵活性:RPC通信可以使用多种通信协议,包括TCP、UDP等,而HTTP协议只能使用TCP作为传输协议。RPC可以根据具体的需求选择合适的通信协议,提供更灵活的通信方式。
- 安全性:TCP协议支持加密和身份验证等安全机制,可以提供更高的安全性。对于需要保护数据安全的场景,RPC通信采用TCP协议可以更好地满足安全需求。
尽管HTTP协议也可以用于RPC通信,但相对于TCP来说,它的通信效率和传输量较低,不适合于需要高效传输大量数据的场景。因此,RPC通信更倾向于采用TCP作为主要的传输协议。
小结
本节中主要分析RPC的定义远程调度过程,以及RPC的具体通信过程。需注意的是RPC的网络实现方式不是固定于某一种可以基于http、tcp、http2,只是目前业界中以tcp居多。
RPC VS Restful
Restful是一种用于构建网络应用程序的软件架构风格,把服务器看成是资源,通过URL来定义服务器上的资源进行某种操作。
如:
GET /user/posts 获取用户 GET /user/getPosts 获取用户
POST /user/posts 发布用户 GET /user/addPosts 发布用户
PUT /user/posts 修改用户 GET /user/editPosts 修改用户
DELETE /user/posts 删除用户 GET /user/deletePosts 删除用户
因此本身restful是一种规范,是一种对http请求交互及协议的规范,实际上RPC与restful本身并没有可比性。
区别点:
- 定义区别:RPC是一种思想是对微服务之间的调度过程的定义,restful是一种http请求协议的标准规范
- 目标不同:RPC主要的调度对象本质对象是service;restful是基于http请求方法,调度的本质对象是controller
- 通信协议:rpc可以自定义协议,而restful是统一的http协议
- 传输协议:在rpc中传输的协议可以用二进制性能消耗低,而restful是json字符串
适用
微服务内部服务直接基于RPC,而对外则提供restful接口
net_rpc实践案例
net/rpc包介绍
在go语言中自带了net/rpc
包,在这个包中包含了对rpc的封装,可以理解为就是之前提到的client stub
与server stub
其中rpc/jsonrpc
是go内部提供的一种默认的序列化方式。
创建服务端
假定要实现的功能是客户端调用服务端中的GetUser获取用户信息
而创建服务端的流程为:
- 定义返回数据
- 确定通信之间请求与响应的数据结构
- 创建服务
- 将服务注册到rpc中并基于指定的方式启动监听
首先准备好要请求返回的数据
type User struct {
Id string
Name string
Phone string
Pass string
}
var users = map[string]*User{
"1": {
Id: "1",
Name: "bala",
Phone: "13100001111",
Pass: "123456",
},
"2": {
Id: "2",
Name: "小王",
Phone: "13200001111",
Pass: "123456",
},
}
同时再定义好GetUser需要接收的请求参数与响应结果
type (
GetUserReq struct {
Id string `json:"id"`
}
GetUserResp struct {
Id string
Name string
Phone string
}
)
定义服务UserServer, 并完善GetUser方法,需注意在GetUser方法的参数中第一个参数为请求信息,第二个参数为rpc返回数据
type UserServer struct{}
func (u *UserServer) GetUser(req GetUserReq, resp *GetUserResp) error {
if user, ok := users[req.Id]; ok {
*resp = GetUserResp{
Id: user.Id,
Name: user.Name,
Phone: user.Phone,
}
return nil
}
return errors.New("不存在查询用户")
}
最后我们还需要将服务跑起来
func main() {
// 创建服务
userServer := new(UserServer)
// 将UserServer注册为rpc服务
rpc.Register(userServer)
// 创建TCP监听
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("监听TCP端口失败", err)
}
log.Println("服务已启动,等待客户端连接...")
for {
// 等待客户端连接
conn, err := listener.Accept()
if err != nil {
log.Fatal("接受客户端连接失败:", err)
}
// 并发处理客户端请求
go rpc.ServeConn(conn)
}
}
创建client
在客户端中主要的流程为
- 建立连接
- 准备好要发送的请求信息
- 远程调度
import (
"log"
"net/rpc"
)
type (
GetUserReq struct {
Id string `json:"id"`
}
GetUserResp struct {
Id string
Name string
Phone string
}
)
func main() {
// 连接RPC服务端
client, err := rpc.Dial("tcp", "localhost:1234")
if err != nil {
log.Fatal("连接RPC服务失败:", err)
}
// 定义请求参数和响应结果的变量
req := GetUserReq{
Id: "3",
}
var resp GetUserResp
// 调用远程方法
err = client.Call("UserServer.GetUser", req, &resp)
if err != nil {
log.Fatal("调用远程方法失败:", err)
}
log.Println("远程方法调用结果:", resp)
}
在代码中调用的方法名如何确定呢?在rpc/server
中UserServer注册为rpc之后,rpc会以UserServer作为名称前缀,而如果要调用GetUser,则通过UserServer.GetUser识别目标方法。
看看net_rpc的通信调度实现的内部原理
明确目标
这里呢分析一下技巧,对于内核源码的分析并不是一下子直接就去看内核代码,而是先捋清楚我们要看源码的目的目标,否则容易会在内核复杂的源码中迷失自己的方向。
我们主要的目标是理解go中net/rpc的调度原理,因此本节的目标则主要是理解在net/rpc中是如何传递解析请求数据、如何根据请求信息调度服务方法、请求的信息是如何序列化与反序列化的。
基于自己实现的角度分析我会怎么做
然后我们需要进一步的分析自己如果去实现会怎么做?通过自己的去实现的角度思考,可以增强对源码的认识度同时也可以让自己的在看的过过程更清晰。
所以在分析之前需先理解rpc通讯的关键问题与要素
- rpc之间通讯的数据格式与传输参数
- rpc之间的数据编码与通讯方式
- 接收到请求信息之后是如何运行对应服务的。
rpc之间通讯的数据格式与传输参数
如果我们自己来实现首先定义两者之间的通信数据结构
{
"method": "UserServer.GetUser", // 请求方法
"params": "1", // 请求参数
"id": "1"
}
编码方式:
我们先定位json与tcp协议通讯。
接收到请求信息之后是如何运行对应服务的。
在请求的参数中首先可以依据method是能够定位到服务和方法,而本质是用字符串,这时我们可以想到在go语言中可以基于反射找到对应的方法并执行。
结合理解绘制执行图
代码分析
在有了初步分析之后下一步我们需要去看代码验证自己的猜想,同时理解在代码中的具体实现。
服务注册:
func main() {
// 创建服务
userServer := new(UserServer)
// 将UserServer注册为rpc服务
rpc.Register(userServer)
// 创建TCP监听
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("监听TCP端口失败", err)
}
log.Println("服务已启动,等待客户端连接...")
for {
// 等待客户端连接
conn, err := listener.Accept()
if err != nil {
log.Fatal("接受客户端连接失败:", err)
}
// 并发处理客户端请求
go rpc.ServeConn(conn)
}
}
代码从rpc的服务注册开始分析。通过代码可以看到当创建了UserServer对象之后,下一步就会调用rpc.Register方法进行注册。
而根据我们的分析的流程,这个时候在内核中应该会通过反射获取到需要注册的服务对象以及方法,一一进行绑定注册。
func (server *Server) register(rcvr any, name string, useName bool) error {
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
// ...
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
return errors.New("rpc: service already defined: " + sname)
}
return nil
}
通过代码的阅读可以看到最终注册到服务的server.serviceMap
属性中。
服务的调度
在了解到服务的注册后,我们再进一步的了解服务的调度过程。
而rpc的请求调的入口可以根据代码可以看到是基于rpc.ServeConn
完成请求的
// 并发处理客户端请求
go rpc.ServeConn(conn)
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
server.ServeCodec(srv)
}
func ServeConn(conn io.ReadWriteCloser) {
DefaultServer.ServeConn(conn)
}
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
println("service", service)
if err != nil {
if debugLog && err != io.EOF {
log.Println("rpc:", err)
}
if !keepReading {
break
}
// send a response if we actually managed to read a header.
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error())
server.freeRequest(req)
}
continue
}
wg.Add(1)
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
}
wg.Wait()
codec.Close()
}
通过代码的方法命名可以知道在内容中是先接收到请求参数,并读取信息,然后再调用服务的方法。在上面我们可以为了验证猜想在代码中增加打印输出看看是否符合我们的猜想。
再继续往后看可以看到rpc的编码以及调度是通过反射实现的。
grpc介绍与下载安装
前言与背景
在前面的内容中给大家介绍了rpc的功能、案例以及go中net/rpc的实现,在目前的业界中项目的开发技术u运用越来越多,如多种语言的结合开发,多平台的结合开发。在这种情况下就会带来新的问题,如
- rpc如何设计可以很好的支持版本的迭代发展问题
- 在各个项目中对于rpc的交互如何实现跨平台跨语言的问题
针对普通rpc在通讯中存在的问题google提出grpc来解决。grpc是一个高性能、开源、通用的rpc框架,基于http2协议标准设计开发,grpc使用protocol buffers作为默认的序列化和反序列化机制,支持多种开发语言。grpc还提供了一种简单的方法来精确的定义服务,并且为客户端和服务端自动生成可靠的功能库。
grpc针对上面问题的解决方案简述
基于grpc的使用在感悟上,grpc更像是定义rpc的通讯模板或者说是基于说明书创作,在使用grpc的之前,需要先定义一个.protoc的文件,可以取名叫user.protoc或者是goods.protoc,这个文件的作用就相当于一个说明书。服务端根据该文件创建服务定义方法以及每个方法的请求与相应的信息,而客户端通过这个文件就可以知道目标服务端的方法定义、请求参数、响应参数。
而在通讯中的数据传输的协议序列化与反序列化,通过grpc框架内部提供的功能库实现,而通讯之中的协议运用http2基于post请求。
grpc下载
grpc所使用的关键包和软件为如下内容
// 对请求参数进行序列化和反序列化
github.com/golang/protobuf/proto
// grpc框架核心包库代码
google.golang.org/grpc
// grpc的protoc代码生成工具
github.com/golang/protobuf/protoc-gen-go
// 编译器
protoc.exe
grpc框架实践案例
分析
在上一节简要的介绍了grpc框架的应用,而后我们下载了一些工具和包库,在基于grpc项目的开发中,首先我们需要先定义好.protoc.
流程
- 编写.protoc描述文件
- 编译生成.pb.go文件
- 服务端实现约定的接口并提供服务
- 客户端根据约定的方法请求服务端
项目结构
|--user
|--client
|--main.go
|--server
|--main.go
|--proto
|--user.proto
|--user.pb.go
编写描述文件 user.protoc
syntax = "proto3"; // 指定proto版本
package user; // 指定默认包名
// 指定golang包名
option go_package = "./user";
// 定义Hello服务
service User {
// 定义SayHello方法
rpc GetUser(GetUserReq) returns (GetUserResp) {}
}
// HelloRequest 请求结构
message GetUserReq {
string id = 1;
}
// HelloResponse 响应结构
message GetUserResp {
string id = 1;
string name = 2;
string phone = 3;
}
user.proto
文件中定义了一个UserService,该服务包含一个GetUser
方法,同时声明了GetUserReq
和GetUserResp
消息结构用于请求和响应。客户端使用GetUserReq
参数调用GetUser
方法请求服务端,服务端响应GetUserResp
消息。
编译生成.pb.go文件
cd proto
# 执行命令
protoc --go-grpc_out=require_unimplemented_servers=false:. --go_out=. ./user.proto
它会在当前目录下生成user_grpc.pb.go与user.pb.go文件,根据.proto文件中的说明,包含服务端接口UserServer的描述,客户端请求实现UserClient
实现服务端接口
在通过命令构建后,基于之前的代码往下实现,首先定义一个User结构体对象,实现在.protoc中描述的接口,再通过grpc实现服务的注册。
package main
import (
"context"
"errors"
"log"
"net"
"google.golang.org/grpc"
pb "imooc.com/traning/3-6/protoc/user"
)
type User struct {
Id string
Name string
Phone string
Pass string
}
var users = map[string]*User{
"1": {
Id: "1",
Name: "balabala",
Phone: "13100001111",
Pass: "123456",
},
"2": {
Id: "2",
Name: "小王",
Phone: "13200001111",
Pass: "123456",
},
}
type UserServer struct{}
func (u *UserServer) mustEmbedUnimplementedUserServer() {
panic("implement me")
}
func (u *UserServer) GetUser(ctx context.Context, req *pb.GetUserReq) (*pb.GetUserResp, error) {
if user, ok := users[req.Id]; ok {
return &pb.GetUserResp{
Id: user.Id,
Name: user.Name,
Phone: user.Phone,
}, nil
}
return nil, errors.New("不存在查询用户")
}
func main() {
address := "127.0.0.1:1234"
listen, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("failed to listen %v", err)
}
s := grpc.NewServer()
pb.RegisterUserServer(s, new(UserServer))
log.Println("服务已启动,等待客户端连接...")
s.Serve(listen)
}
通过代码可以看出grpc在使用上与net/rpc包的使用两者没有太多的区别。
实现客户端调用
func main() {
client, err := grpc.Dial("127.0.0.1:1234", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect %v", err)
}
defer client.Close()
c := pb.NewUserClient(client)
r, err := c.GetUser(context.Background(), &pb.GetUserReq{Id: "1"})
if err != nil {
log.Fatalf("req get user err %v", err)
}
log.Println(r)
}
客户端初始化连接后直接调用.pb.go中实现的GetUser方法,即可向服务端发起请求,在使用上与本地调用没有太多差别。
小结
grpc在使用上与标准库提供的rpc包是类似的,因此上手是很快的也非常的简单,但是grpc在rpc通讯中除了调度还提供如拦截器、认证、负载均衡等功能。
protobuf的协议原理
protobuf介绍
protobuf是谷歌开发的一款无关平台、语言、可扩展、轻量级高效的序列化结构的数 据格式,用于将自定义数据结构序列化成字节流,和将字节流反序列化为数据结构。不同应用之间互相通信的数据交换格式,只要实现相同的协议格式,即后缀为proto文件 被编译成不同的语言版本,加入各自的项目中,这样不同的语言可以解析其它语言通过 Protobuf序列化的数据
protobuf的优势:
- 序列化后体积比json和xml小,适合网络传输
- 序列化反序列化速度快,比JSON的处理速度
Protobuf官方工程主页上显示的已支持的开发语言多达10种,分别有:C++、Java、 Python、Objective-C、C#、JavaNano、Ruby、Go、PHP,基本上主流的语言都已 支持。
最初的背景
protobuf最初并不是为了应用于解决序列化速度,而是解决新旧版本的兼容问题。因为当服务进行升级和优化时
主要解决:
- 更容易引入新的字段
- 据格式更加具有自我描述性
protobuf的工作过程
对于序列化协议来说,使用方只需要关注业务对象本身,即 idl 定义,序列化和反序列化的代码只需要通过工具生成即可
对于序列化协议来说,使用方只需要关注业务对象本身即 idl 定义,序列化和反序列化的代码只需要通过工具生成即可
protobuf的语法
protobuf主要是对客户端与服务端在相互通讯中的标准为主,因此在protobuf中主要是
- service服务定义语法
- message数据结构及字段定义语法
首先看message:
// HelloResponse 响应结构
message GetUserResp {
string id = 1;
string name = 2;
repeated string phone = 3;
}
message与go语言中的struct是类似,可以理解为是定义一个对象或者一种数据结构。
在message中字段的定义格式如下:
限定修饰符 | 数据类型 | 字段名称 | = 字段标识号 | [字段默认值]
-限定修饰符是可选项不是必须的,为字段增加规则
- required:该规则规定,消息体中该字段的值是必须要设置的
- optional:消息体中该规则的字段的值可以存在,也可以为空,optional的字 段可以根据defalut设置默认值。
- repeated:消息体中该规则字段可以存在多个(包括0个),该规则对应java 的数组或者go语言的slice。
-在protoc协议中的数据类型映射如下:
而标识号是每个字段等号后面的值,该值在当前的message中是唯一的不能重复定义,一旦定义开始就不能改变,标识号从整数1开始递增,其中[19000-19999]范围的值是protobuf的预留字段,不建议使用。
service语法
service可以在.proto文件中定义一个服务接口,通过service既可以向客户端与服务端描述服务对象及服务对象的实现方法
service UserService{}
如果我们希望在service中实现某一个rpc的方法可以通过以下格式定义
rpc 方法(请求参数) returns(响应值)
在编程上的建议
- 一个.proto中可以定义多个service,但是建议一个proto只定义一个service即主要的事项围绕与该service进行,这样有利于维护各个服务的定义。
- 在每个service中会有请求参数与响应参数的message,推荐是可以根据方法名来定义请求与响应的message名,这样可以在后期文件内容过的时候区分各个message的功能,可以针对性的扩展需求。
protobuf序列化
在了解protobuf的语法定义后,接下来的关键就是protobuf的序列化,它的实现方式与传统方式具有区别。
在前面了解protobuf的语法时候,相信大家都会有这么个问题,就是为什么会定义标识?
{
"id" : "a11111",
"name": "balabala",
"class": "golang"
}
如上是一份json的数据格式,也就是信息传输中发送的数据结构,我们有没有办法对该传输的结构再优化减少占用内存呢?
针对该问题,基于json的数据结构可以发现,在数据通讯中key值实际上是一个固定的值,是不会发生改变的,在protobuf中就针对该值下手了。
将发生的key改为int类型即定:
id = 1
name = 2
class = 3
数据信息也随之发生改变
{
1: "a11111",
2: "balabala",
3: "golang"
}
这样就减少了发送信息中所占用的内存,因此在protobuf中每一个字段后的标识号,就是用于序列化的时候标识具体的解析key。
同时protobuf在序列化的时候采用Varint方式设定,该设定在整体的对int在存储上进一步进行优化,原一个int类型的数值需4个byte,在经过优化后只需2个byte。
利用巧妙的数值位置交换设定,就可以很好的减少数值存储的空间。
在序列化时,Protobuf按照TLV的格式序列化每一个字段,T即Tag,也叫Key;V是该字段对应的值value;L是Value的长度,如果一个字段是整形,这个L部分会省略。
序列化后的Value是按原样保存到字符串或者文件中,Key按照一定的转换条件保存起 来,序列化后的结果就是 KeyValueKeyValue…依次类推的样式
小结
通过这节主要学习的是protobuf的主要解决的问题、语法、序列化编码过程。对于protobuf的语法重点掌握service与message,他们在往后的工作中应用较多。而对于protobuf的设计与序列化过程,最重要的是理解设定的思想。
理解grpc的请求协议http2
了解http1及问题
在讲http2之前先了解http1,因为http2是http1的扩展,所以http2的语义是、功能、http方法、状态码、url和首部字段等这些核心概念是不变的。
在浏览器中通过输入url我们就可以访问到想访问的服务器,并从中获取相关的服务器资源信息
而url指向的服务器地址,可以翻译为就是协议+域名+端口+路径的信息组成,而其中 http则就是最常用的协议,默认端口是80,通常可以省略
HTTP 超文本传输协议是位于 TCP/IP 体系结构中的应用层协议,它是万维网的数据通信基础。
http的连接建立过程
- 浏览器请求,先通过dns解析域名的ip地址
- 通过tcp三次握手建立tcp连接
- 发起http请求
- 目标服务器接受到http请求并处理
- 目标服务器往浏览器发送http响应
- 浏览器解析并渲染页面
http1报文
http报文由请求行、首部、实体主体组成,它们之间由CRLF分隔开 注意:实体包括首部和实体主体,sp即是空格space。
http1协议的问题
- 低效的tcp利用
- 臃肿的消息首部
- 明文传输
- 传输效率低
理解http2的设计
接下来看看http2,在目前的浏览器浏览器中http2的实现是基于ssl/tls,因此http2的连接建立过程和https是差不多的,而http2在http1的基础上做了如下改进。
- 传输的改进:二进制传输
- 请求头优化
- 多路复用
- 服务端可以推送
- 基于SSL/TLS提高安全性
- 传输协议的改进
http1在传输中会遇到如下问题:
- 一次只能处理一个请求或响应完成之前不能停止解析。
- 无法预判解析需要多少内存
- 还需考虑个别浏览器用LF做分隔符
http2在传输中的是采用帧进行解决,在设计思想上与分段传输及tcp包头有相似之处。
在应用层和传输层之间增加一个二进制分帧层,http/2会将所有传输的信息分割为更小的消息和帧,并采用二进制格式的编码。这时原本以固有的header+body的组合报文方式就改为了一个个碎片,这些碎片数据信息可以乱序发送,而浏览器会根据流id重新组合起来,以获得实际传输的数据。
简化的理解为,http2将原本的数据包拆分成为多个小的数据包,然后在每个数据包上编个序号,基于流发送客户端。客户端在接收到这些数据包后,就可以根据固定位获取编号再组装数据。
2.请求头的优化
http协议是不带状态的,每次请求都必须要附带上所有的信息,因此客户端在请求中会收到很多重复的内容,如cookie、user agent这样就会浪费很多的资源,也影响速度。
http/2对这一点上做了优化。
- 是信息压缩:头信息使用gzip或compress压缩后再发送
- 双方信息缓存:客户端和服务端同时维护一张头信息表,当需要更改时发送对应信的索引号即可
属性 | 值 |
---|---|
:method | GET |
:scheme | https |
:host | www.baidu.com |
:path | |
accept | image/jpeg |
user-agent | Mozilla/5.0… |
3.多路复用
在连接和传输上http2也做了优化,在目前浏览器中打开一个页面总会加载很多的js/css/img等文件信息,因http1是短连接,因此每一个信息都可以视为是一个新的请求,需要与服务端建立多次tcp的连接才能获取到所有信息。
并且在收到数据信息后,还需要按照顺序一一对应,就形成了队头阻塞的情况。
而http2则只需要建立一次tcp链接,通过多条流,基于帧中的标识就可以知道信息是属于哪个请求,大大的提升了http传输的性能。
服务注册
在使用grpc的时候,我们是基于protoc命令将.proto文件编译成grpc的类库文件,而服务端则依据该类库文件中定义的接口进行实现,同时在客户端中会根据该类库文件中定义的接口对服务端调度。
约定的服务是通过user_grpc.db.go中所提供的RegisterUserServer方法完成服务的注册工作,我们可以先看看user_grpc.pb.go的内容。
type UserServer interface {
// 定义SayHello方法
GetUser(context.Context, *GetUserReq) (*GetUserResp, error)
Ping(context.Context, *Request) (*Response, error)
}
// UnimplementedUserServer should be embedded to have forward compatible implementations.
type UnimplementedUserServer struct {
}
func (UnimplementedUserServer) GetUser(context.Context, *GetUserReq) (*GetUserResp, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetUser not implemented")
}
func (UnimplementedUserServer) Ping(context.Context, *Request) (*Response, error) {
return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented")
}
// UnsafeUserServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to UserServer will
// result in compilation errors.
type UnsafeUserServer interface {
mustEmbedUnimplementedUserServer()
}
func RegisterUserServer(s grpc.ServiceRegistrar, srv UserServer) {
s.RegisterService(&User_ServiceDesc, srv)
}
在文件中可以看到内容里定义了UserServer
服务的接口,并提供了一个对UserServer
的默认实现,而再代码下可以看到RegisterUserServer
方法,在方法中要求传递grpc.ServiceRegistrar
即grpc的服务注册接口,其次是UserServer
即接口的实现对象。
在方法中的内容是s.RegisterService(&User_ServiceDesc, srv)
这样,到此我们可以理解grpc在服务注册的时候是基于 服务的描述ServiceDesc
完成对服务注册的工作。
func _User_GetUser_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetUserReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(UserServer).GetUser(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: User_GetUser_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(UserServer).GetUser(ctx, req.(*GetUserReq))
}
return interceptor(ctx, in, info, handler)
}
func _User_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Request)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(UserServer).Ping(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: User_Ping_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(UserServer).Ping(ctx, req.(*Request))
}
return interceptor(ctx, in, info, handler)
}
// User_ServiceDesc is the grpc.ServiceDesc for User service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var User_ServiceDesc = grpc.ServiceDesc{
ServiceName: "user.User",
HandlerType: (*UserServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "GetUser",
Handler: _User_GetUser_Handler,
},
{
MethodName: "Ping",
Handler: _User_Ping_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "user.proto",
}
在内容中通过protoc命令生成的时候,依据.protoc文件定义了服务方法的调用和服务的描述User_ServiceDesc
在User_ServiceDesc
中定义了要注册的服务对象名,服务的调用方法列表,以及服务的处理类型,在客户端请求的时候会依据【服务名.方法名】的方式请求,因此在RegisterUserServer
进行服务注册的时候,内部是肯定用到map进行存储,key则为服务名。
对代码跟踪查看在grpc/server.go中确实是以map存储,并且信息的来源主要从ServiceDesc中获取。
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
if ss != nil {
ht := reflect.TypeOf(sd.HandlerType).Elem()
st := reflect.TypeOf(ss)
if !st.Implements(ht) {
logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
}
}
s.register(sd, ss)
}
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
s.printf("RegisterService(%q)", sd.ServiceName)
if s.serve {
logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
}
if _, ok := s.services[sd.ServiceName]; ok {
logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}
info := &serviceInfo{
serviceImpl: ss,
methods: make(map[string]*MethodDesc),
streams: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
for i := range sd.Methods {
d := &sd.Methods[i]
info.methods[d.MethodName] = d
}
for i := range sd.Streams {
d := &sd.Streams[i]
info.streams[d.StreamName] = d
}
s.services[sd.ServiceName] = info
}
回到User_ServiceDesc
中,从_User_GetUser_Handler
的代码逻辑中,可以分解为是做了三个事情
- 请求参数的解析
- 无拦截器则直接执行服务的方法
- 如果有拦截器,则将方法封装在拦截器里并执行
func _User_GetUser_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetUserReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(UserServer).GetUser(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: User_GetUser_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(UserServer).GetUser(ctx, req.(*GetUserReq))
}
return interceptor(ctx, in, info, handler)
}
在调用请求方法的时候标准库中使用的是反射机制,而在此处是直接利用了断言为服务对象的类型接口,然后再调用方法。相对来说从代码的结构上较为明了,同时性能也会稍快一些。
因此grpc的注册我们就可以进行如下理解:
服务初始化
在看看grpc对服务的初始化操作事项
// 这是创建grpc server的代码
s := grpc.NewServer()
具体的创建方法
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions
for _, o := range globalServerOptions {
o.apply(&opts)
}
for _, o := range opt {
o.apply(&opts)
}
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[string]map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
czData: new(channelzData),
}
s.cv = sync.NewCond(&s.mu)
// ....
return s
}
在代码中先对ServerOption进行操作,然后创建出grpc.Server的实例对象。当前服务初始化的这些参数是有什么作用呢?
理解源码参数技巧:对源码分析的时候需依据方法名属性名及设计的参数名分析。如下
s := &Server{
lis: make(map[net.Listener]bool)
}
在代码中lis的value值是map,以net.Listener作为了key,因此实际上我们通过当前参数的value就可以推断出它的作用是监听地址列表。那为什么map的value采用bool类型呢?
从作用上可以分析lis是存监听的地址列表,顾在存储上可以考虑运用数组,但是数组在查询的时间效率上是o(n),而哈希则是O(1),顾效率上运用map是较为合适的。而使用bool类型则主要是考虑map的读取验证,bool在go中默认值是false,map在使用中判断是否存在通常采用如下的写法。
if tmp, ok := lis[key]; !ok {
return "不存在"
}
但如果运用bool值之后的写法, 就会更加的简洁。
if !lis[key] {
return "不存在"
}
根据前面的思想我们就可以理解各个参数的意义和作用
- lis: 监听地址列表
- opts:服务选项
- conns:客户端的连接
- m: 服务对象的映射
- quit: 退出信号
- done: 完成信号
- czData: 存储使用的相关数据
- cv: 程序退出信号 【通过参数的注释分析得出结论】
在创建grpc的server对象的时候,传递了一些option方法然后在初始化的时候执行调度。这个方式在大多数的框架中有应用。
type ServerOption interface {
apply(*serverOptions)
}
type serverOptions struct {
creds credentials.TransportCredentials
codec baseCodec
cp Compressor
dc Decompressor
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
chainUnaryInts []UnaryServerInterceptor
chainStreamInts []StreamServerInterceptor
binaryLogger binarylog.Logger
inTapHandle tap.ServerInHandle
statsHandlers []stats.Handler
maxConcurrentStreams uint32
maxReceiveMessageSize int
maxSendMessageSize int
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters
keepalivePolicy keepalive.EnforcementPolicy
initialWindowSize int32
initialConnWindowSize int32
writeBufferSize int
readBufferSize int
connectionTimeout time.Duration
maxHeaderListSize *uint32
headerTableSize *uint32
numServerWorkers uint32
recvBufferPool SharedBufferPool
}
如ServerOption它实际是一个接口,约定一个apply方法,要求传递serverOptions操作对象。同时在代码的下方提供一个实现该结构的对象以及相关的调用方法。
type funcServerOption struct {
f func(*serverOptions)
}
func (fdo *funcServerOption) apply(do *serverOptions) {
fdo.f(do)
}
func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
return &funcServerOption{
f: f,
}
}
func WriteBufferSize(s int) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.writeBufferSize = s
})
}
通过上下文的代码结合,可以看到在grpc.NewServer中我们传递的ServerOption对象会通过for不断调用apply方法,而该方法实际上是对serverOptions进行赋值。
这种方式可以理解为是一种设计模式,它主要操作的对象是叫Options的对象,其定义是在程序中如果存在一些可选操作处理的时候称为Option,它与Config具有相似之处都是一种配置,但不同之处在于它是以方法的方式传递,通过方法约定和设置传输参数的正确性。在代码阅读的时候可以将Option视为是在包运行中的一份配置文件即可。
拦截器
在ServerOption中可以设置整个服务需要进行ttls验证,也可以在服务中设置拦截器等等,比如给grpc的server添加拦截器。
type serverOptions struct {
unaryInt UnaryServerInterceptor
chainUnaryInts []UnaryServerInterceptor
}
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
})
}
在添加拦截器的方法中,它要求我们传递UnaryServerInterceptor类型
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
参数介绍:
- ctx : 上下文
- req: 请求参数
- info: rpc方法所有的信息
- handler: 调度的方法本身
在程序中新增两个拦截器
// 错误处理
func ErrHandlerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
log.Println("err handler status")
resp, err = handler(ctx, req)
log.Println("err handler end err ", err, " req ", req)
return
}
// 日志记录
func LogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
log.Println("log handler status")
resp, err = handler(ctx, req)
log.Println("log handler end err ", err, " req ", req)
return
}
为了验证前面的分析在grpc.NewServer
方法中增加打印输出ServerOption参数的变化。
func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions
fmt.Println("chainUnaryInts start",len(opts.chainUnaryInts))
for _, o := range globalServerOptions {
o.apply(&opts)
}
for _, o := range opt {
o.apply(&opts)
}
fmt.Println("chainUnaryInts end",len(opts.chainUnaryInts))
}
然后启动服务
D:\01.project\01.go\src\imooc.com\practice\grpc\server>go run .
chainUnaryInts start 0
chainUnaryInts end 2
2023/09/04 14:25:12 服务已启动,等待客户端连接...
再启动客户端请求之后的结果
D:\01.project\01.go\src\imooc.com\practice\grpc\server>go run .
chainUnaryInts start 0
chainUnaryInts end 2
2023/09/04 14:25:12 服务已启动,等待客户端连接...
2023/09/04 14:30:58 err handler status
2023/09/04 14:30:58 log handler status
2023/09/04 14:30:58 log handler end err <nil> req id:"1"
2023/09/04 14:30:58 err handler end err <nil> req id:"1"
注意!在往期的版本中grpc是不支持多个拦截器的,在目前新版本中开始支持多个拦截器。具体的实现方式就在下边,通过递归不断嵌套定义的拦截器。
// chainUnaryServerInterceptors chains all unary server interceptors into one.
func chainUnaryServerInterceptors(s *Server) {
// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
// be executed before any other chained interceptors.
interceptors := s.opts.chainUnaryInts
if s.opts.unaryInt != nil {
interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
}
var chainedInt UnaryServerInterceptor
if len(interceptors) == 0 {
chainedInt = nil
} else if len(interceptors) == 1 {
chainedInt = interceptors[0]
} else {
chainedInt = chainUnaryInterceptors(interceptors)
}
s.opts.unaryInt = chainedInt
}
func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
}
}
func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
if curr == len(interceptors)-1 {
return finalHandler
}
return func(ctx context.Context, req interface{}) (interface{}, error) {
return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
}
}
监听与调度
最后可以了解一下服务的监听,最终的主要是在请求的监听和处理,如下是具体的核心代码
func (s *Server) Serve(lis net.Listener) error {
// ...
var tempDelay time.Duration // how long to sleep on accept failure
for {
rawConn, err := lis.Accept()
if err != nil {
if ne, ok := err.(interface {
Temporary() bool
}); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
s.mu.Lock()
s.printf("Accept error: %v; retrying in %v", err, tempDelay)
s.mu.Unlock()
timer := time.NewTimer(tempDelay)
select {
case <-timer.C:
case <-s.quit.Done():
timer.Stop()
return nil
}
continue
}
// ...
}
tempDelay = 0
// ...
s.serveWG.Add(1)
go func() {
s.handleRawConn(lis.Addr().String(), rawConn)
s.serveWG.Done()
}()
}
}
在代码中分为两段,在监听对象的时候出现如果存在异常的时候程序认为对方繁忙会触发休眠,第一次失败休眠5ms如果继续失败则,在原基础上翻倍,直到上限1s,当日每次休眠结束它都会尝试去访问它。
如果程序执行成功则会将休眠制0,然后开辟一个协程根据不同的listener不同的调用监听模式处理。
grpc的客户端请求发送实现原理分析
初步分析
在通过.proto文件生成的示例代码中除了服务的定义外,封装了对服务的调用方式。
type UserClient interface {
// 定义SayHello方法
GetUser(ctx context.Context, in *GetUserReq, opts ...grpc.CallOption) (*GetUserResp, error)
Ping(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error)
}
type userClient struct {
cc grpc.ClientConnInterface
}
func NewUserClient(cc grpc.ClientConnInterface) UserClient {
return &userClient{cc}
}
func (c *userClient) GetUser(ctx context.Context, in *GetUserReq, opts ...grpc.CallOption) (*GetUserResp, error) {
out := new(GetUserResp)
err := c.cc.Invoke(ctx, User_GetUser_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *userClient) Ping(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) {
out := new(Response)
err := c.cc.Invoke(ctx, User_Ping_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
在代码中先定义针对客户端操作的接口对象,并且构建好了调用的实例,再结合我们编写的代码就很容易理解了。
func main() {
client, err := grpc.Dial("127.0.0.1:1234", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect %v", err)
}
defer client.Close()
c := pb.NewUserClient(client)
r, err := c.GetUser(context.Background(), &pb.GetUserReq{Id: "1"})
if err != nil {
log.Fatalf("req get user err %v", err)
}
log.Println(r)
}
在代码中先初始化grpc获取到客户端,初始化pb类库中的客户端对象将该客户端传递进去,然后调用pb类库中的方法发起rpc调度。
客户端初始化
客户端的初始化与服务端的初始化类似,在请求的参数上定义了Option作为请求参数传递,设置内部程序的运行。
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}
接着继续对DialContext进行分析
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
czData: new(channelzData),
}
// ...
cc.retryThrottler.Store((*retryThrottler)(nil))
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background())
cc.exitIdleCond = sync.NewCond(&cc.mu)
chainUnaryClientInterceptors(cc)
chainStreamClientInterceptors(cc)
// ...
// Return early for non-blocking dials.
if !cc.dopts.block {
return cc, nil
}
// A blocking dial blocks until the clientConn is ready.
for {
s := cc.GetState()
if s == connectivity.Idle {
cc.Connect()
}
if s == connectivity.Ready {
return cc, nil
} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
if err = cc.connectionError(); err != nil {
terr, ok := err.(interface {
Temporary() bool
})
if ok && !terr.Temporary() {
return nil, err
}
}
}
if !cc.WaitForStateChange(ctx, s) {
// ctx got timeout or canceled.
if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
return nil, err
}
return nil, ctx.Err()
}
}
}
在DialContext中,要求传递context.Context以及连接的信息,而该方法的主要功能结合上面的核心代码理解主要是
- 初始化clientConn
- 初始化重试机制与拦截器
- 初始化channelz
- 验证连接协议方式并初始化
- 初始化请求超时的控制
- 初始化并解析地址信息
- 与服务建立连接
请求调度
在proto中已经生成好了关于调用的方法,在初始化客户端的时候就会将clientConn传递到userClient的实例中。
func (c *userClient) Ping(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Response, error) {
out := new(Response)
err := c.cc.Invoke(ctx, User_Ping_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
再继续跟踪Invoke方法
func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error {
if err := cc.idlenessMgr.onCallBegin(); err != nil {
return err
}
defer cc.idlenessMgr.onCallEnd()
// allow interceptor to see all applicable call options, which means those
// configured as defaults from dial option as well as per-call options
opts = combine(cc.dopts.callOptions, opts)
if cc.dopts.unaryInt != nil {
return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...)
}
return invoke(ctx, method, args, reply, cc, opts...)
}
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}
在请求调度中可以明显看出主要分为三块内容:
- newClientStream: 获取请求客户端的stream,在其内部封装了ClientStream、负载均衡、超时、编码、stream等操作。
- cs.SendMsg(req): 发送rpc请求,但不会返回输出结果
- cs.RecvMsg: 同步等待接收到Rpc方法的响应结果
获取连接与关闭
对newClientStream的代码跟踪,在代码内部是去获取一个用于rpc调度的连接。
var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
}
接下来我们看看grpc的close方法
func (cc *ClientConn) Close() error {
defer cc.cancel()
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
return ErrClientConnClosing
}
for cc.idlenessState == ccIdlenessStateExitingIdle {
cc.exitIdleCond.Wait()
}
conns := cc.conns
cc.conns = nil
cc.csMgr.updateState(connectivity.Shutdown)
pWrapper := cc.blockingpicker
rWrapper := cc.resolverWrapper
bWrapper := cc.balancerWrapper
idlenessMgr := cc.idlenessMgr
cc.mu.Unlock()
// The order of closing matters here since the balancer wrapper assumes the
// picker is closed before it is closed.
if pWrapper != nil {
pWrapper.close()
}
if bWrapper != nil {
bWrapper.close()
}
if rWrapper != nil {
rWrapper.close()
}
if idlenessMgr != nil {
idlenessMgr.close()
}
for ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
cc.addTraceEvent("deleted")
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
// trace reference to the entity being deleted, and thus prevent it from being
// deleted right away.
channelz.RemoveEntry(cc.channelzID)
return nil
}
在这个方法中,会关闭所有底层传输。
- context/cancel
- 清空并关闭客户端/解析器/负载均衡的连接
- 添加跟踪引用
- 移除当前通道信息
小结
根据源码的阅读我们可以了解到在grpc中:
- 调用grpc.Dial会去连接服务端:而连接的状态会设置为等待接收请求的状态
- 在创建了clientConn后,如果不进行close那么是会造成内存泄漏的问题【在常驻进出的情况下】
- 在grpc接收请求的时候,会在连接失败的时候触发休眠的情况。
- 在目前的版本中grpc支持多个拦截器,但需注意早期的grpc版本是不支持多个拦截器的