cache教程 5.分布式节点的通信

0.对原教程的一些见解

其回顾完请求流程就是抽象了两个接口,PeerPicker和PeerGetter。这样操作,读者阅读时可能很难快速明白其含义,不好理解为什么就创建出两个接口,感觉会比较疑惑。原教程的评论中也有讨论这点。

 本教程就先不创建接口,而是使用struct方式,这样可能好理解点。

1.节点请求处理的流程

先弄清楚我们查询缓存的逻辑。

单节点: 

客户发送查询请求到节点A,该节点有缓存就立即返回,若是没有就执行用户设置的回调函数获取值并添加到缓存中,然后返回。

分布式节点:

客户端发送查询请求到某个缓存节点,该节点会判断该key是否在本地,若是不在本地,使用一致性哈希选择节点,若不是在远程节点,则就退回到本地节点处理;若在远程节点,该节点会发送请求去访问其他 node 节点。(不是客户端再去访问其他节点)

从这可以看出,一个node要处理两种请求,一个是来自客户端的外部请求,一个是来自其他远端节点的内部请求

为了清晰,划分职责,我们可以在一个node中启动两种HTTP服务,一个处理客户端请求(APIServer), 一个处理节点之间的请求(CacheServer)

2.HTTP客户端

之前我们为 HTTPPool 实现了服务端功能,通信不仅需要服务端还需要客户端,因此,我们接下来先实现客户端的功能。这个客户端是节点作为客户端去访问其他节点

  • baseURL 表示将要访问的远程节点的地址,例如 http://example.com/geecache/
type httpGetter struct {
	baseURL string
}

func (h *httpGetter) Get(group string, key string) ([]byte, error) {
	//QueryEscape 对字符串进行转义,以便可以将其安全地放置在 URL 查询中。
	u := fmt.Sprintf("%v/%v/%v", h.baseURL,
		url.QueryEscape(group),
		url.QueryEscape(key))

	res, err := http.Get(u)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("server returned: %v", res.Status)
	}

	bytes, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, fmt.Errorf("reading response body: %v", err)
	}
	return bytes, nil
}

3.回顾上一章节实现的单节点的访问流程

func (g *Group) Get(key string) (ByteView, error) {
    //现在本地查询
	if v, ok := g.mainCache.get(key); ok {
		return v, nil
	}

	return g.load(key)   
}

func (g *Group) load(key string) (ByteView, error) {
	bytes, err := g.getter.Get(key)
	if err != nil {
		return ByteView{}, err
	}
	value := ByteView{b: cloneByte(bytes)}
	g.mainCache.add(key, value)
	return value, nil
}

那很明显是需要修改load方法,让其可以去访问远程节点。

在load方法中,伪代码如下。

func func (g *Group) load(key string) (ByteView, error){
	if 有远程节点 {
		if 找到key所在的远程节点 {
			本地作为客户端去访问该远程节点
		}
	}

	没有远程节点,只能在本地调用回调函数去源地方获取
}

要想在Group中访问节点,那么就要在Group中存储节点集合。

节点结合结构体Peers

那节点集合是不是又要创建一个结构体?那先试试创建一个结构体Peers。

因为 hash 环的 map 不是线程安全的,所以这里要加锁。

成员变量 httpGetters,映射远程节点与对应的 httpGetter。(httpGetter就是个客户端,是一个节点作为客户端),每一个远程节点对应一个 httpGetter,因为 httpGetter 与远程节点的地址 baseURL 有关,map的key是远程节点的地址,比如"http://localhost:10000"

type Peers struct {
	addr          string //这个是用于进行选择节点时用来判断是不是本地节点
	basePath      string
	mutex         sync.Mutex    //guards peersHashRing and httpGetters
	peersHashRing *consistenthash.HashRing
	httpGetters   map[string]*httpGetter
}

//这是HTTP服务端章节的HTTPPool,这是很相似的
type HTTPPool struct {
	addr     string
	basePath string
}

