【Go | 从0实现简单分布式缓存】-3:分布式节点通信

在这里插入图片描述

本文目录

  • 一、通信流程
  • 二、peers.go
  • 三、http.go
  • 四、geecache.go
  • 五、测试代码

本文为极客兔兔动手写分布式缓存GeeCache学习笔记。

一、通信流程

在前面一节中,已经为 HTTPPool 实现了服务端功能,通信不仅需要服务端还需要客户端,因此本节来实现 HTTPPool 客户端的功能。

最开始我们的流程定义如下,前面已经实现了1和3,那么现在需要实现2这个点。

在这里插入图片描述
现在我们进一步来细化2这个点的步骤,来看看是怎么实现的。

在这里插入图片描述

二、peers.go

首先抽象出 2 个接口,PeerPickerPickPeer() 方法用于根据传入的 key 选择相应节点 PeerGetter

接口 PeerGetterGet() 方法用于从对应 group 查找缓存值。PeerGetter 就对应于上述流程中的 HTTP 客户端。

type PeerPicker interface {
	PickPeer(key string) (peer PeerGetter, ok bool)
	//根据传进来的键,去选择相对应的接口(就是调用一致性哈希算法)。
}

// PeerGetter is the interface that must be implemented by a peer.
type PeerGetter interface {
	Get(group string, key string) ([]byte, error)
	//从客户端去请求其他节点的对应的值的过程。
}

三、http.go

首先需要编写Peers的Set函数,也就是节点的注册函数。

先初始化一个一致性哈希环,将传入的节点地址添加到哈希环中。为每个节点创建一个 httpGetter 对象,并将它们存储到一个映射中,以便后续通过节点地址快速访问对应的 Getter 对象。整个过程是线程安全的,通过互斥锁保护共享资源的并发访问。

一句话来表示就是:Set() 方法实例化了一致性哈希算法,并且添加了传入的节点。并为每一个节点创建了一个 HTTP 客户端 httpGetter

peers ...string 是方法的参数,表示一个可变数量的字符串切片。调用时可以传入任意数量的字符串作为参数,这些字符串代表 HTTP 节点的地址。

func (p *HTTPPool) Set(peers ...string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.peers = consistenthash.New(defaultReplicas, nil)
	p.peers.Add(peers...)
	p.httpGetters = make(map[string]*httpGetter, len(peers))
	for _, peer := range peers {
		p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
	}
}

consistenthash.New 是一致性哈希库的构造函数,用于创建一个新的一致性哈希环。

defaultReplicas 是一个常量,表示每个节点在哈希环上的虚拟副本数量。一致性哈希通过虚拟节点(副本)来提高负载均衡的效果。

nil 是一致性哈希的哈希函数参数,这里使用默认的哈希函数。

p.peers.Add 方法会将每个节点添加到一致性哈希环中。一致性哈希环会根据节点的哈希值将它们分布在环上。

p.httpGetters 是一个映射(map),用于存储每个节点的 HTTP Getter 对象。

make(map[string]*httpGetter, len(peers)) 创建了一个映射,键是节点地址(string),值是指向 httpGetter 的指针。

for _, peer := range peers 遍历传入的 peers 切片,peer 是当前节点的地址。p.httpGetters[peer] 是将节点地址作为键存储到映射中。&httpGetter{baseURL: peer + p.basePath} 创建了一个 httpGetter 对象,并将其地址存储到映射中。也就是peer 是节点的地址。p.basePath 是 HTTPPool 中定义的一个字段,表示 HTTP 请求的路径前缀。peer + p.basePath 拼接成完整的 HTTP 请求的基地址。


然后是Pickeer函数。PickerPeer() 包装了一致性哈希算法的 Get() 方法,根据具体的 key,选择节点peer,并返回该节点对应的 HTTP 客户端,也就是PeerGetter,bool就代表是否成功的找到对应的节点

p.peers.Get(key):调用一致性哈希环的 Get 方法,根据输入的 key 选择一个节点。peer != "":检查返回的节点地址是否为空。如果为空,说明没有找到合适的节点。peer != p.self:检查返回的节点是否是当前节点(p.self)。如果返回的节点是当前节点,说明不需要跨节点远程获取数据,可以直接跳过。

func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if peer := p.peers.Get(key); peer != "" && peer != p.self {
		p.Log("Pick peer %s", peer)
		return p.httpGetters[peer], true
	}
	return nil, false
}

四、geecache.go

geecache.go中需要添加方法。

