1、grpc重试策略
RPC 调用失败可以分为三种情况:
1、RPC 请求还没有离开客户端;
2、RPC 请求到达服务器,但是服务器的应用逻辑还没有处理该请求;
3、服务器应用逻辑开始处理请求,并且处理失败;
最后一种情况是通过 server 配置的重试策略来处理的,是本文主要讲解的内容。而对于前两种情况,
gRPC 客户端会自动重试,与重试策略的配置并没有太大关系。因为这两种情况,服务端的逻辑并没有开始处理请
求,所以始终可以重试,也被称为透明重试 。
对于第一种情况,因为RPC没有离开客户端,所以可以一直重试,直到成功或者直到RPC的截止时间为止。
对于第二种情况,虽然RPC 到达了服务端,但是应用逻辑并没有处理请求,所以,客户端会立即重试一次,如果
再次失败, RPC 将根据配置的重试策略来进行处理。
注意,这种情况可能会增加链路上的负载。
下文介绍的重试限流只是为了防止服务器的应用逻辑服务过载,而这些重试并且不会进入应用逻辑层,所以他们不
会把他们算作失败,同样透明重试也不会受到重试配置 maxAttempts
的限制。
gRPC 的重试策略有两种,分别是重试(retryPolicy
)和对冲(hedging
),一个RPC方法只能配置一种重试策略。
下面将演示重试策略的使用。
1.1 重试策略
此示例显示了如何在gRPC客户端上启用和配置重试。
1.1.1 proto的编写和编译
syntax = "proto3";
option go_package = "./;echo";
package echo;
message EchoRequest {
string message = 1;
}
message EchoResponse {
string message = 1;
}
service Echo {
rpc UnaryEcho(EchoRequest) returns (EchoResponse) {}
}
$ protoc -I . --go_out=plugins=grpc:. ./echo.proto
1.1.2 服务端
package main
import (
"context"
pb "demo/pb"
"flag"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"log"
"net"
"sync"
)
var port = flag.Int("port", 50052, "port number")
type failingServer struct {
pb.UnimplementedEchoServer
mu sync.Mutex
reqCounter uint
reqModulo uint
}
// 此方法将使reqModulo-1次RPC失败,并返回状态代码不可用,并在reqModulo次数上成功RPC
func (s *failingServer) maybeFailRequest() error {
s.mu.Lock()
defer s.mu.Unlock()
s.reqCounter++
if (s.reqModulo > 0) && (s.reqCounter%s.reqModulo == 0) {
return nil
}
return status.Errorf(codes.Unavailable, "maybeFailRequest: failing it")
}
func (s *failingServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
if err := s.maybeFailRequest(); err != nil {
log.Println("request failed count:", s.reqCounter)
return nil, err
}
log.Println("request succeeded count:", s.reqCounter)
return &pb.EchoResponse{Message: req.Message}, nil
}
func main() {
flag.Parse()
address := fmt.Sprintf(":%v", *port)
lis, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
fmt.Println("listen on address", address)
s := grpc.NewServer()
// 将服务器配置为每四次通过一次RPC;将客户端配置为进行四次尝试
failingservice := &failingServer{
reqCounter: 0,
reqModulo: 4,
}
pb.RegisterEchoServer(s, failingservice)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
1.1.3 客户端
package main
import (
"context"
pb "demo/pb"
"flag"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"log"
"time"
)
var (
addr = flag.String("addr", "localhost:50052", "the address to connect to")
// see https://github.com/grpc/grpc/blob/master/doc/service_config.md to know more about service config
retryPolicy = `{
"methodConfig": [{
"name": [{"service": "grpc.examples.echo.Echo"}],
"waitForReady": true,
"retryPolicy": {
"MaxAttempts": 4,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]}`
)
// 使用grpc.WithDefaultServiceConfig()设置服务配置
func retryDial() (*grpc.ClientConn, error) {
return grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy))
}
func main() {
flag.Parse()
// Set up a connection to the server.
conn, err := retryDial()
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer func() {
if e := conn.Close(); e != nil {
log.Printf("failed to close connection: %s", e)
}
}()
c := pb.NewEchoClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
reply, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "Try and Success"})
if err != nil {
log.Fatalf("UnaryEcho error: %v", err)
}
log.Printf("UnaryEcho reply: %v", reply)
}
1.1.4 测试
[root@zsx demo]# go run server/server.go
listen on address :50052
2023/02/27 18:31:43 request failed count: 1
2023/02/27 18:31:46 request failed count: 2
2023/02/27 18:31:51 request failed count: 3
2023/02/27 18:31:53 request succeeded count: 4
[root@zsx demo]# go run client/client.go
2023/02/27 18:31:43 UnaryEcho error: rpc error: code = Unavailable desc = maybeFailRequest: failing it
exit status 1
[root@zsx demo]# go run client/client.go
2023/02/27 18:31:46 UnaryEcho error: rpc error: code = Unavailable desc = maybeFailRequest: failing it
exit status 1
[root@zsx demo]# go run client/client.go
2023/02/27 18:31:51 UnaryEcho error: rpc error: code = Unavailable desc = maybeFailRequest: failing it
exit status 1
[root@zsx demo]# go run client/client.go
2023/02/27 18:31:53 UnaryEcho reply: message:"Try and Success"
# 项目结构
[root@zsx protoc]# tree demo/
demo/
├── client
│ └── client.go
├── go.mod
├── go.sum
├── pb
│ ├── echo.pb.go
│ └── echo.proto
└── server
└── server.go
3 directories, 6 files
1.1.5 用于配置gRPC重试策略的选项
重试是通过服务配置启用的,服务配置可以由名称解析器提供。在上面的配置中,我们为
grpc.example.echo.Echo
方法设置了重试策略。
下表描述了用于配置 gRPC 重试策略的选项:
选项 | 描述 |
---|---|
MaxAttempts | 最大调用尝试次数,包括原始尝试。 此值受 GrpcChannelOptions.MaxRetryAttempts (默认值为 5)的限制。 必须为该选项提供值,且值必须大于 1。 |
InitialBackoff | 重试尝试之间的初始退避延迟。 介于 0 与当前退避之间的随机延迟确定何时进行下一次重试尝试。 每次尝试后,当前退避将乘以 BackoffMultiplier 。 必须为该选项提供值,且值必须大于 0。 |
MaxBackoff | 最大退避会限制指数退避增长的上限。 必须为该选项提供值,且值必须大于 0。 |
BackoffMultiplier | 每次重试尝试后,退避将乘以该值,并将在乘数大于 1 的情况下以指数方式增加。 必须为该选项提供值,且值必须大于 0。 |
RetryableStatusCodes | 状态代码的集合。 具有匹配状态的失败 gRPC 调用将自动重试。 有关状态代码的更多信息,请参阅https://grpc.github.io/grpc/core/md_doc_statuscodes.html 。 至少需要提供一个可重试的状态代码。 |
1、最大重试次数 maxAttempts
指定一次RPC 调用中最多的请求次数,包括第一次请求。如果设置了调用的过期
时间,那么到了过期时间,无论重试情况如果都会返回超时错误 DeadlineExceeded
。
2、指数退避在进行下一次重试请求前,会计算需要等待的时间:
- 第一次重试间隔是
random(0, initialBackoff)
- 第 n 次的重试间隔为
random(0, min( initialBackoff*backoffMultiplier**(n-1) , maxBackoff))
重试状态码 retryableStatusCode
:
3、当 RPC 调用返回非 OK 响应,会根据 retryableStatusCode 来判断是否进行重试。
通常,只有表明服务逻辑没有处理请求的状态码才应该进行重试,如果服务提供了幂等或者可以安全的多次请求
时,那么就可以指定更详细的参数。
比如,删除资源的 RPC 调用失败,并返回了 INTERNAL错误码,那么可能在返回错误前就已经删除了资源
如果该方法是幂等的,那么进行重试就没什么问题,否则,重试就可能会导致一些异常问题。
4、retryPolicy
参数要求:
-
maxAttempts 必须是大于 1 的整数,对于大于5的值会被视为5。
-
initialBackoff 和 maxBackoff 必须指定,并且必须具有大于0。
-
backoffMultiplier 必须指定,并且大于零。
-
retryableStatusCodes 必须制定为状态码的数据,不能为空,并且状态码必须是有效的 gPRC 状态码,可以
是整数形式,并且不区分大小写 (
[14], ["UNAVAILABLE"], ["unavailable"]
)。
1.1.6 Hedging
Hedging 是一种备选重试策略。 Hedging 允许在不等待响应的情况下,主动发送单个 gRPC 调用的多个副本。
Hedged gRPC 调用可以在服务器上执行多次,并使用第一个成功的结果。 重要的是,务必仅针对可安全执行多
次且不会造成负面影响的方法启用 hedging。
与重试相比,Hedging 具有以下优缺点:
-
Hedging 的优点是,它可能会更快地返回成功的结果。 它允许同时进行多个 gRPC 调用,并在出现第一个成
功的结果时完成。
-
Hedging 的一个缺点是它可能会造成浪费。 进行了多个调用并且这些调用全部成功。 而仅使用第一个结果,
并放弃其余结果。
HedgingPolicy 配置:
// RPC 调用最多发送4次请求,每次间隔0.5s
// 如果没有指定hedgingDelay或者为"0s"的话,就同时发送四个请求
"hedgingPolicy":{
"maxAttempts": 4,
"hedgingDelay": "0.5s",
"nonFatalStatusCodes":[
"UNAVAILABLE",
"INTERNAL",
"ABORTED"
]
}
下表描述了用于配置 gRPC hedging 策略的选项:
选项 | 描述 |
---|---|
MaxAttempts | Hedging 策略将发送的调用数量上限。 MaxAttempts 表示所有尝试的总数,包括原始尝试。 此值受 GrpcChannelOptions.MaxRetryAttempts (默认值为 5)的限制。 必须为该选项提供值,且值必须大于 2。 |
HedgingDelay | 第一次调用立即发送,而后续 hedging 调用按该值延迟发送。 如果延迟设置为零或 null ,那么所有所有 hedged 调用都将立即发送。 HedgingDelay 为可选,默认值为零。 值必须为零或更大。 |
NonFatalStatusCodes | 指示其他 hedge 调用仍可能会成功的状态代码集合。 如果服务器返回非致命状态代码,hedged 调用将继续。 否则,将取消未完成的请求,并将错误返回到应用。 有关状态代码的更多信息,请参阅https://grpc.github.io/grpc/core/md_doc_statuscodes.html 。 |
1.1.7 重试限流
当客户端的失败和成功比超过某个阈值时,gRPC 会通过禁用这些重试策略来防止由于重试导致服务器过载。
service 配置:
"retryThrottling":{
"maxTokens": 10,
"tokenRatio": 0.1
}
重试限流是根据服务器来设置的,而不是针对方法或者服务。
对于每一个服务器,gRPC 客户端会维护一个 token_count
变量,最初设置为 maxToken
,值的范围是
0 - maxToken
。
对于每个 RPC 请求都会对 token_count
产生一下效果:
- 每个失败的 RPC 请求都会递减token_count 1
- 成功 RPC 将会递增 token_count tokenRatio
需要注意这里的失败 RPC 是指返回的状态码符合retryableStatusCodes,nonFatalStatusCodes或者服务器回推
通知不在重试的RPC。
如果 token_count <= ( maxTokens / 2)
,则关闭重试策略,直到 token_count > (maxTokens/2)
,恢复
重试。
对于对冲 RPC,发送第一个RPC请求后,如果 token_count > (maxTokens/2)
,才会发送后续的对冲请求。
当 token_count <= ( maxTokens / 2)
时,重试请求会被取消,并且将状态码返回给调用者。
验证:
- maxTokens 必须制定,并且必须在(0, 1000] 范围
- tokenRatio 必须,并且必须大于0的浮点数,超过三位的小数会被忽略
1.2 重试策略案例
1.2.1 proto编写和编译
syntax = "proto3";
package pb;
option go_package = "./;pb";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
$ protoc -I . --go_out=plugins=grpc:. ./helloword.proto
1.2.2 服务端
package main
import (
"context"
pb "demo/pb"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"log"
"net"
)
const (
port = ":50051"
)
type server struct {
failCount int
pb.UnimplementedGreeterServer
}
// 该函数定义必须与helloworld.pb.go定义的SayHello一致
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
//打印客户端传入HelloRequest请求的Name参数
log.Printf("Received: %v", in.GetName())
// 前四次调用服务端返回错误
if s.failCount <= 3 {
s.failCount++
return nil, status.Errorf(codes.Unavailable, "test fail")
}
//将name参数作为返回值,返回给客户端
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
// main方法函数开始执行的地方
func main() {
// 调用标准库,监听50051端口的tcp连接
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
//创建grpc服务
s := grpc.NewServer()
//将server对象,也就是实现SayHello方法的对象,与grpc服务绑定
pb.RegisterGreeterServer(s, &server{})
// grpc服务开始接收访问50051端口的tcp连接数据
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
1.2.3 客户端
package main
import (
"context"
pb "demo/pb"
"google.golang.org/grpc"
"log"
"time"
)
const (
address = "localhost:50051"
)
var (
retryPolicy = `{
"RetryThrottling": {
"MaxTokens": 4,
"TokenRatio": 0.1
},
"MethodConfig": [{
"Name": [{"Service": "Greeter"}],
"RetryPolicy": {
"MaxAttempts": 6,
"InitialBackoff": "2s",
"MaxBackoff": "10s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]}`
)
// "Service": "" 表示全局应用
func main() {
// 访问服务端address,创建连接conn
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithDefaultServiceConfig(retryPolicy))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// 设置客户端访问超时时间1秒
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second)
defer cancel()
// 客户端调用服务端 SayHello 请求,传入Name 为 "world", 返回值为服务端返回参数
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: "world"})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
// 根据服务端处理逻辑,返回值也为"world"
log.Printf("Greeting: %s", r.GetMessage())
}
1.2.4 测试
[root@zsx demo]# go run server/server.go
2023/02/27 21:05:12 Received: world
2023/02/27 21:05:14 Received: world
2023/02/27 21:05:15 Received: world
2023/02/27 21:05:16 Received: world
2023/02/27 21:05:20 Received: world
[root@zsx demo]# go run client/client.go
2023/02/27 21:05:12 could not greet: rpc error: code = Unavailable desc = test fail
exit status 1
[root@zsx demo]# go run client/client.go
2023/02/27 21:05:14 could not greet: rpc error: code = Unavailable desc = test fail
exit status 1
[root@zsx demo]# go run client/client.go
2023/02/27 21:05:15 could not greet: rpc error: code = Unavailable desc = test fail
exit status 1
[root@zsx demo]# go run client/client.go
2023/02/27 21:05:16 could not greet: rpc error: code = Unavailable desc = test fail
exit status 1
[root@zsx demo]# go run client/client.go
2023/02/27 21:05:20 Greeting: Hello world
# 项目结构
[root@zsx protoc]# tree demo/
demo/
├── client
│ └── client.go
├── go.mod
├── go.sum
├── pb
│ ├── helloword.pb.go
│ └── helloword.proto
└── server
└── server.go
3 directories, 6 files
参考地址:https://github.com/grpc/proposal/blob/master/A6-client-retries.md