那么该结构体Peers就要有添加远程节点和通过key去获取远程节点的方法。

增添远程节点方法Set

通过该方法可以知道其map的key是远程节点的地址。

// 使用用例:Set("http://localhost:8001","http://localhost:8002")
func (p *Peers) Set(peers ...string) {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	p.peersHashRing = consistenthash.NewHash(50, nil)
	p.peersHashRing.Add(peers...) //在 hash 环上添加真实节点和虚拟节点
	//存储远端节点信息
	p.httpGetters = make(map[string]*httpGetter)
	for _, peer := range peers {
		p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
	}
}

通过key去获取远程节点的方法PickPeer

Peers结构体中的变量addr在这里派上用场了,返回的地址要是等于本身addr,那就返回false,不用自己作为客户端再去访问自己。

func (p *Peers) PickPeer(key string) (*httpGetter, bool) {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	//这里返回的peer是个地址,可以查看(Peers).Set函数中的参数
	if peer := p.peersHashRing.Get(key); peer != "" && peer != p.addr {
		fmt.Println("pick peer ", peer)
		return p.httpGetters[peer], true
	}
	return &httpGetter{}, false
}

Peers这个结构体就实现了,可以看到其与HTTPPool是很相似的。对比HTTPPool,就是成员变量添加了一些,方法也添加了一些,也没有改变HTTPPool原有的逻辑,只是扩张了。所以可以把Peers的内容添加到HTTPPool中去,具体的代码就不在这里显示了。

type HTTPPool struct {
	addr     string
	basePath string

	//新添加的,把Peers内容增添到HTTPPool中
	mutex         sync.Mutex
	peersHashRing *consistenthash.HashRing
	httpGetters   map[string]*httpGetter
}

4.集成,实现主流程

最后,我们需要将上述新增的功能集成在主流程(geecache.go)中。

在Group结构体中有改变。

新增 RegisterPeers() 方法,将 peers 注入到 Group 中。

type Group struct {
	name      string
	mainCache cache
	getter    Getter

	peers *Peers //添加了节点集合
}

// 往分组内注册节点集合
func (g *Group) RegisterPeers(peers *Peers) {
	if g.peers != nil {
		panic("RegisterPeerPicker called more than once")
	}
	g.peers = peers
}

最终再回到load函数,这个函数是需要修改的。

func (g *Group) load(key string) (value ByteView, err error) {
	if g.peers != nil {    //有远程节点的情况
		if peer, ok := g.peers.PickPeer(key); ok {    //通过key找到该远程节点
			if value, err = g.getFromPeer(peer, key); err == nil {
				return value, nil        //找到值
			}
			log.Println("[GeeCache] Failed to get from peer", err)
		}
	}

	return g.getLocally(key)    //回到本地处理
}

func (g *Group) getFromPeer(peer *httpGetter, key string) (ByteView, error) {
	bytes, err := peer.Get(g.name, key)
	if err != nil {
		return ByteView{}, err
	}
	return ByteView{b: bytes}, nil
}

func (g *Group) getLocally(key string) (ByteView, error) {
	bytes, err := g.getter.Get(key)
	if err != nil {
		return ByteView{}, err
	}
	value := ByteView{b: cloneByte(bytes)}
	g.mainCache.add(key, value)
	return value, nil
}
  • 新增 getFromPeer() 方法,使用httpGetter 访问远程节点,获取缓存值。
  • 修改 load 方法,使用 PickPeer() 方法选择节点,若非本机节点,则调用 getFromPeer() 从远程获取。若是本机节点或失败,则回退到 getLocally()

5. 测试

总结——缓存节点启动的流程

  1. 创建 Group 对象.(用于存储我们的缓存数据)
  2. 启动缓存 http 服务.(创建 HTTPPool,添加节点信息,注册到缓存分组中)
  3. 启动 API 服务.(用于与客户端进行交互)

 测试代码:

var db = map[string]string{
	"Tom":  "630",
	"Jack": "589",
	"Sam":  "567",
}

func main() {
	var port int
	var api bool
	flag.IntVar(&port, "port", 8001, "Geecache server port")
	flag.BoolVar(&api, "api", false, "Start a api server?")
	flag.Parse()

	apiAddr := "http://localhost:9999"
	addrMap := map[int]string{
		8001: "http://localhost:8001",
		8002: "http://localhost:8002",
		8003: "http://localhost:8003",
	}

	var addrs []string
	for _, v := range addrMap {
		addrs = append(addrs, v)
	}

	gee := createGroup()
	if api {
		go startAPIServer(apiAddr, gee)
	}
	startCacheServer(addrMap[port], addrs, gee)
	time.Sleep(time.Second * 1000)
}

func createGroup() *cache.Group {
	return cache.NewGroup("scores", 2<<10, cache.GetterFunc(func(key string) ([]byte, error) {
		if v, ok := db[key]; ok {
			return []byte(v), nil
		}
		return nil, fmt.Errorf("%s not exit", key)
	}))
}

func startCacheServer(addr string, addrs []string, groups *cache.Group) {
	//HTTPPool是节点结合和HTTP服务端
	peers := cache.NewHTTPPool(addr, cache.DefaultBasePath)
	peers.Set(addrs...)         //添加节点
	groups.RegisterPeers(peers) //注册节点集合
	log.Println("geecache is running at", addr)
	http.ListenAndServe(addr[7:], peers)
}

func startAPIServer(apiAddr string, groups *cache.Group) {
	http.HandleFunc("/api", func(w http.ResponseWriter, r *http.Request) {
		key := r.URL.Query().Get("key")
		view, err := groups.Get(key)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)

			return
		}
		w.Header().Set("Content-Type", "application/octet-stream")
		w.Write(view.ByteSlice())
	})

	log.Println("fontend server is running at", apiAddr)
	http.ListenAndServe(apiAddr[7:], nil)
}

为了方便,我们将启动的命令封装为一个 shell 脚本:

我们开启了三个节点(都是在同一个台机器上的,只是用不同端口来当做一个节点,进行区分)。

在端口8003的节点上开启APIServer,用户去访问时候,都是访问端口8003的那个节点。

#!/bin/bash

#trap 命令用于在 shell 脚本退出时,删掉临时文件,结束在该shell脚本运行的后台程序
trap "rm server;kill 0" EXIT

go build -o server
./server -port=8001 &
./server -port=8002 &
./server -port=8003 -api=1 &

sleep 2
echo ">>> start test"
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &
curl "http://localhost:9999/api?key=Tom" &

wait

结果

测试的时候,我们并发了 3 个请求 ?key=Tom,从日志中可以看到,三次均选择了节点 8001,这是一致性哈希算法的功劳。

但是会有一个问题,同时向 8001 发起了 3 次请求。试想,假如有 10 万个在并发请求该数据呢?那就会向 8001 同时发起 10 万次请求,如果 8001 又同时向数据库发起 10 万次查询请求,很容易导致缓存被击穿。

三次请求的结果是一致的,对于相同的 key,能不能只向 8001 发起一次请求?这个问题下一次解决。

6.多节点的访问流程图

完整代码:https://github.com/liwook/Go-projects/tree/main/go-cache/5-multi-nodes

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/236166.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Error opening file for writing报错解决

报错展示及描述 在安装pycharm的时候出现了一下报错&#xff0c; Error opening file for writing。 报错原因 一般出现这种报错都是文件权限的原因&#xff0c;检查一下&#xff0c;果然这个文件夹权限是【只读】 查看文件权限的方式&#xff1a;【右击】文件夹名称&#xff0…

【23真题】拜托再练一套!保持手感!