首先,新增 getFromPeer() 方法,使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值,并将其封装为一个 ByteView 类型返回。。


func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
	bytes, err := peer.Get(g.name, key)
	if err != nil {
		return ByteView{}, err
	}
	return ByteView{b: bytes}, nil
}

原本的load()函数非常简单,是直接调用getLocally()函数,现在 需要改进。

首先尝试从分布式缓存系统中加载指定键(key)的值。它首先尝试从远程节点加载数据,也就是getFromPeer这个方法会帮我们去调用peer的Get()方法,如果失败,则退回到本地加载,也就是getLocally(key)

func (g *Group) load(key string) (value ByteView, err error) {
	if g.peers != nil {
		if peer, ok := g.peers.PickPeer(key); ok {
			if value, err = g.getFromPeer(peer, key); err == nil {
				return value, nil
			}
			log.Println("[GeeCache] Failed to get from peer", err)
		}
	}

	return g.getLocally(key)
}


接下来又到了调用http.go中的Get方法。

首先创建具体的 HTTP 客户端类 httpGetter,实现 PeerGetter 接口

baseURL 表示将要访问的远程节点的地址,例如 http://example.com/_geecache/

使用 http.Get() 方式获取返回值(http.Get(u)是发送一个 HTTP GET 请求到构造好的 URL),并转换为 []bytes 类型。

type httpGetter struct {
	baseURL string
}

func (h *httpGetter) Get(group string, key string) ([]byte, error) {
	u := fmt.Sprintf(
		"%v%v/%v",
		h.baseURL,
		url.QueryEscape(group),
		url.QueryEscape(key),
	)
	res, err := http.Get(u)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("server returned: %v", res.Status)
	}

	bytes, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return nil, fmt.Errorf("reading response body: %v", err)
	}

	return bytes, nil
}

var _ PeerGetter = (*httpGetter)(nil)

接下来讲讲具体代码的作用:

url.QueryEscape(group)url.QueryEscape(key):使用 url.QueryEscape 函数对缓存组名称和键进行 URL 编码,确保它们可以安全地嵌入到 URL 中。

var _ PeerGetter = (*httpGetter)(nil)

这行代码就比较熟悉了:通过类型断言,用于确保 *httpGetter 类型实现了 PeerGetter 接口。

也就是通过将 (*httpGetter)(nil) 赋值给 PeerGetter 类型的变量,强制编译器检查 *httpGetter 是否实现了 PeerGetter 接口。


所以在geecache.go中,我们一共实现需要添加下面的代码。

// A Group is a cache namespace and associated data loaded spread over
type Group struct {
	name      string
	getter    Getter
	mainCache cache
	peers     PeerPicker
}

// RegisterPeers registers a PeerPicker for choosing remote peer
func (g *Group) RegisterPeers(peers PeerPicker) {
	if g.peers != nil {
		panic("RegisterPeerPicker called more than once")
	}
	g.peers = peers
}

func (g *Group) load(key string) (value ByteView, err error) {
	if g.peers != nil {
		if peer, ok := g.peers.PickPeer(key); ok {
			if value, err = g.getFromPeer(peer, key); err == nil {
				return value, nil
			}
			log.Println("[GeeCache] Failed to get from peer", err)
		}
	}

	return g.getLocally(key)
}

func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
	bytes, err := peer.Get(g.name, key)
	if err != nil {
		return ByteView{}, err
	}
	return ByteView{b: bytes}, nil
}

新增 RegisterPeers() 方法,将 实现了 PeerPicker 接口的 HTTPPool 注入到 Group 中。

新增 getFromPeer() 方法,使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值。

修改 load 方法,使用 PickPeer() 方法选择节点,若非本机节点,则调用 getFromPeer() 从远程获取。若是本机节点或失败,则回退到 getLocally()

五、测试代码

测试总体代码如下:

var db = map[string]string{
	"Tom":  "630",
	"Jack": "589",
	"Sam":  "567",
}

func createGroup() *geecache.Group {
	return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
		func(key string) ([]byte, error) {
			log.Println("[SlowDB] search key", key)
			if v, ok := db[key]; ok {
				return []byte(v), nil
			}
			return nil, fmt.Errorf("%s not exist", key)
		}))
}

func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
	peers := geecache.NewHTTPPool(addr)
	peers.Set(addrs...)
	gee.RegisterPeers(peers)
	log.Println("geecache is running at", addr)
	log.Fatal(http.ListenAndServe(addr[7:], peers))
}

