RPC教程 7.服务发现与注册中心

0.前言

这一节的内容只能解决只有一个服务的情况。要是有多个服务(即是多个结构体)这种就解决不了,也即是没有服务ip地址和服务实例的映射关系。

1.为什么需要注册中心

在上一节中,客户端想要找到服务实例的ip,需要硬编码把ip写到代码中。这时可能会出问题,要是该服务实例ip改变了呢,该服务实例下线宕机了呢?这时如何是好。

// 调用单个服务实例
func clientCall(addr1, addr2 string) {
	d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
	xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
	defer xc.Close()
	//省略其他......
}

这时,注册中心的重要性就出来了。

注册中心主要有三种角色:

  • 服务提供者(RPC Server):在启动时,向 Registry 注册自身服务,并向 Registry 定期发送心跳汇报存活状态。
  • 服务消费者(RPC Client):在启动时,向 Registry 订阅服务,把 Registry 返回的服务节点列表缓存在本地内存中,并与 RPC Sever 建立连接。
  • 服务注册中心(Registry):用于保存 RPC Server 的注册信息,当 RPC Server 节点发生变更时,Registry 会同步变更,RPC Client 感知后会刷新本地 内存中缓存的服务节点列表。

最后,RPC Client 从本地缓存的服务节点列表中,基于负载均衡算法选择一台 RPC Sever 发起调用。

 当然注册中心的功能还有很多,比如配置的动态同步、通知机制等。比较常用的注册中心有 etcd、zookeeper、consul,一般比较出名的微服务或者 RPC 框架,这些主流的注册中心都是支持的。

2.Gee Registry

主流的注册中心 etcd、zookeeper 等功能强大,与这类注册中心的对接代码量是比较大的,需要实现的接口也很多。所以这里我们选择自己实现一个简单的支持心跳保活的注册中心。

GeeRegistry 的代码独立放置在子目录 registry 中。

首先定义 GeeRegistry 结构体,默认超时时间设置为 5 min,也就是说,超过5min没有收到该注册的服务的心跳,即视其为不可用状态。

//registry.go
type ServerItem struct {
	Addr  string
	start time.Time //用于心跳时间计算
}

// GeeRegistry is a simple register center
type GeeRegistry struct {
	timeout time.Duration
	mutex   sync.Mutex //protcect servers
	servers map[string]*ServerItem
}

const (
	defaultPath    = "/_rpc_/registry"
	defaultTimeout = time.Minute * 5
)

func New(timeout time.Duration) *GeeRegistry {
	return &GeeRegistry{
		servers: make(map[string]*ServerItem),
		timeout: timeout,
	}
}

var DefalultGeeRegister = New(defaultTimeout)

然后,为 GeeRegistry 实现添加服务实例和返回服务列表的方法。

  • putServer:添加服务实例,如果服务已经存在,则更新 start。
  • aliveServers:返回可用的服务列表,如果存在超时的服务,则删除。
func (r *GeeRegistry) putServer(addr string) {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	s := r.servers[addr]
	if s == nil {
		r.servers[addr] = &ServerItem{Addr: addr, start: time.Now()}
	} else {
		s.start = time.Now() // if exists, update start time to keep alive
	}
}

func (r *GeeRegistry) aliveServers() []string {
	r.mutex.Lock()
	defer r.mutex.Unlock()

	var alive []string
	for addr, s := range r.servers {
		if r.timeout == 0 || s.start.Add(r.timeout).After(time.Now()) {
			alive = append(alive, addr)
		} else {
			delete(r.servers, addr)
		}
	}
	sort.Strings(alive)
	return alive
}

 为了简单,那么rpc客户端通过HTTP去访问注册中心,且所有的有用信息都承载在 HTTP Header 中。

  • Get:返回所有可用的服务列表,通过自定义字段 X-rpc-Servers 承载。
  • Post:添加服务实例或发送心跳,通过自定义字段 X-rpc-Server 承载。
func (r *GeeRegistry) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	switch req.Method {
	case "GET":
		w.Header().Set("X-rpc-Servers", strings.Join(r.aliveServers(), ","))
	case "POST":
		addr := req.Header.Get("X-rpc-Servers")
		if addr == "" {
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		r.putServer(addr) //更新保存在注册中心的服务实例
	default:
		w.WriteHeader(http.StatusMethodNotAllowed)
	}
}