今天分享的是23年河北科技大学882的信号与系统试题及解析。 本套试卷难度分析&#xff1a;22年河北科技大学822考研真题&#xff0c;我也发布过&#xff0c;若有需要&#xff0c;戳这里自取&#xff01;本套试题难度中等偏下&#xff0c;题量适中&#xff0c;对于很多基础知识…

大创项目推荐 协同过滤电影推荐系统

文章目录 1 简介1 设计概要2 课题背景和目的3 协同过滤算法原理3.1 基于用户的协同过滤推荐算法实现原理3.1.1 步骤13.1.2 步骤23.1.3 步骤33.1.4 步骤4 4 系统实现4.1 开发环境4.2 系统功能描述4.3 系统数据流程4.3.1 用户端数据流程4.3.2 管理员端数据流程 4.4 系统功能设计 …

mycat部署和配置读写分离(二)

说明&#xff1a; MyCAT 是使用 JAVA 语言进行编写开发&#xff0c;使用前需要先安装 JAVA 运行环境(JRE),由于 MyCAT 中使用了 JDK7 中的一些特性&#xff0c;所以要求必须在 JDK7 以上的版本上运行。 1. jdk1.8安装 详见jdk环境安装 2. Mysql安装 详见mysql8.0.11源码安装…

Oracle(2-15)RMAN Incomplete Recovery

文章目录 一、基础知识1、The Procedure 不完全恢复步骤2、UNTIL TIME Example 基于时间的恢复3、UNTIL SEOUENCE Example 基于序列的恢复 二、基础操作1、不完全恢复准备工作2、不完全恢复开始恢复 RMAN Incomplete Recovery RMAN的不完全恢复 目标&#xff1a; 使用“UNTIL T…

【开源】基于Vue和SpringBoot的计算机机房作业管理系统

项目编号&#xff1a; S 017 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S017&#xff0c;文末获取源码。} 项目编号&#xff1a;S017&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 登录注册模块2.2 课程管理模块2.3 课…

Linux基础指令(2)

今天我们继续来学我们有关于Linux的指令&#xff0c;今天的指令要比上次多多了。开始我们的学习吧。 man手册 先来看标题&#xff0c;手册我们第一时间想到的就是手册的查阅功能&#xff0c;我们都知道在我们上小学的时候&#xff0c;如果遇到不会的字&#xff0c;我们会通过…

《opencv实用探索·十六》opencv直方图计算calcHist函数解析

直方图理解&#xff1a; &#xff08;对于8位灰度图像亮度/灰度为(0-255)&#xff0c;12位灰度图像亮度/灰度为(0-4095)&#xff09; 以8位图像为例&#xff0c;亮度分为0到255共256个数值&#xff0c;数值越大&#xff0c;代表的亮度越高。其中0代表纯黑色的最暗区域&#xff…

微服务学习:Nacos配置中心

先打开Nacos&#xff08;详见微服务学习&#xff1a;Nacos微服务架构中的服务注册、服务发现和动态配置&Nacos下载&#xff09; 1.环境隔离&#xff1a; 新建命名空间&#xff1a; 记住命名空间ID&#xff1a; c82496fb-237f-47f7-91ed-288a53a63324 再配置 就可达成环…

kube-prometheus+kube-thanos

背景 最近在做监控&#xff0c;选择了thanos架构&#xff0c;使用了kube-prometheuskube-thanos&#xff0c;这里记录一下搭建过程。 原理 我选择的是sidecar的方式&#xff0c;这张图画的很好&#xff0c;thanos就理解为多个prometheus的汇合点&#xff0c;当一个query发到t…

npm run build时提示vue/types/jsx.d.ts中的错误

解决方法一&#xff1a; 可能是因为vue版本过高引起的 我直接将package.json中vue以及vue-template-compiler的版本的前面^去掉&#xff0c;安装指定的版本 注意&#xff1a;vue和vue-template-compiler需要版本一致 参考链接&#xff1a;链接 解决方法二&#xff1a; 如果如…

