设计思想
singleflight是Go语言中的一个开源库;主要作用的是避免对同一个任务的重复请求。
它的设计思想主要是通过一个map和一个互斥锁来实现对任务请求的管理。
// Group 一个工作单元,可以管理多个singleFlight任务
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
map中的每一个key对应一个call,call中保存了请求的结果和一个channel,用于阻塞和唤醒请求;
当一个新的请求来时,首先会通过map查找是否有相同的请求正在进行,如果有则等待,如果没有则新建一个请求并保存到map中。
// call 任务单次执行时,会提前创建call结构,用于任务执行、阻塞相同key的请求
type call struct {
wg sync.WaitGroup
// 任务返回结果,在任务执行完成后写入
val interface{}
err error
// 重复的任务数
dups int
// 异步任务执行结束后,会遍历chans通知结果
chans []chan<- Result
}
调用流程
当调用singleflight的Do方法时,首先会检查是否有相同key的请求正在进行,如果有则等待,如果没有则新建一个请求。
请求点执行方式有两种,根据调用方式的不同分为 同步、异步请求。
Do
:结果保存在call.val中,调用后会阻塞,直到单次任务执行完成;
DoChan
:调用后返回一个ch,任务执行完成后将结果通知到ch中。
解决了哪些问题
singleFlight主要解决了对同一个任务的重复执行问题;如
- 缓存击穿,redis缓存过期,大量请求同时到达服务器,导致数据库访问量增高;
- 配置加载,并发情况下拉取资源,本地缓存还没有生效,多个请求都会从远程服务获取资源;
通过singleflight,可以保证对同一个任务的请求在同一时间只有一个,避免了一些耗时较长任务的重复执行。
什么场景下不适合使用
虽然singleflight可以有效避免对同一个任务的重复请求,但是在一些场景下可并不适合,如
- 任务查询的效率很高,而且调用频率不高,那使用singleFlight会带来额外的加锁、解锁开销;
- 请求要求实时返回,使用singleflight会导致一些请求阻塞,然后集中返回,导致部分请求响应时间长,而且返回的瞬间会有瞬时的高峰。
使用示例
var group = singleflight.Group{}
func GetSceneConfig(name string) *SceneConfig {
uniqKey := fmt.Sprintf("uniq_key:%s", name)
group.Do(uniqKey, func() (interface{}, error) {
return sceneMapper.GetConfig(name)
})
}
源码展示
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package singleflight provides a duplicate function call suppression
// mechanism.
package singleflight // import "golang.org/x/sync/singleflight"
import (
"bytes"
"errors"
"fmt"
"runtime"
"runtime/debug"
"sync"
)
// errGoexit indicates the runtime.Goexit was called in
// the user given function.
var errGoexit = errors.New("runtime.Goexit was called")
// panicError 在执行给定任务过程中发生panic异常时,抛出的err类型
type panicError struct {
value interface{}
stack []byte
}
// Error implements error interface.
func (p *panicError) Error() string {
return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
}
func (p *panicError) Unwrap() error {
err, ok := p.value.(error)
if !ok {
return nil
}
return err
}
func newPanicError(v interface{}) error {
stack := debug.Stack()
// The first line of the stack trace is of the form "goroutine N [status]:"
// but by the time the panic reaches Do the goroutine may no longer exist
// and its status will have changed. Trim out the misleading line.
if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
stack = stack[line+1:]
}
return &panicError{value: v, stack: stack}
}
// call 任务单次执行时,会提前创建call结构,用户任务执行与阻塞
type call struct {
wg sync.WaitGroup
// 任务返回结果,在任务执行完成后写入
val interface{}
err error
// 重复的任务数
dups int
chans []chan<- Result
}
// Group 一个工作单元,可以管理多个singleFlight任务
type Group struct {
mu sync.Mutex // protects m
m map[string]*call // lazily initialized
}
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
Val interface{}
Err error
Shared bool
}
// Do singleFlight任务执行入口,在任务没有执行过程中多次重复key的调用会被阻塞等待第一次任务执行完成才返回。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
// DoChan 异步执行单次任务,执行完成后,把结果通过ch的方式返回。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
// doCall 任务在这里真正执行,结果会写入到c.val中,如果[]chan有值,也会依次发送。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
// use double-defer to distinguish panic from runtime.Goexit,
// more details see https://golang.org/cl/134395
defer func() {
// the given function invoked runtime.Goexit
if !normalReturn && !recovered {
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done()
if g.m[key] == c {
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
// In order to prevent the waiting channels from being blocked forever,
// needs to ensure that this panic cannot be recovered.
if len(c.chans) > 0 {
go panic(e)
select {} // Keep this goroutine around so that it will appear in the crash dump.
} else {
panic(e)
}
} else if c.err == errGoexit {
// Already in the process of goexit, no need to call again
} else {
// Normal return
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
// Ideally, we would wait to take a stack trace until we've determined
// whether this is a panic or a runtime.Goexit.
//
// Unfortunately, the only way we can distinguish the two is to see
// whether the recover stopped the goroutine from terminating, and by
// the time we know that, the part of the stack trace relevant to the
// panic has been discarded.
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
// 忽略key对应的之前到达的key,调用Forget之后,下次传入key,对应的任务会重新执行。
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}