func (r *GeeRegistry) HandleHTTP(registryPath string) {
	http.Handle(registryPath, r)
}

func HandleHTTP() {
	DefalultGeeRegister.HandleHTTP(defaultPath)
}

另外,也要提供 Heartbeat 方法,便于服务启动时定时向注册中心发送心跳(也是通过HTTP),默认周期比注册中心设置的过期时间少 1 min。

// only send once
func sendHeartbeat(registryURL, addr string) error {
	httpClient := &http.Client{Timeout: time.Second * 10}
	req, _ := http.NewRequest("POST", registryURL, nil)
	req.Header.Set("X-rpc-Servers", addr)
	resp, err := httpClient.Do(req)
	if err != nil {
		fmt.Println("rpc server: heart beat err:", err)
		return err
	}
	defer resp.Body.Close()
	return nil
}

// Heartbeat send a heartbeat message every once in a while
func Heartbeat(registryURL, addr string, duration time.Duration) {
	if duration == 0 {
		duration = defaultTimeout - time.Duration(1)*time.Minute
	}

	err := sendHeartbeat(registryURL, addr)
	go func() {
		//创建一个定时器
		t := time.NewTicker(duration)
		for err == nil {
			<-t.C
			err = sendHeartbeat(registryURL, addr)
		}
	}()
}

3.需要注册中心的服务发现

上一节我们实现了一个不需要注册中心,服务列表由手工维护的服务发现的结构体MultiServersDiscovery。

而现在我们实现了注册中心,那这一节的服务发现就可以继承上一节的,并添加与注册中心相关的细节

type GeeRegistryDiscovery struct {
	*MultiServerDiscovery
	registryAddr string
	timeout      time.Duration //服务列表的过期时间
	lastUpdate   time.Time
}

const defaultUpdateTimeout = time.Second * 10

func NewGeeRegistryDiscovery(registerAddr string, timeout time.Duration) *GeeRegistryDiscovery {
	if timeout == 0 {
		timeout = defaultUpdateTimeout
	}
	return &GeeRegistryDiscovery{
		MultiServerDiscovery: NewMultiServerDiscovery(make([]string, 0)),
		registryAddr:         registerAddr,
		timeout:              timeout,
	}
}
  • GeeRegistryDiscovery 嵌套了 MultiServersDiscovery,很多能力可以复用。
  • registryAddr 即注册中心的地址
  • timeout 服务列表的过期时间
  • lastUpdate 是代表最后从注册中心更新服务列表的时间,默认 10s 过期,即 10s 之后,需要从注册中心更新新的列表。

实现 Update 和 Refresh 方法,超时重新获取的逻辑在 Refresh 中实现: 

func (d *GeeRegistryDiscovery) Update(servers []string) error {
	d.rwMutex.Lock()
	defer d.rwMutex.Unlock()
	d.servers = servers
	d.lastUpdate = time.Now()
	return nil
}

// 刷新,有了注册中心,在客户端每次获取服务实例时候,需要刷新注册中心的保存的服务实例
func (d *GeeRegistryDiscovery) Refresh() error {
	d.rwMutex.Lock()
	defer d.rwMutex.Unlock()
    //注册中心保存的服务实例还没超时,不用更新
	if d.lastUpdate.Add(d.timeout).After(time.Now()) {
		return nil
	}
	httpClient := http.Client{Timeout: time.Second * 10} //http客户端最好有个超时
	resp, err := httpClient.Get(d.registryAddr)
	if err != nil {
		fmt.Println("rpc registry refresh err:", err)
		return err
	}

	defer resp.Body.Close()
	servers := strings.Split(resp.Header.Get("X-rpc-Servers"), ",")
	d.servers = make([]string, 0, len(servers))
	for _, server := range servers {
		//返回一个string类型,并将最前面和最后面的ASCII定义的空格去掉,中间的空格不会去掉
		s := strings.TrimSpace(server)
		if s != "" {
			d.servers = append(d.servers, s)
		}
	}

	d.lastUpdate = time.Now()
	return nil
}

 Get 和 GetAll 与 MultiServersDiscovery 相似,唯一的不同在于,GeeRegistryDiscovery 需要先调用 Refresh 确保服务列表没有过期