func startAPIServer(apiAddr string, gee *geecache.Group) {
	http.Handle("/api", http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			key := r.URL.Query().Get("key")
			view, err := gee.Get(key)
			if err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			w.Header().Set("Content-Type", "application/octet-stream")
			w.Write(view.ByteSlice())

		}))
	log.Println("fontend server is running at", apiAddr)
	log.Fatal(http.ListenAndServe(apiAddr[7:], nil))

}

func main() {
	var port int
	var api bool
	flag.IntVar(&port, "port", 8001, "Geecache server port")
	flag.BoolVar(&api, "api", false, "Start a api server?")
	flag.Parse()

	apiAddr := "http://localhost:9999"
	addrMap := map[int]string{
		8001: "http://localhost:8001",
		8002: "http://localhost:8002",
		8003: "http://localhost:8003",
	}

	var addrs []string
	for _, v := range addrMap {
		addrs = append(addrs, v)
	}

	gee := createGroup()
	if api {
		go startAPIServer(apiAddr, gee)
	}
	startCacheServer(addrMap[port], addrs, gee)
}

接下来挨个讲讲对应的功能。

定义了一个模拟的数据库,使用一个字符串到字符串的映射来存储键值对。它模拟了后端存储,用于在缓存未命中时提供数据。

也就是提供Getter回调函数,方便在没有找到数据的时候来返回。

var db = map[string]string{
	"Tom":  "630",
	"Jack": "589",
	"Sam":  "567",
}

创建了一个缓存组,名为 “scores”,缓存大小为 2KB。它使用了一个自定义的 GetterFunc,当缓存未命中时,会从模拟数据库 db(也就是上面的本地db模拟数据库) 中获取数据。如果键不存在,则返回错误。

func createGroup() *geecache.Group {
	return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
		func(key string) ([]byte, error) {
			log.Println("[SlowDB] search key", key)
			if v, ok := db[key]; ok {
				return []byte(v), nil
			}
			return nil, fmt.Errorf("%s not exist", key)
		}))
}

启动了一个缓存服务器,监听指定的地址 addr。它创建了一个 HTTP 节点池 peers,并将所有节点地址 addrs 添加到池中。然后,它将节点池注册到缓存组 gee 中,并启动 HTTP 服务以监听缓存请求。

func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
	peers := geecache.NewHTTPPool(addr)
	peers.Set(addrs...)
	gee.RegisterPeers(peers)
	log.Println("geecache is running at", addr)
	log.Fatal(http.ListenAndServe(addr[7:], peers))
}

启动了一个 API 服务器,监听指定的地址 apiAddr。它为 /api 路径提供了一个 HTTP 处理函数,该函数从请求中提取键 key,并从缓存组 gee 中获取数据。如果获取成功,它将数据作为响应返回;如果失败,则返回错误。

func startAPIServer(apiAddr string, gee *geecache.Group) {
	http.Handle("/api", http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			key := r.URL.Query().Get("key")
			view, err := gee.Get(key)
			if err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			w.Header().Set("Content-Type", "application/octet-stream")
			w.Write(view.ByteSlice())
		}))
	log.Println("fontend server is running at", apiAddr)
	log.Fatal(http.ListenAndServe(apiAddr[7:], nil))
}

通过命令行参数解析来设置缓存服务器的端口和是否启动 API 服务器。它定义了一个地址映射 addrMap,用于存储所有缓存节点的地址。然后,它创建了一个缓存组 gee,并根据参数决定是否启动 API 服务器。最后,它启动一个缓存服务器,监听指定的端口。

func main() {
	var port int
	var api bool
	flag.IntVar(&port, "port", 8001, "Geecache server port")
	flag.BoolVar(&api, "api", false, "Start a api server?")
	flag.Parse()

	apiAddr := "http://localhost:9999"
	addrMap := map[int]string{
		8001: "http://localhost:8001",
		8002: "http://localhost:8002",
		8003: "http://localhost:8003",
	}

	var addrs []string
	for _, v := range addrMap {
		addrs = append(addrs, v)
	}

	gee := createGroup()
	if api {
		go startAPIServer(apiAddr, gee)
	}
	startCacheServer(addrMap[port], addrs, gee)
}

也就是总的来说,startCacheServer() 用来启动缓存服务器:创建 HTTPPool,添加节点信息,注册到 gee 中,启动 HTTP 服务(共3个端口,8001/8002/8003),用户不感知。

startAPIServer() 用来启动一个 API 服务(端口 9999),与用户进行交互,用户感知。

main() 函数需要命令行传入 port 和 api 2 个参数,用来在指定端口启动 HTTP 服务。


