1.负载均衡策略
假设有多个服务实例,而每个实例都提供相同的功能,为了提高整个系统的吞吐量,每个实例部署在不同的机器上。客户端可以选择任意一个实例进行调用,获取想要的结果。那如何选择呢?取决于负载均衡的策略。
- 随机选择策略 - 从服务列表中随机选择一个。
- 轮询算法(Round Robin) - 依次调度不同的服务器,每次调度执行 i = (i + 1) mode n。
- 加权轮询(Weight Round Robin) - 在轮询算法的基础上,为每个服务实例设置一个权重,高性能的机器赋予更高的权重,也可以根据服务实例的当前的负载情况做动态的调整,例如考虑最近3分钟部署服务器的 CPU、内存消耗情况。
2.服务发现
需要负载均衡那就需要有多个提供相同功能的服务实例。那服务发现是什么意思呢?现在我们有多个提供相同功能的服务实例,那客户端要获取该服务的地址,就需要服务中心返回一个地址给客戶端。这个就是服务发现。
那我们先实现一个基础的服务发现模块 Discovery(定义成interface接口类型)。
我们定义两个类型:
- SelectMode 代表不同的负载均衡策略,简单起见,这里仅实现 Random 和 RoundRobin 两种策略。
- Discovery 是一个接口类型,包含了服务发现所需要的最基本的接口。
- Refresh() 从注册中心更新服务列表
- Update(servers []string) 手动更新服务列表
- Get(mode SelectMode) 根据负载均衡策略,选择一个服务实例
- GetAll() 返回所有的服务实例
代码在xclient文件夹中。
//discovery.go
type SelectMode int
const (
RandomSelect SelectMode = iota
RoundRobinSelect
)
type Discovery interface {
Refresh() error
Update(servers []string) error
Get(mode SelectMode) (string, error)
GetAll() ([]string, error)
}
紧接着,我们实现一个不需要注册中心,服务列表由手工维护的服务发现的结构体:MultiServersDiscovery。
可以用编码的方式在客户端中配置服务的地址,服务器不需要进行更多的配置,如果添加或删除了某些服务,可以调用MultipleServersDiscovery.Update
来动态更新服务。
客户端使用NewMultipleServersDiscovery方法
设置该服务的网络和地址。
//discovery.go
type MultiServerDiscovery struct {
rwMutex sync.RWMutex //protect following,即是保护servers,index
servers []string
index int
}
//使用例子: NewMultiServerDiscovery([]string{"tcp@127.0.0.1:100","tcp@127.0.0.1:2100"})
func NewMultiServerDiscovery(servers []string) *MultiServerDiscovery {
d := &MultiServerDiscovery{
servers: servers,
}
d.index = rand.Intn(math.MaxInt32 - 1)
return d
}
- index 记录 Round Robin 算法已经轮询到的位置,为了避免每次从 0 开始,初始化时随机设定一个值。
然后,实现 Discovery 接口
var _ Discovery = (*MultiServerDiscovery)(nil)
func (d *MultiServerDiscovery) Refresh() error {
return nil
}
func (d *MultiServerDiscovery) Update(servers []string) error {
d.rwMutex.Lock()
defer d.rwMutex.Unlock()
d.servers = servers
return nil
}
func (d *MultiServerDiscovery) Get(mode SelectMode) (string, error) {
//这里不能用d.rwMutex.RLock(),因为d.index有更新
d.rwMutex.Lock()
defer d.rwMutex.Unlock()
n := len(d.servers)
if n == 0 {
return "", errors.New("rpc discovery: no available servers")
}
switch mode {
case RandomSelect:
return d.servers[rand.Intn(n)], nil
case RoundRobinSelect:
s := d.servers[d.index%n]
d.index = (d.index + 1) % n
return s, nil
default:
return "", errors.New("rpc discovery: not supported select mode")
}
}
func (d *MultiServerDiscovery) GetAll() ([]string, error) {
d.rwMutex.RLock()
defer d.rwMutex.RUnlock()
// return a copy of d.servers
servers := make([]string, len(d.servers))
copy(servers, d.servers)
return servers, nil
}
3.为什么选择客户端负载均衡
RPC client 和 server 建立是长连接, 因而基于连接的负载均衡没有太大意义, 所以 该RPC 负载均衡是基于每次调用。也就是客户在同一个 client 发的请求希望它被负载均衡到所有服务端。
一般来说负载均衡器是独立的, 被放置在服务消费者和提供者之间. 代理通常需要保存请求响应副本, 因此有性能消耗也会造成额外延迟. 当请求量大时, lb (load balance)可能会变成瓶颈, 并且此时 lb 单点故障会影响整个服务。
客户端负载将lb 的功能集成到客户端进程里,然后使用负载均衡策略选择一个目标服务地址,向目标服务发起请求。LB能力被分散到每一个服务消费者的进程内部,同时服务消费方和服务提供方之间是直接调用,没有额外开销,性能比较好。
但用客户端负载均衡也有坏处, 若有多种不同的语言栈,就要配合开发多种不同的客户端,有一定的研发和维护成本。
4.支持负载均衡的客户端
之前对外使用的客户端是Dail(...),这里我们也要向用户暴露一个支持负载均衡的客户端,叫做 XClient。
//xclient.go
type XClient struct {
d Discovery
mode SelectMode
opt *geerpc.Option
mutex sync.Mutex
clients map[string]*geerpc.Client
}
func NewXClient(d Discovery, mode SelectMode, opt *geerpc.Option) *XClient {
return &XClient{
d: d,
mode: mode,
opt: opt,
clients: make(map[string]*geerpc.Client),
}
}
func (xc *XClient) Close() error {
xc.mutex.Lock()
defer xc.mutex.Unlock()
for key, client := range xc.clients {
//只是关闭,没有其他的对错误的处理
client.Close()
delete(xc.clients, key)
}
return nil
}
XClient 的构造函数需要传入三个参数,服务发现实例 Discovery、负载均衡模式 SelectMode 以及协议选项 Option。为了尽量地复用已经创建好的 Socket 连接,使用 clients 保存创建成功的 Client 实例,并提供 Close 方法在结束后,关闭已经建立的连接。
我们之前是使用dial函数来创建一个客户端,那我们为了可以复用已经创建好的socket连接,这里我们也实现一个dial函数,在内部复用socket连接。
func (xc *XClient) dial(rpcAddr string) (*geerpc.Client, error) {
xc.mutex.Lock()
defer xc.mutex.Unlock()
client, ok := xc.clients[rpcAddr]
if ok && !client.IsAvailable() {
client.Close()
delete(xc.clients, rpcAddr)
client = nil
}
if client == nil {
var err error
client, err = geerpc.XDial(rpcAddr, xc.opt)
if err != nil {
return nil, err
}
xc.clients[rpcAddr] = client
}
return client, nil
}
func (xc *XClient) call(rpcAddr string, ctx context.Context, serviceMethod string, args, reply interface{}) error {
//获取sokcet连接(复用)
client, err := xc.dial(rpcAddr)
if err != nil {
return err
}
return client.Call(ctx, serviceMethod, args, reply)
}
// serviceMethod 例子:"Foo.SUM"
func (xc *XClient) Call(ctx context.Context, serviceMethod string, args, reply any) error {
//通过负载均衡策略得到服务实例
rpcAddr, err := xc.d.Get(xc.mode)
if err != nil {
return err
}
return xc.call(rpcAddr, ctx, serviceMethod, args, reply)
}
之后实现一个调用服务的方法Call。该方法主要是三步:
- 通过负载均衡策略得到服务实例
- 获取sokcet连接(复用)
- 最终调用client.Call去发送服务请求
另外,我们为 XClient 添加一个常用功能:Broadcast。
Broadcast
表示向所有服务器发送请求,只有所有服务器正确返回时才会成功。
Broadcast
是 XClient
的一个方法, 你可以将一个请求发送到这个服务的所有节点。
如果所有的节点都正常返回,没有错误的话, Broadcast
将返回其中的一个节点的返回结果。 如果有节点返回错误的话,将返回这些错误信息中的一个。
func (xc *XClient) Broadcast(ctx context.Context, serviceMethod string, args, reply any) error {
//获取所有的服务实例
servers, err := xc.d.GetAll()
if err != nil {
return err
}
var wg sync.WaitGroup
var mutex sync.Mutex //protect e and replyDone
var e error
replyDone := reply == nil
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for _, rpcAddr := range servers {
wg.Add(1)
//fmt.Printf("rpcAddrstring addr: %p\n", &rpcAddr) //其rpcAddr的地址都是一样的
go func(rpcAddr string) {
defer wg.Done()
var clonedReply any
if reply != nil {
//reply是指针的,所以需要使用Elem()
clonedReply = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
}
//xc.call方法中的参数clonedReply不能使用reply
err := xc.call(rpcAddr, ctx, serviceMethod, args, clonedReply)
mutex.Lock()
defer mutex.Unlock()
if err != nil && e == nil {//e==nil表明e还没有被赋值
e = err
cancel() // if any call failed, cancel unfinished calls
}
if err == nil && !replyDone {
reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(clonedReply).Elem())
replyDone = true
}
}(rpcAddr)
}
wg.Wait()
return e
}
// //另一种写法,go协程中没有参数
// func (xc *XClient) Broadcast(ctx context.Context, serviceMethod string, args, reply any) error {
// .............
// for _, rpcAddr := range servers {
// wg.Add(1)
// //fmt.Printf("rpcAddrstring addr: %p\n", &rpcAddr) //其rpcAddr的地址都是一样的
// addr:=rpcAddr
// go func() {
// defer wg.Done()
// err := xc.call(addr, ctx, serviceMethod, args, clonedReply)
// ......
// }()
// }
// ................
// }
需要注意的几点:
- 为了提升性能,请求是并发的。而要等待协程去访问服务实例结束,所以可以使用sync.WaitGroup来阻塞等待。
- 并发情况下,xc.call中不能使用reply入参,需要每个协程都有自己的clonedReply参数,不然就需要用锁来控制reply,这就不值得了。每个协程都有自己的clonedReply,获得结果后,再把clonedReply赋值给reply就行,这样就只需要使用互斥锁保证 error 和 reply 能被正确赋值即可。
- 借助 context.WithCancel 确保有错误发生时,快速失败。
5.测试
首先,启动 RPC 服务的代码还是类似的,Sum 是正常的方法,Sleep 用于验证 XClient 的超时机制能否正常运作。
type My int
type Args struct{ Num1, Num2 int }
func (m *My) Sum(args Args, reply *int) error {
*reply = args.Num1 + args.Num2
return nil
}
func (m *My) Sleep(args Args, reply *int) error {
time.Sleep(time.Second * time.Duration(args.Num1))
*reply = args.Num1 + args.Num2
return nil
}
接着,有两个函数,clientCall调用单个服务实例,broadcast 调用所有服务实例。
这两个函数代码也是基本相似的,主要不同就是协程函数内的操作不同。
// 调用单个服务实例
func clientCall(addr1, addr2 string) {
d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
defer xc.Close()
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
var reply int = 1324
if err := xc.Call(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {
log.Println("call Foo.Sum error:", err)
}
fmt.Println("reply: ", reply)
}(i)
}
wg.Wait()
}
func broadcast(addr1, addr2 string) {
d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
defer xc.Close()
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
var reply int = 1324
if err := xc.Broadcast(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {
fmt.Println("Broadcast call Foo.Sum error:", err)
}
fmt.Println("Broadcast reply: ", reply)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
var replyTimeout int = 1324
if err := xc.Broadcast(ctx, "My.Sleep", &Args{Num1: i, Num2: i * i}, &replyTimeout); err != nil {
fmt.Println("Broadcast call Foo.Sum error:", err)
}
fmt.Println("timeout Broadcast reply: ", replyTimeout)
}(i)
}
wg.Wait()
}
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
//start two servers
go startServer(ch1)
go startServer(ch2)
addr1 := <-ch1
addr2 := <-ch2
time.Sleep(time.Second)
clientCall(addr1, addr2)
broadcast(addr1, addr2)
}
效果
完整代码:https://github.com/liwook/Go-projects/tree/main/geerpc/6-load-balance