LV.13 D2 开发板启动流程 学习笔记

一、开发板启动过程 EMMC&#xff1a;相当于电脑的外存&#xff0c;断电不丢失 开发板上电后首先运行SOC内部iROM中固化的代码(BL0)&#xff0c;这段代码先对基本的软硬件环境(时钟等...)进行初始化&#xff0c;然后再检测拨码开关位置获取启动方式&#xff0c;然后再将对应存储…

解决HTTP 429错误的Scrapy中间件配置

引言 在进行网络数据抓取时&#xff0c;经常会遇到HTTP 429错误&#xff0c;表示请求速率已超出API限制。为避免封禁或限制访问&#xff0c;需要调整Scrapy的请求速率&#xff0c;以在不触发HTTP 429错误的情况下完成数据抓取。针对这一问题&#xff0c;可使用Scrapy的AutoThr…

3DMax物理画笔物体填充放置绘制画笔插件安装使用方法

3DMax物理画笔物体填充放置绘制画笔插件&#xff0c;允许您使用笔刷以非常自然的方式用物品快速填充场景&#xff0c;并使用刚体模拟自动放置它们。 无论你是从事建筑、游戏电影还是商业。。。等等&#xff0c;你经常需要用一些物品为你的场景添加细节。手工放置它们是乏味的&…

采集数据更快捷,轻松生成调查问卷二维码

现在用二维码的方式来采集用户的数据&#xff0c;是现在很常用的一种统计数据的手段&#xff0c;这种方法更加简单快捷做好数据统计&#xff0c;那么表单类型的二维码能如何快速生成呢&#xff1f;下面来教大家在线二维码生成器的使用方法&#xff0c;能够用简单的步骤快速制作…

最长子字符串的长度 (一) - 华为OD统一考试(C卷)

OD统一考试&#xff08;C卷&#xff09; 分值&#xff1a; 100分 题解&#xff1a; Java / Python / C 题目描述 给你一个字符串 s&#xff0c;字符串s首尾相连成一个环形&#xff0c;请你在环中找出字符出现了偶数次最长子字符串的长度。 输入描述 输入是一串小写字母组成的…

玩转大数据14:分布式计算框架的选择与比较

1. 引言 随着大数据时代的到来&#xff0c;越来越多的企业和组织需要处理海量数据。分布式计算框架提供了一种有效的方式来解决大数据处理的问题。分布式计算框架将计算任务分解成多个子任务&#xff0c;并在多个节点上并行执行&#xff0c;从而提高计算效率。 2. 分布式计算…

低代码(low code)开发平台,我选JNPF

近年来&#xff0c;低代码开发技术正以迅猛的步伐崭露头角&#xff0c;成为数字化转型浪潮下的重要工具。据 Gartner 预测&#xff0c;到 2025 年&#xff0c;低代码技术将占据 70% 的新应用开发份额&#xff0c;引领着企业应用开发的新趋势。然而&#xff0c;随之而来的是市场…

传音荣获2023首届全国人工智能应用场景创新挑战赛“智能遥感专项赛”三等奖

11月26日&#xff0c;2023首届全国人工智能应用场景创新挑战赛“智能遥感专项赛”在北京圆满落幕。传音参赛项目《传音智慧应用平台产业化》凭借在技术攻关、社会效益和经济效益等多方面的突出优势荣获“智能遥感专项赛”三等奖。 本次竞赛以“场景驱动数智强国”为主题&#…

张正友相机标定法原理与实现

张正友相机标定法是张正友教授1998年提出的单平面棋盘格的相机标定方法。传统标定法的标定板是需要三维的,需要非常精确,这很难制作,而张正友教授提出的方法介于传统标定法和自标定法之间,但克服了传统标定法需要的高精度标定物的缺点,而仅需使用一个打印出来的棋盘格就可…