前言
redis的核心是数据的快速存储,下面就来分析一下godis的底层存储是如何实现,先分析单机服务。
此文采用抓大放小原则,先大的流程方向,再抓细节。
流程图
源码分析
现在以客户端连接,并发起set key val命令为例子
在单机部署的时候,服务启动,会创建一个处理实例,并创建一个单机的db
// redis/server.go
// 创建一个处理实例
// MakeHandler creates a Handler instance
func MakeHandler() *Handler {
// redis的一个存储引擎
var db database.DB
// 创建是集群还是单例
if config.Properties.ClusterEnable {
db = cluster.MakeCluster()
} else {
db = database2.NewStandaloneServer()
}
return &Handler{
db: db,
}
}
有客户端连接,会生成一个异步方法处理每个客户端,一旦有客户端的消息,都会进入Handle方法。
// redis/server/server.go
// 处理接收到客户端的命令
// Handle receives and executes redis commands
func (h *Handler) Handle(ctx context.Context, conn net.Conn) {
if h.closing.Get() {
// closing handler refuse new connection
_ = conn.Close()
return
}
client := connection.NewConn(conn)
// 存储一个客户端
h.activeConn.Store(client, struct{}{})
// 获取字符串
ch := parser.ParseStream(conn)
// 接收客户端数据
for payload := range ch {
// 遍历消息体
// ......... 经过各种校验
// 获取到客户端信息
r, ok := payload.Data.(*protocol.MultiBulkReply)
if !ok {
logger.Error("require multi bulk protocol")
continue
}
// 执行结果
result := h.db.Exec(client, r.Args)
// 结果回复
if result != nil {
_, _ = client.Write(result.ToBytes())
} else {
_, _ = client.Write(unknownErrReplyBytes)
}
}
}
客户端的各种命令进行判断,set是属于正常的数据操作命令,直接通过判断,获取数据库,并在当前数据库中执行
// database/server.go
func (server *Server) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Reply) {
defer func() {
if err := recover(); err != nil {
logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
result = &protocol.UnknownErrReply{}
}
}()
cmdName := strings.ToLower(string(cmdLine[0]))
// ping
if cmdName == "ping" {
return Ping(c, cmdLine[1:])
}
// authenticate
if cmdName == "auth" {
return Auth(c, cmdLine[1:])
}
// ........
// 各种各样的判断,暂时不管
// 获取当前的数据索引
// normal commands
dbIndex := c.GetDBIndex()
// 获取当前数据库
selectedDB, errReply := server.selectDB(dbIndex)
if errReply != nil {
return errReply
}
// 以当前数据库,执行命令
return selectedDB.Exec(c, cmdLine)
}
命令名称解析出来后,从cmdTable获取对应的执行方法,如prepare、executor
// Exec executes command within one database
func (db *DB) Exec(c redis.Connection, cmdLine [][]byte) redis.Reply {
// transaction control commands and other commands which cannot execute within transaction
cmdName := strings.ToLower(string(cmdLine[0]))
// ...
return db.execNormalCommand(cmdLine)
}
func (db *DB) execNormalCommand(cmdLine [][]byte) redis.Reply {
// 获取到正常的执行命令
cmdName := strings.ToLower(string(cmdLine[0]))
// 获取到commond
cmd, ok := cmdTable[cmdName]
if !ok {
return protocol.MakeErrReply("ERR unknown command '" + cmdName + "'")
}
if !validateArity(cmd.arity, cmdLine) {
return protocol.MakeArgNumErrReply(cmdName)
}
prepare := cmd.prepare
write, read := prepare(cmdLine[1:])
db.addVersion(write...)
// 数据库上锁
db.RWLocks(write, read)
// 命令执行完后解锁
defer db.RWUnLocks(write, read)
// 执行命令方法
fun := cmd.executor
return fun(db, cmdLine[1:])
}
set命令对应的方法,从代码可以发现,其实数据是存储在定义的map结构的集合中,自此,命令已经执行完毕,返回执行结果。
func execSet(db *DB, args [][]byte) redis.Reply {
// 提取key
key := string(args[0])
// 提取val
value := args[1]
// 提取策略
policy := upsertPolicy
// 提取过期时间
ttl := unlimitedTTL
// parse options
// 如何参数大于2个,说明有其他参数,需要做其他处理
// .....
entity := &database.DataEntity{
Data: value,
}
var result int
// 更新策略
switch policy {
case upsertPolicy:
// 默认策略
db.PutEntity(key, entity)
result = 1
case insertPolicy:
result = db.PutIfAbsent(key, entity)
case updatePolicy:
result = db.PutIfExists(key, entity)
}
if result > 0 {
if ttl != unlimitedTTL {
expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond)
// 设置过期时间
db.Expire(key, expireTime)
db.addAof(CmdLine{
[]byte("SET"),
args[0],
args[1],
})
db.addAof(aof.MakeExpireCmd(key, expireTime).Args)
} else {
db.Persist(key) // override ttl
db.addAof(utils.ToCmdLine3("set", args...))
}
}
if result > 0 {
return &protocol.OkReply{}
}
return &protocol.NullBulkReply{}
}
// database.go
// 将数据放入DB
// PutEntity a DataEntity into DB
func (db *DB) PutEntity(key string, entity *database.DataEntity) int {
// 当前数据库的数据字段
ret := db.data.PutWithLock(key, entity)
// db.insertCallback may be set as nil, during `if` and actually callback
// so introduce a local variable `cb`
if cb := db.insertCallback; ret > 0 && cb != nil {
cb(db.index, key, entity)
}
return ret
}
// datastruct/dict/concurrent.go
// ConcurrentDict is thread safe map using sharding lock
// 这里可以看出,数据其实就是存在map集合里面
type ConcurrentDict struct {
table []*shard
count int32
shardCount int
}
type shard struct {
m map[string]interface{}
mutex sync.RWMutex
}
// datastruct/dict/concurrent.go
func (dict *ConcurrentDict) PutWithLock(key string, val interface{}) (result int) {
if dict == nil {
panic("dict is nil")
}
hashCode := fnv32(key)
index := dict.spread(hashCode)
s := dict.getShard(index)
// 将数据放入map中
if _, ok := s.m[key]; ok {
s.m[key] = val
return 0
}
dict.addCount()
// 存储kv结构数据,完成
s.m[key] = val
return 1
}
其实还有一个问题,就是cmdTable怎么来的,为什么fun(db, cmdLine[1:])就完成了?
在router.go这个代码中,是生成一个新的cmdTable的map集合;registerCommand这个函数是将各种命令塞入cmdTable里面。每个数据结构如string等都有定义的方法。
main启动前都会调用init(),这个是golang特殊的函数,顺序按照文件的顺序执行。
这里就是在服务启动前,将所有命令注册到cmdTable集合。
// database/router.go
// 命令集
var cmdTable = make(map[string]*command)
// ....
// 注册命令,将命令存放在cmdTable集合里面
// registerCommand registers a normal command, which only read or modify a limited number of keys
func registerCommand(name string, executor ExecFunc, prepare PreFunc, rollback UndoFunc, arity int, flags int) *command {
name = strings.ToLower(name)
cmd := &command{
name: name,
executor: executor,
prepare: prepare,
undo: rollback,
arity: arity,
flags: flags,
}
cmdTable[name] = cmd
return cmd
}
//========================================
// database/string.go
func execSet(db *DB, args [][]byte) redis.Reply {
//....
}
// execSetNX sets string if not exists
func execSetNX(db *DB, args [][]byte) redis.Reply {
// .....
}
// execSetEX sets string and its ttl
func execSetEX(db *DB, args [][]byte) redis.Reply {
// ...
}
func init() {
// 调用注册命令函数,注册方法,如Set则是执行execSet方法
registerCommand("Set", execSet, writeFirstKey, rollbackFirstKey, -3, flagWrite).
attachCommandExtra([]string{redisFlagWrite, redisFlagDenyOOM}, 1, 1, 1)
registerCommand("SetNx", execSetNX, writeFirstKey, rollbackFirstKey, 3, flagWrite).
attachCommandExtra([]string{redisFlagWrite, redisFlagDenyOOM, redisFlagFast}, 1, 1, 1)
registerCommand("SetEX", execSetEX, writeFirstKey, rollbackFirstKey, 4, flagWrite).
attachCommandExtra([]string{redisFlagWrite, redisFlagDenyOOM}, 1, 1, 1)
// .....
}
ExecFunc是规范方法,每个命令对应的执行都按照规范定义。
// database/router.go
type command struct {
// 命令名称
name string
// 执行方法
executor ExecFunc
// prepare returns related keys command
prepare PreFunc
// undo generates undo-log before command actually executed, in case the command needs to be rolled back
undo UndoFunc
// arity means allowed number of cmdArgs, arity < 0 means len(args) >= -arity.
// for example: the arity of `get` is 2, `mget` is -2
arity int
flags int
extra *commandExtra
}
// ========================================
// database/database.go
// 执行方法接口
// ExecFunc is interface for command executor
// args don't include cmd line
type ExecFunc func(db *DB, args [][]byte) redis.Reply