测试会发现一个明显的问题,就是测试的时候,并发了 3 个请求 ?key=Tom,从日志中可以看到,三次均选择了节点 8001,这是一致性哈希算法的功劳。但是有一个问题在于,同时向 8001 发起了 3 次请求。

假如有 10 万个在并发请求该数据呢?那就会向 8001 同时发起 10 万次请求,如果 8001 又同时向数据库发起 10 万次查询请求,很容易导致缓存被击穿。

三次请求的结果是一致的,对于相同的 key,能不能只向 8001 发起一次请求?这就是需要优化的点,也就是防止缓存击穿。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/977780.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Win32/ C++ 简易对话框封装框架(多语言, 通知栏菜单, 拖拽文件处理)

Win32 简易对话框封装简易框架示例 1. 菜单操作: 多语言 2. 通知栏图标菜单 3. 其他操作: 接受拖拽文件等等 CDialogFrame.h #pragma once #include "CWindow/CDialogBase.h" #include "CNSFHeader.h" #include "Win32Utils/CBytesUtils.h" …

如何在 Linux 上安装和配置 Zsh

文章目录 如何在 Linux 上安装和配置 Zsh1. 安装 Zsh1.1 在 Ubuntu/Debian 上安装1.2 在 CentOS/RHEL/Fedora 上安装1.3 在 Arch Linux 上安装1.4 验证 Zsh 安装 2. 设置 Zsh 为默认 Shell2.1 验证默认 shell 3. 配置 Zsh3.1 使用 Oh My Zsh3.1.1 安装 Oh My Zsh3.1.2 启用插件…

Ubuntu搭建esp32环境 配置打开AT指令集 websocket功能

1&#xff0c;搭建前提 环境搭建参考乐鑫官网给的本地编译 ESP-AT 工程方法 因为公司电脑和网络的特殊性&#xff0c;不能正确解析域名&#xff08;仅在浏览器上可以访问&#xff09; &#xff0c;所以这边访问的时候改成了ssh 未了避免使用外网困难的问题&#xff0c;这里用…

网络安全第三次练习

一、实验拓扑 二、实验要求 配置真实DNS服务信息&#xff0c;创建虚拟服务&#xff0c;配置DNS透明代理功能 三、需求分析 1.创建用户并配置认证策略 2.安全策略划分接口 3.ip与策略配置 四、实验步骤 1.划分安全策略接口 2.创建用户并进行策略认证 3.配置安全策略 4.NAT配…

Web自动化之Selenium下Chrome与Edge的Webdriver常用Options参数

目录 引言 说明 Add_argument() 添加方式 常用参数 Add_experimental_option() 添加方式 常用方法 任务结束后仍然保持浏览器打开 禁用“Chrome 正受到自动测试软件的控制”提示 设置下载路径 禁用弹窗拦截 禁用图片加载 禁用 JavaScript 注意 引言 …

【无标题】网络安全公钥密码体制

第一节 网络安全 概述 一、基本概念 网络安全通信所需要的基本属性“ 机密性&#xff1b;消息完整性&#xff1b;可访问性与可用性&#xff1b;身份认证。 二、网络安全威胁 窃听&#xff1b;插入&#xff1b;假冒&#xff1b;劫持&#xff1b;拒绝服务Dos和分布式拒绝服务…

2024年国赛高教杯数学建模D题反潜航空深弹命中概率问题解题全过程文档及程序

2024年国赛高教杯数学建模 D题 反潜航空深弹命中概率问题 原题再现 应用深水炸弹&#xff08;简称深弹&#xff09;反潜&#xff0c;曾是二战时期反潜的重要手段&#xff0c;而随着现代军事技术的发展&#xff0c;鱼雷已成为现代反潜作战的主要武器。但是&#xff0c;在海峡或…

在vscode中编译运行c语言文件,配置并运行OpenMP多线程并行程序设计

1.下载安装vscode Visual Studio Code - Code Editing. Redefined 2.安装vscode扩展 打开vscode,按ctrl+shift+x,打开扩展,搜索c/c++,下载相应的扩展 3.下载MinGW-w64 MinGW-w64 提供了 GNU 编译器集合,可以编译c/c++文件 这里下载见我的资源,可直接下载 把压缩包解压…

PyCharm Professional 2025 安装配置全流程指南(Windows平台)