func (d *GeeRegistryDiscovery) Get(mode SelectMode) (string, error) {
	if err := d.Refresh(); err != nil {
		return "", err
	}
	//d.Get(mode) 表示调用的是(GeeRegistryDiscovery).Get
	return d.MultiServerDiscovery.Get(mode) //d.MultiServerDiscovery是调用MultiServerDiscovery的Get()
}

func (d *GeeRegistryDiscovery) GetAll() ([]string, error) {
	if err := d.Refresh(); err != nil {
		return nil, err
	}
	return d.MultiServerDiscovery.GetAll()
}

4.测试

添加函数 startRegistry,之后需要稍微修改 startServer,定期向注册中心发送心跳保活(Heartbeat)。

这里使用sync.WaitGroup是为了等待该操作执行完毕才会往后执行,因为这些函数都是新开协程运行。

func startServer(registryAddr string, wg *sync.WaitGroup) {
	var myServie My

	l, _ := net.Listen("tcp", "localhost:0") //端口是0表示端口随机
	server := geerpc.NewServer()
	//这里一定要用&myServie,因为前面Sum方法的接受者是*My;若接受者是My,myServie或者&myServie都可以
	server.Register(&myServie)
	registry.Heartbeat(registryAddr, "tcp@"+l.Addr().String(), 0)  //定时发送心跳
	wg.Done()
	server.Accept(l)
}

func startRegistry(wg *sync.WaitGroup) {
	l, _ := net.Listen("tcp", "localhost:9999")
	registry.HandleHTTP()
	wg.Done()
	http.Serve(l, nil)
}

接下来,将 call 和 broadcast 的 MultiServersDiscovery 替换为 GeeRegistryDiscovery,不再需要硬编码服务列表。 

这里就重点对比下NewGeeRegistryDiscovery方法和之前的不同之处。

// 调用单个服务实例
func clientCall(registryAddr string) {
	// d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
	d := xclient.NewGeeRegistryDiscovery(registryAddr, 0)
	xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
	defer xc.Close()

	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			var reply int = 1324
			if err := xc.Call(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {
				log.Println("call Foo.Sum error:", err)
			}
			fmt.Println("reply: ", reply)
		}(i)
	}
	wg.Wait()
}

func broadcast(registryAddr string) {
	// d := xclient.NewMultiServerDiscovery([]string{"tcp@" + addr1, "tcp@" + addr2})
	d := xclient.NewGeeRegistryDiscovery(registryAddr, 0)
	xc := xclient.NewXClient(d, xclient.RandomSelect, nil)
	defer xc.Close()

	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			var reply int = 1324
			if err := xc.Broadcast(context.Background(), "My.Sum", &Args{Num1: i, Num2: i * i}, &reply); err != nil {
				fmt.Println("Broadcast call Foo.Sum error:", err)
			}
			fmt.Println("Broadcast reply: ", reply)

			ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
			defer cancel()
			var replyTimeout int = 1324
			if err := xc.Broadcast(ctx, "My.Sleep", &Args{Num1: i, Num2: i * i}, &replyTimeout); err != nil {
				fmt.Println("Broadcast call Foo.Sum error:", err)
			}
			fmt.Println("timeout Broadcast reply: ", replyTimeout)

		}(i)
	}
	wg.Wait()
}

最后是main函数。

确保注册中心启动后,再启动 RPC 服务端,最后客户端远程调用。

func main() {
	registryAddr := "http://localhost:9999/_rpc_/registry"

	var wg sync.WaitGroup
	wg.Add(1)
	go startRegistry(&wg) //开启注册中心服务
	wg.Wait()
	time.Sleep(time.Second)

	wg.Add(2)
	go startServer(registryAddr, &wg)
	go startServer(registryAddr, &wg)
	wg.Wait()

	time.Sleep(time.Second)
	clientCall(registryAddr)
	broadcast(registryAddr)
}

运行结果:

代码: https://github.com/liwook/Go-projects/tree/main/geerpc/7-registry

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/363785.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python+selenium的web自动化】- 元素的常用操作详解(一)

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

PySimpleGUI 综合应用|英语文本朗读以及转换为语音Mp3

PySimpleGUI 综合应用 目录 PySimpleGUI 综合应用 应用界面 完整代码 所需模块 PySimpleGUI pyttsx3 pyaudio rapidfuzz 字典格式 应用界面 完整代码 英语朗读器.pyw import PySimpleGUI as sg import pyttsx3,pyaudio,pyperclip import os,re,datetime,wave,threa…

EasyExcel使用,实体导入导出

简介 Java解析、生成Excel比较有名的框架有Apache poi、jxl。但他们都存在一个严重的问题就是非常的耗内存&#xff0c;poi有一套SAX模式的API可以一定程度的解决一些内存溢出的问题&#xff0c;但POI还是有一些缺陷&#xff0c;比如07版Excel解压缩以及解压后存储都是在内存中…

@JsonProperty(“xx“)的使用

JsonProperty(“xx”) 指定JSON字段的名称 常见的场景 1、第三方调用本系统&#xff0c;参数接收不到 &#xff0c;没有使用驼峰命名&#xff0c;或者传参格式不一致问题 2、前后端调试&#xff0c;接收不到个别参数 遇到的场景描述&#xff1a; 1&#xff0c;项目提供ope…

Vue中的指令修饰符计算属性和方法的区别

一、指令修饰符 1.什么是指令修饰符&#xff1f; 所谓指令修饰符就是通过“.”指明一些指令后缀 不同的后缀封装了不同的处理操作 —> 简化代码 2.按键修饰符 keyup.enter —>当点击enter键的时候才触发 代码演示&#xff1a; <!DOCTYPE html> <html lang…

【ChatGPT】文本向量化与余弦相似度:揭开文本处理的神秘面纱(5)

1、引言 在这个数字化的时代&#xff0c;我们每天都会面对大量的文本信息&#xff0c;从社交媒体到新闻报道&#xff0c;文本无处不在。但是&#xff0c;计算机要如何理解和处理这些文字呢&#xff1f;本文将为大家揭开其中的一些奥秘&#xff0c;详细解释文本向量化的概念&am…

C++ hash—unordered_mapset

目录 一. unordered系列关联式容器 1、文档说明 2、接口说明 1. 构造 2. 容量 3. 迭代器 4. 元素访问 5. 查询 6. 修改 7. 桶操作 8. 测试 二、unordered_set 1、​​​​​​​文档说明 2、接口说明 1. 构造 2. 容量 3. 迭代器 4. 元素访问 5. 插入和删除…

【脑电信号处理与特征提取】P7-贾会宾:基于EEG/MEG信号的大尺度脑功能网络分析

基于EEG/MEG信号的大尺度脑功能网络分析 Q: 什么是基于EEG/MEG信号的大尺度脑功能网络分析&#xff1f; A: 基于脑电图&#xff08;EEG&#xff09;或脑磁图&#xff08;MEG&#xff09;信号的大尺度脑功能网络分析是一种研究大脑活动的方法&#xff0c;旨在探索脑区之间的功能…

Win11系统连接带HDMI接口的显示器后,电脑没有声音如何调试

解决这个问题的方法很简单&#xff0c;没有那么复杂。之所以使用HDMI接口连接了显示器后没声音&#xff0c;原因就是HDMI接口是包含音频视频两种信号的接口。当电脑的HDMI接口被使用时&#xff0c;系统就会默认从HDMI设备输出声音信号了&#xff0c;而此时如果HDMI设备没有声音…

vivado里的LUT、LUTRAM、FF、BRAM、DSP、IO、BUFG、MMCM资源介绍

vivado里的LUT、LUTRAM、FF、BRAM、DSP、IO、BUFG、MMCM资源介绍 提示&#xff1a;以下是本篇文章正文内容&#xff0c;写文章实属不易&#xff0c;希望能帮助到各位&#xff0c;转载请附上链接。 vivado实现电路用到的资源类型 LUT&#xff08;Look-Up Table&#xff09;&am…

文心一言APP上线新功能,一张照片、三句话即可生成专属数字分身

