本文目录
- 一、通信流程
- 二、peers.go
- 三、http.go
- 四、geecache.go
- 五、测试代码
本文为极客兔兔动手写分布式缓存GeeCache学习笔记。
一、通信流程
在前面一节中,已经为 HTTPPool 实现了服务端功能,通信不仅需要服务端还需要客户端,因此本节来实现 HTTPPool 客户端的功能。
最开始我们的流程定义如下,前面已经实现了1和3,那么现在需要实现2这个点。
现在我们进一步来细化2这个点的步骤,来看看是怎么实现的。
二、peers.go
首先抽象出 2 个接口,PeerPicker
的 PickPeer()
方法用于根据传入的 key 选择相应节点 PeerGetter
。
接口 PeerGetter
的 Get()
方法用于从对应 group
查找缓存值。PeerGetter
就对应于上述流程中的 HTTP 客户端。
type PeerPicker interface {
PickPeer(key string) (peer PeerGetter, ok bool)
//根据传进来的键,去选择相对应的接口(就是调用一致性哈希算法)。
}
// PeerGetter is the interface that must be implemented by a peer.
type PeerGetter interface {
Get(group string, key string) ([]byte, error)
//从客户端去请求其他节点的对应的值的过程。
}
三、http.go
首先需要编写Peers的Set函数,也就是节点的注册函数。
先初始化一个一致性哈希环,将传入的节点地址添加到哈希环中。为每个节点创建一个 httpGetter 对象,并将它们存储到一个映射中,以便后续通过节点地址快速访问对应的 Getter 对象。整个过程是线程安全的,通过互斥锁保护共享资源的并发访问。
一句话来表示就是:Set() 方法实例化了一致性哈希算法,并且添加了传入的节点。并为每一个节点创建了一个 HTTP 客户端 httpGetter
。
peers ...string
是方法的参数,表示一个可变数量的字符串切片。调用时可以传入任意数量的字符串作为参数,这些字符串代表 HTTP 节点的地址。
func (p *HTTPPool) Set(peers ...string) {
p.mu.Lock()
defer p.mu.Unlock()
p.peers = consistenthash.New(defaultReplicas, nil)
p.peers.Add(peers...)
p.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
p.httpGetters[peer] = &httpGetter{baseURL: peer + p.basePath}
}
}
consistenthash.New
是一致性哈希库的构造函数,用于创建一个新的一致性哈希环。
defaultReplicas
是一个常量,表示每个节点在哈希环上的虚拟副本数量。一致性哈希通过虚拟节点(副本)来提高负载均衡的效果。
nil
是一致性哈希的哈希函数参数,这里使用默认的哈希函数。
p.peers.Add
方法会将每个节点添加到一致性哈希环中。一致性哈希环会根据节点的哈希值将它们分布在环上。
p.httpGetters
是一个映射(map),用于存储每个节点的 HTTP Getter
对象。
make(map[string]*httpGetter, len(peers))
创建了一个映射,键是节点地址(string),值是指向 httpGetter 的指针。
for _, peer := range peers
遍历传入的 peers 切片,peer 是当前节点的地址。p.httpGetters[peer]
是将节点地址作为键存储到映射中。&httpGetter{baseURL: peer + p.basePath}
创建了一个 httpGetter 对象,并将其地址存储到映射中。也就是peer
是节点的地址。p.basePath
是 HTTPPool 中定义的一个字段,表示 HTTP 请求的路径前缀。peer + p.basePath
拼接成完整的 HTTP 请求的基地址。
然后是Pickeer
函数。PickerPeer() 包装了一致性哈希算法的 Get() 方法,根据具体的 key,选择节点peer,并返回该节点对应的 HTTP 客户端,也就是PeerGetter,bool就代表是否成功的找到对应的节点
。
p.peers.Get(key)
:调用一致性哈希环的 Get 方法,根据输入的 key 选择一个节点。peer != ""
:检查返回的节点地址是否为空。如果为空,说明没有找到合适的节点。peer != p.self
:检查返回的节点是否是当前节点(p.self)。如果返回的节点是当前节点,说明不需要跨节点远程获取数据,可以直接跳过。
func (p *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
p.mu.Lock()
defer p.mu.Unlock()
if peer := p.peers.Get(key); peer != "" && peer != p.self {
p.Log("Pick peer %s", peer)
return p.httpGetters[peer], true
}
return nil, false
}
四、geecache.go
在geecache.go
中需要添加方法。
首先,新增 getFromPeer() 方法,使用实现了 PeerGetter 接口的 httpGetter 从访问远程节点,获取缓存值,并将其封装为一个 ByteView 类型返回。。
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
bytes, err := peer.Get(g.name, key)
if err != nil {
return ByteView{}, err
}
return ByteView{b: bytes}, nil
}
原本的load()
函数非常简单,是直接调用getLocally()
函数,现在 需要改进。
首先尝试从分布式缓存系统中加载指定键(key)的值。它首先尝试从远程节点加载数据,也就是getFromPeer
这个方法会帮我们去调用peer的Get()
方法,如果失败,则退回到本地加载,也就是getLocally(key)
。
func (g *Group) load(key string) (value ByteView, err error) {
if g.peers != nil {
if peer, ok := g.peers.PickPeer(key); ok {
if value, err = g.getFromPeer(peer, key); err == nil {
return value, nil
}
log.Println("[GeeCache] Failed to get from peer", err)
}
}
return g.getLocally(key)
}
接下来又到了调用http.go
中的Get
方法。
首先创建具体的 HTTP 客户端类 httpGetter
,实现 PeerGetter 接口
。
baseURL
表示将要访问的远程节点的地址,例如 http://example.com/_geecache/
。
使用 http.Get()
方式获取返回值(http.Get(u)
是发送一个 HTTP GET 请求到构造好的 URL),并转换为 []bytes
类型。
type httpGetter struct {
baseURL string
}
func (h *httpGetter) Get(group string, key string) ([]byte, error) {
u := fmt.Sprintf(
"%v%v/%v",
h.baseURL,
url.QueryEscape(group),
url.QueryEscape(key),
)
res, err := http.Get(u)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("server returned: %v", res.Status)
}
bytes, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("reading response body: %v", err)
}
return bytes, nil
}
var _ PeerGetter = (*httpGetter)(nil)
接下来讲讲具体代码的作用:
url.QueryEscape(group)
和 url.QueryEscape(key)
:使用 url.QueryEscape
函数对缓存组名称和键进行 URL 编码,确保它们可以安全地嵌入到 URL 中。
var _ PeerGetter = (*httpGetter)(nil)
这行代码就比较熟悉了:通过类型断言,用于确保 *httpGetter
类型实现了 PeerGetter
接口。
也就是通过将 (*httpGetter)(nil)
赋值给 PeerGetter 类型的变量,强制编译器检查 *httpGetter
是否实现了 PeerGetter
接口。
所以在geecache.go
中,我们一共实现需要添加下面的代码。
// A Group is a cache namespace and associated data loaded spread over
type Group struct {
name string
getter Getter
mainCache cache
peers PeerPicker
}
// RegisterPeers registers a PeerPicker for choosing remote peer
func (g *Group) RegisterPeers(peers PeerPicker) {
if g.peers != nil {
panic("RegisterPeerPicker called more than once")
}
g.peers = peers
}
func (g *Group) load(key string) (value ByteView, err error) {
if g.peers != nil {
if peer, ok := g.peers.PickPeer(key); ok {
if value, err = g.getFromPeer(peer, key); err == nil {
return value, nil
}
log.Println("[GeeCache] Failed to get from peer", err)
}
}
return g.getLocally(key)
}
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
bytes, err := peer.Get(g.name, key)
if err != nil {
return ByteView{}, err
}
return ByteView{b: bytes}, nil
}
新增 RegisterPeers()
方法,将 实现了 PeerPicker
接口的 HTTPPool
注入到 Group 中。
新增 getFromPeer()
方法,使用实现了 PeerGetter
接口的 httpGetter
从访问远程节点,获取缓存值。
修改 load
方法,使用 PickPeer()
方法选择节点,若非本机节点,则调用 getFromPeer()
从远程获取。若是本机节点或失败,则回退到 getLocally()
。
五、测试代码
测试总体代码如下:
var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}
func createGroup() *geecache.Group {
return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
func(key string) ([]byte, error) {
log.Println("[SlowDB] search key", key)
if v, ok := db[key]; ok {
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
}))
}
func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
peers := geecache.NewHTTPPool(addr)
peers.Set(addrs...)
gee.RegisterPeers(peers)
log.Println("geecache is running at", addr)
log.Fatal(http.ListenAndServe(addr[7:], peers))
}
func startAPIServer(apiAddr string, gee *geecache.Group) {
http.Handle("/api", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
view, err := gee.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(view.ByteSlice())
}))
log.Println("fontend server is running at", apiAddr)
log.Fatal(http.ListenAndServe(apiAddr[7:], nil))
}
func main() {
var port int
var api bool
flag.IntVar(&port, "port", 8001, "Geecache server port")
flag.BoolVar(&api, "api", false, "Start a api server?")
flag.Parse()
apiAddr := "http://localhost:9999"
addrMap := map[int]string{
8001: "http://localhost:8001",
8002: "http://localhost:8002",
8003: "http://localhost:8003",
}
var addrs []string
for _, v := range addrMap {
addrs = append(addrs, v)
}
gee := createGroup()
if api {
go startAPIServer(apiAddr, gee)
}
startCacheServer(addrMap[port], addrs, gee)
}
接下来挨个讲讲对应的功能。
定义了一个模拟的数据库,使用一个字符串到字符串的映射来存储键值对。它模拟了后端存储,用于在缓存未命中时提供数据。
也就是提供Getter回调函数,方便在没有找到数据的时候来返回。
var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}
创建了一个缓存组,名为 “scores”,缓存大小为 2KB。它使用了一个自定义的 GetterFunc,当缓存未命中时,会从模拟数据库 db(也就是上面的本地db模拟数据库) 中获取数据。如果键不存在,则返回错误。
func createGroup() *geecache.Group {
return geecache.NewGroup("scores", 2<<10, geecache.GetterFunc(
func(key string) ([]byte, error) {
log.Println("[SlowDB] search key", key)
if v, ok := db[key]; ok {
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
}))
}
启动了一个缓存服务器,监听指定的地址 addr。它创建了一个 HTTP 节点池 peers,并将所有节点地址 addrs 添加到池中。然后,它将节点池注册到缓存组 gee 中,并启动 HTTP 服务以监听缓存请求。
func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
peers := geecache.NewHTTPPool(addr)
peers.Set(addrs...)
gee.RegisterPeers(peers)
log.Println("geecache is running at", addr)
log.Fatal(http.ListenAndServe(addr[7:], peers))
}
启动了一个 API 服务器,监听指定的地址 apiAddr。它为 /api 路径提供了一个 HTTP 处理函数,该函数从请求中提取键 key,并从缓存组 gee 中获取数据。如果获取成功,它将数据作为响应返回;如果失败,则返回错误。
func startAPIServer(apiAddr string, gee *geecache.Group) {
http.Handle("/api", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
view, err := gee.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(view.ByteSlice())
}))
log.Println("fontend server is running at", apiAddr)
log.Fatal(http.ListenAndServe(apiAddr[7:], nil))
}
通过命令行参数解析来设置缓存服务器的端口和是否启动 API 服务器。它定义了一个地址映射 addrMap,用于存储所有缓存节点的地址。然后,它创建了一个缓存组 gee,并根据参数决定是否启动 API 服务器。最后,它启动一个缓存服务器,监听指定的端口。
func main() {
var port int
var api bool
flag.IntVar(&port, "port", 8001, "Geecache server port")
flag.BoolVar(&api, "api", false, "Start a api server?")
flag.Parse()
apiAddr := "http://localhost:9999"
addrMap := map[int]string{
8001: "http://localhost:8001",
8002: "http://localhost:8002",
8003: "http://localhost:8003",
}
var addrs []string
for _, v := range addrMap {
addrs = append(addrs, v)
}
gee := createGroup()
if api {
go startAPIServer(apiAddr, gee)
}
startCacheServer(addrMap[port], addrs, gee)
}
也就是总的来说,startCacheServer()
用来启动缓存服务器:创建 HTTPPool,添加节点信息,注册到 gee 中,启动 HTTP 服务(共3个端口,8001/8002/8003),用户不感知。
startAPIServer()
用来启动一个 API 服务(端口 9999),与用户进行交互,用户感知。
main() 函数需要命令行传入 port 和 api 2 个参数,用来在指定端口启动 HTTP 服务。
测试会发现一个明显的问题,就是测试的时候,并发了 3 个请求 ?key=Tom
,从日志中可以看到,三次均选择了节点 8001,这是一致性哈希算法的功劳。但是有一个问题在于,同时向 8001 发起了 3 次请求。
假如有 10 万个在并发请求该数据呢?那就会向 8001 同时发起 10 万次请求,如果 8001 又同时向数据库发起 10 万次查询请求,很容易导致缓存被击穿。
三次请求的结果是一致的,对于相同的 key,能不能只向 8001 发起一次请求?这就是需要优化的点,也就是防止缓存击穿。