一、软件定位与核心功能 PyCharm 2025 是 JetBrains 推出的智能 Python IDE&#xff0c;新增深度学习框架自动补全、实时性能热力图等功能1。相较于社区版&#xff0c;专业版支持&#xff1a; Web开发&#xff08;Django/Flask&#xff09;数据库工具&#xff08;PostgreSQL/…

从两地三中心到多地多中心,OceanBase如何实现金融级高可用

“两地三中心”已成为金融领域基准的容灾部署模式。本文将简要阐述金融行业容灾架构中“两地三中心”的具体要求和部署&#xff0c;并进一步探讨OceanBase在实现“两地三中心”标准后&#xff0c;再至“多地多中心”部署中所展现的独特优势与特点。 商业银行的容灾要求 《商业…

九、数据治理架构流程

一、总体结构 《数据治理架构流程图》&#xff08;Data Governance Architecture Flowchart&#xff09; 水平结构&#xff1a;流程图采用水平组织&#xff0c;显示从数据源到数据应用的进程。 垂直结构&#xff1a;每个水平部分进一步划分为垂直列&#xff0c;代表数据治理的…

6.将cr打包成网络服务|使用postman进行测试|编写oj_server的服务路由功能(C++)

将cr打包成网络服务 compile_server.cc #include "compile_run.hpp" #include "../comm/httplib.h"using namespace ns_compile_and_run; using namespace httplib;//编译服务随时可能被多个人请求&#xff0c;必须保证传递上来的code&#xff0c;形成源…

js前端数据加密 CryptoJS库加密 黑盒情况下寻找web的加密算法 代码混淆

前言 前端的数据加密是对用户的输入的一个常见的加密方法 还有的就是防止我们的sql注入 如 idMQ 这个其实解密出来就是 id 1 所以注入的思路就是 把 1和payload 一起加密然后 再进行注入 客户端的加密 > 数据加密传输 > 服务端解密 > 服务端的处理 传输的…

window平台上qtcreator上使用opencv报错

平台&#xff1a;win11 随便在网上下载一个别人编译好的opencv,发现运行报错 发现此次下载的opencv&#xff0c;别人在编译时选用的mingw版本应该和我电脑目前安装的mingw的版本不太一致 右键桌面的qtcreator图标&#xff0c;进入Tools目录&#xff0c;可以看到mingw的版本是…

Android之APP更新(通过接口更新)

文章目录 前言一、效果图二、实现步骤1.AndroidManifest权限申请2.activity实现3.有版本更新弹框UpdateappUtilDialog4.下载弹框DownloadAppUtils5.弹框背景图 总结 前言 对于做Android的朋友来说&#xff0c;APP更新功能再常见不过了&#xff0c;因为平台更新审核时间较长&am…

数字信任的底层逻辑:密码学核心技术与现实应用

安全和密码学 --The Missing Semester of Your CS Education 目录 熵与密码强度密码散列函数密钥体系 3.1 对称加密 3.2 非对称加密信任模型对比典型应用案例安全实践建议扩展练习杂项 密码学是构建数字信任的基石。 本文浅析密码学在现实工具中的应用&#xff0c;涵盖 1&…

达梦有没有类似oerr的功能

在oracle 23ai的sqlplus中&#xff0c;直接看异常信息说明&#xff1a; 达梦没有此功能&#xff0c;但是可以造一个 cd /home/dmdba cat >err.sql<<eof set echo off set ver off set timing off set lineshow off set feedback off select * from V\$ERR_INFO wher…

linux--多进程开发(5)--进程

进程间通讯概念 每两个进程之间都是独立的资源分配单元&#xff0c;不同进程之间不能直接访问另一个进程的资源。 但不同的进程需要进行信息的交互和状态的传递等&#xff0c;因此需要进程间通信&#xff08;IPC,inter processes cimmunication) 进程通信的目的&#xff1a; …

(二)趣学设计模式 之 工厂方法模式!

目录 一、 啥是工厂方法模式&#xff1f;二、 为什么要用工厂方法模式&#xff1f;三、 工厂方法模式怎么实现&#xff1f;四、 工厂方法模式的应用场景五、 工厂方法模式的优点和缺点六、 总结 &#x1f31f;我的其他文章也讲解的比较有趣&#x1f601;&#xff0c;如果喜欢博…

【c语言】字符函数和字符串函数(1)

一、字符分类函数 c语言中有部分函数是专门做字符分类的&#xff0c;也就是一个字符是属于什么类型的字符&#xff0c;这些函 数的使用要包含一个头文件ctype.h中。 其具体如下图所示&#xff1a; 这些函数的使用方式都类似&#xff0c;下面我们通过一个函数来看其…