只需一张照片、录制三句话&#xff0c;就能拥有一个自己的数字分身&#xff1f;这不是科幻电影&#xff0c;而是文心一言APP上线的新功能 - 数字分身。 目前&#xff0c;文心一言APP正在内测数字分身新功能&#xff0c;明天起&#xff0c;iOS和Android用户升级新版本后&#xf…

给定长度为n的递增数组a,进行n - 1次操作:求当前a数组的差分数组,然后使a为差分数组,继续进行操作。求最后数组的元素是什么

题目 思路: #include <bits/stdc++.h> using namespace std; #define int long long typedef long long ll; #define pb push_back #define lson p << 1 #define rson p << 1 | 1 #define fi first #define se second const int maxn = 1e6 + 5, maxm = 5e…

java之基础知识、零碎知识

MENU java学习路程之篇一、知识点、path环境变量、计算机发展史、数据的存储和运算、人机交互、计算机语言java学习路程之篇二、知识点、JAVA背景介绍、配置JAVA_HOME、跨平台、JVM、JRE、JDKjava学习路程之篇三、知识点、类、模块、项目、操作、下载、安装、IDEA、开发工具jav…

Vue-49、Vue技术实现动画效果

1、首先&#xff0c;在Vue项目中的src/components文件夹下创建一个名为AnimatedBox.vue的文件。 2、编辑AnimatedBox.vue文件&#xff0c;添加以下代码&#xff1a; <template><div class"animated-box" click"toggle"><transition name&q…

Excel没有内置统计字数功能,但可以用一些变通的方法

是否需要计算Excel工作簿中某个单元格或单元格范围内的单词数? 出于多种原因,你可能需要计算文本数据中的字数。也许你有逗号分隔的列表,需要计算每个列表中的项目数。 不幸的是,Excel没有内置的单词计数方法。但是有一些聪明的方法可以得到你需要的结果。 这篇文章将向…

【Docker】【深度学习算法】在Docker中使用gunicorn启动多个并行算法服务,优化算法服务:从单进程到并行化

文章目录 优化算法服务&#xff1a;从单进程到并行化单个服务架构多并行服务架构Docker化并指定并行服务数量 扩展知识 优化算法服务&#xff1a;从单进程到并行化 在实际应用中&#xff0c;单个算法服务的并发能力可能无法满足需求。为了提高性能和并发处理能力&#xff0c;我…

OpenHarmony—开发及引用静态共享包(API 9)

HAR(Harmony Archive&#xff09;是静态共享包&#xff0c;可以包含代码、C库、资源和配置文件。通过HAR可以实现多个模块或多个工程共享ArkUI组件、资源等相关代码。HAR不同于HAP&#xff0c;不能独立安装运行在设备上&#xff0c;只能作为应用模块的依赖项被引用。 接下来&a…

Matlab图像模拟加噪——高斯噪声、椒盐噪声、泊松噪声、乘性噪声、均匀噪声、指数噪声

1.高斯噪声 (1)通过均值和方差来产生 Jimnoise(I, gaussian, 0, 0.01);%高斯噪声&#xff0c;均值为0&#xff0c;方差为0.01(2)通过位置信息来产生 Iim2double(I); Vzeros(size(I)); %建立矩阵V for i1:size(V, 1)V(i,:)0.02*i/size(V,1); end Jimnoise(I, localvar, V); …

Android Jetpack Compose之底部导航栏的实现

目录 1.概述2. 效果展示3. 代码实现3.1 定义底部导航栏的tab项3.2 整体页面架构搭建3.3 底部导航栏的实现3.4 所有代码 4.总结 1.概述 写过一段Android jetpack compose 界面的小伙伴应该都用过Compose的脚手架Scaffold&#xff0c;利用它我们可以很快的实现一个现代APP的主流…

android开发---简单购物商城(JAVA) (一)

包括&#xff1a;商品展示&#xff0c;商品详情&#xff0c;购物车&#xff0c;删除&#xff0c;一键清除&#xff0c;返回 运用sqllist 另外因为一篇写不下 继续可看 源码二 下面是目录 运行样子 下面是源码 AndroidManifest.xml <?xml version"1.0" e…