Redis如何避免数据丢失?——RDB

目录

1. RDB机制的配置

2. fork()函数和写时复制(Copy On Write)

什么是Copy On Write

系统fork中使用Copy On Write机制

3. RDB文件结构

RDB文件内容和内容顺序

InfoAuxFields是rdb信息数据

 数据库数据

数据

数据存储格式

字符串编码

操作码

4. RDB的2个触发时机

RDB定时生成 

 rdbSave函数

rdbSaveRio函数写数据到rdb文件

5. Redis重启,加载RDB 文件


Redis是把数据储存在内存的键值数据库,而服务器一旦宕机,那内存中的数据将全部丢失。像MySQL那样,是有宕机后数据恢复机制的。那Redis也是有的,其有两种方式:AOF和RDB。该文章讲解RDB(Redis Database

RDB会在不同的时间点将内存中数据库快照以二进制形式写入到磁盘。

备注:RDB机制是比较好理解的,就是其文件的格式有点不好弄懂,因为是二进制内容,其写的格式就有点绕吧。要是看不太懂,可以先跳过RDB文件结构这节,不过要理解RDB的内容顺序才行,不然看关于写RDB文件的代码时候会比较乱。

1. RDB机制的配置

达到某些条件,就会进行RDB快照,而这些条件是在redis.conf文件中配置的。

#格式 save <seconds> <changes>
#在 seconds 时间内如果变更的键数量不少于 changes,则生成一次 RDB

 save 3600 1    # 3600 秒内,至少变更 1 次,才会自动备份
 save 300 100
 save 60 10000

save  #这样是表示停用RDB
#save  #注释掉save, 也是表示停用RDB的

rdbcompression yes    #对于存储到磁盘中的快照,是否启动 LZF 压缩算法,yes表示使用
rdbchecksum yes    #否启动 CRC64 算法进行数据校验,默认是启用,这是会增加cpu消耗
dbfilename dump.rdb    #快照备份文件的名字
rdb-del-sync-files no

save的配置会存储到server.saveparam。

struct redisServer {
    .............
    long long dirty;                /* Changes to DB from the last save */
    long long dirty_before_bgsave;  /* Used to restore dirty on failed BGSAVE */
    struct saveparam *saveparams;   /* Save points array for RDB */
    int saveparamslen;              /* Number of saving points */
    time_t lastsave;                /* Unix time of last successful save */
};

struct saveparam {
    time_t seconds;
    int changes;
};

int main(int argc, char **argv) {
    initServerConfig();
    ........................
    if (argc >= 2) {    //表示是指定配置文件启动的,比如: redis-server ./redis.conf
        .......................
        loadServerConfig(server.configfile, config_from_stdin, options);
    }
    ................
}

void initServerConfig(void) {
    //这些是默认的配置
    appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
    appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
    appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
    .......................
}

函数调用: loadServerConfig————>loadServerConfigFromString
void loadServerConfigFromString(char *config) {
    int linenum = 0, totlines, i;
    sds *lines;
    int save_loaded = 0;

    lines = sdssplitlen(config,strlen(config),"\n",1,&totlines);
    for (i = 0; i < totlines; i++) {
        sds *argv;
        int argc;

        linenum = i+1;
        lines[i] = sdstrim(lines[i]," \t\r\n");
        /* Skip comments and blank lines */
        if (lines[i][0] == '#' || lines[i][0] == '\0') continue;
        /* Split into arguments */
        argv = sdssplitargs(lines[i],&argc);
        ...................................
        /* Execute config directives */    //没有展示其他配置的解析........
        if (!strcasecmp(argv[0],"bind") && argc >= 2) {
          ..............
        } else if (!strcasecmp(argv[0],"save")) {  //就是这解析save的配置内容
            if (!save_loaded) {
                save_loaded = 1;
                resetServerSaveParams();
            }
            if (argc == 3) {
                int seconds = atoi(argv[1]);
                int changes = atoi(argv[2]);
                if (seconds < 1 || changes < 0) {
                    err = "Invalid save parameters"; goto loaderr;
                }
                appendServerSaveParams(seconds,changes);
            } else if (argc == 2 && !strcasecmp(argv[1],"")) {
                resetServerSaveParams();
            }
        }
    }
    .....................
}

void appendServerSaveParams(time_t seconds, int changes) {
    server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
    server.saveparams[server.saveparamslen].seconds = seconds;
    server.saveparams[server.saveparamslen].changes = changes;
    server.saveparamslen++;
}

2. fork()函数和写时复制(Copy On Write)

fork函数执行后,会产生一个新的子进程。其子进程会复制父进程的数据与堆栈空间,并继承父进程的用户代码、环境变量、已打来的文件代码等。而这里的复制就是使用了写时复制(COW,Copy on Write)机制。

什么是Copy On Write

写时复制的核心思想是 若有多个调用者同时请求相同资源,他们会共同获取相同的指针指向相同的资源,直到某个调用者视图修改资源的内容是,系统才会真正复制一个专用副本给该调用者,而其他调用者所见的最初的资源仍然保持不变,而该过程对其他的调用折是透明的,不被感知的。

此作法主要的优点是如果调用者没有修改该资源,就不会有副本(private copy)被建立,因此多个调用者只是读取操作时可以共享同一份资源。

系统fork中使用Copy On Write机制

而在使用fork创建子进程时候,只有当进程需要修改资源时候才会真正执行数据的拷贝,之后就再进程自己的数据段进行修改数据。否则,若是进程只有读请求,实际上不需要执行资源的复制操作,只是需要对不同进程之间维护对资源的引用即可。

在fork()后,内核把父进程中的所有内存页的权限都设置为read-only,然后子进程的地址空间指向父进程

当某个进程写内存时候,cpu检测到该内存页是read-only,就会触发异常中断(page-fault),陷入内核的一个中断例程。该中断例程中,内核会把触发异常的页复制一份,之后,父子进程就各自持有独立的一份了,之后进程再修改对应的数据。 

其好处:

  • 减少分配和复制大量资源时带来的瞬间延时
  • 可减少不必要的资源分配。比如fork进程时,并不是所有的页面都需要复制,父进程的代码段和只读数据段都不被允许修改,所以无需复制

缺点

  • 如果在fork()之后,父子进程都还需要继续进行写操作,那么会产生大量的分页错误(页异常中断page-fault),这样就得不偿失。

3. RDB文件结构

要先了解二进制文件存储的内容是怎样的。一条客户端命令是以什么格式存储到RDB文件中的。 

RDB文件内容和内容顺序

按照大内容来分的话,有以下几部分。

  • 文件头:这部分内容保存了 Redis 的魔数(即是REDIS)、RDB 版本、Redis 版本、RDB 信息数据InfoAuxFields模块数据
  • 文件数据部分:这部分保存了 Redis 数据库实际的所有键值对 和 键值对占用的内存大小等信息和lua脚本数据
  • 文件尾:这部分保存了 RDB 文件的结束标识符OP_EOF,以及整个文件的校验值checksum。这个校验值用来在 Redis server 加载 RDB 文件后,检查文件是否被篡改过。

InfoAuxFields是rdb信息数据

  • key 为 redis-ver, value 为当前 Redis 的版本, 比如 6.2.6 版本
  •  key 为 redis-bit, value 为当前 Redis 的位数, 64 位 / 32 位
  • key 为 ctime, value 为 RDB 创建时的时间戳
  • key 为 used-mem, value 为 dump 时 Redis 占的内存, 单位字节
  • key 为 repl-steam-db, 和主从复制相关, 在 server.master 客户端中选择的数据库, 这个不一定有, 只有在当前的 RDB 文件是用作主从复制时才有值, 数据持久化时, 没有这个属性
  •  key 为 repl-id, 和主从复制相关, 当前实例 replication ID, 这个不一定有, 只有当前的 RDB 文件是用作主从复制时, 不是数据持久化时, 才有
  • key 为 repl-offset, 和主从复制相关, 当前实例复制的偏移量, 这个不一定有, 只有当前的 RDB 文件是用作主从复制时, 不是数据持久化时, 才有
  • key 为 aof-preamble, value 为是否开启了 aof/rdb 的混合使用

 数据库数据

  • OP_SELECTDB: 首先是1个字节选db的操作数,后面是一个整数标识具体的dbId,这个整数是进行编码的,字节数个数范围是[1,9]。
  • OP_RESIZEDB: 这个操作数标识后面接着的是这个db的数据大小dbsize,以及过期键的大小expireSize
  • 数据: 一个个keyValue的键值对,每种数据类型的具体存储数据

数据

上图中虚线表示的是不一定会存在的数据

  • OP_EXPIRETIME_MS: 一个字节表示后续的是这条数据的过期时间
  • OP_IDLE: 一个字节表示后续的是这条数据的过期时间 (lru)
  • OP_FREQ: 一个字节表示后续接着的是这条数据的访问计数个数 (lfu)
  • value_type: 这条数据的类型
  • key: 一个字符串存储这条数据的key
  • value: 每种数据结构具体的数据类型
/* Map object types to RDB object types. Macros starting with OBJ_ are for
 * memory storage and may change. Instead RDB types must be fixed because
 * we store them on disk. */
#define RDB_TYPE_STRING 0
#define RDB_TYPE_LIST   1
#define RDB_TYPE_SET    2
#define RDB_TYPE_ZSET   3
#define RDB_TYPE_HASH   4
#define RDB_TYPE_ZSET_2 5 /* ZSET version 2 with doubles stored in binary. */
#define RDB_TYPE_MODULE 6
#define RDB_TYPE_MODULE_2 7 /* Module value with annotations for parsing without
                               the generating module being loaded. */
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */

/* Object types for encoded objects. */
#define RDB_TYPE_HASH_ZIPMAP    9
#define RDB_TYPE_LIST_ZIPLIST  10
#define RDB_TYPE_SET_INTSET    11
#define RDB_TYPE_ZSET_ZIPLIST  12
#define RDB_TYPE_HASH_ZIPLIST  13
#define RDB_TYPE_LIST_QUICKLIST 14
#define RDB_TYPE_STREAM_LISTPACKS 15

这几个就是数据类型的定义

数据存储格式

RDB文件是二进制格式的,上图是数据的顺序,那每个数据是怎么存储的呢?总体就是确定内容的长度,之后再写入内容

而确定内容的长度,有两种方式

  1. 内容的长度是定长的, 我们就给他制定特有的内容类型, 这个内容类型本身就代表了后面内容的长度
  2. 内容的长度是不定长的, 就通过自定义的一套整数规则, 在内容前面加上一个符合整数规则的数字, 表示内容的长度

字符串编码

rbd对字符串编码主要是先看第一个字节的前两位,如果前两位不是11则表示的是以字符串的形式存储的,其不同的编码方式来计算出这个字符串的长度。

而对于前两位是11则表示这个字符串可以用整数来表示,而其不同的编码方式也表示这个整数的具体的值的长度。

最后还有一种前面两位以及最后两位都是11的表示这个字符串是通过lzf进行压缩的,而其后面则是这个压缩的字符串的具体相关的数据。 

操作码

上面的 RDB 文件的逻辑结构中, 可以发现有一些属性, 在某些情况下是没有的, 这会造成什么问题呢?
顺着二进制文件一直读下去, 虽然数据解析出来了, 但是我们不知道这个数据是什么。

为了应对这种不一定存在的情况, Redis 定义了一套 操作码, 通过操作码表示后面的数据是什么, 让解析出来的数据能真正赋值到对应的属性。 

 上图中紫色块的OP开头的就是操作码。

4. RDB的2个触发时机

  • 用户发送命令来手动生成RDB
    • save:由主进程执行,那么发送命令的客户端会被阻塞等到save结束,且为了保证镜像的一致,save期间Redis是只读的
    • bgsave:这个是交由Redis的子进程来生成快照,父进程可以继续处理客户端请求。
  • 通过定时函数serverCron,达到条件时自动生成RDB

用户手动发送bgsave生成RDB文件比较少见的,我们就只分析定时生成RDB文件。 

RDB定时生成 

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...................................
    // 判断后台是否正在进行 RDB 或者 AOF 操作或者还有子进程阻塞在父级
    if (hasActiveChildProcess() || ldbPendingChildren()) {
        ..............................
    } else {     // 表示没有后台 RDB/AOF 在进行中,
            //进行检查是否需要立即开启 RDB/AOF
        // 遍历我们的触发规则列表  save 1000 1
        for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            if (server.dirty >= sp->changes &&  //当前修改过的key的数量 > 规则配置的key 修改数量值
                server.unixtime-server.lastsave > sp->seconds &&    //当前的时间 - 上次保存的时间 > 规则配置的时间频率 
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK)){
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi); //生成控制RDB生成的辅助变量rdbSaveInfo

                //开始RDB 保存数据
                rdbSaveBackground(server.rdb_filename,rsiptr);
                break;
            }
        }
    }
    ....................

    if (!hasActiveChildProcess() &&
        server.rdb_bgsave_scheduled &&
        (server.unixtime-server.lastbgsave_try > CONFIG_BGSAVE_RETRY_DELAY ||
         server.lastbgsave_status == C_OK)) {
        rdbSaveInfo rsi, *rsiptr;
        rsiptr = rdbPopulateSaveInfo(&rsi);
        if (rdbSaveBackground(server.rdb_filename,rsiptr) == C_OK)
            server.rdb_bgsave_scheduled = 0;
    }
    .........................
}

 上面的就是满足了配置好的规则触,就会执行rdbSaveBackground。其函数的实现如下:

  1. 若仍然有正在生成RDB文件的子进程,就立即返回,以防RDB定时任务和BGSAVE命令重叠。
  2. 保存当前的修改过的数据个数和更新时间戳, 并fork出子进程
  3. 在子进程中调用rdbSave函数生成RDB文件,并将子进程中修改的内容通过pipe发送给主进程,处理完后,会退出子进程。
  4. 主进程执行的,更新server的运行时数据,server.rdb_child_type 会记录刚刚 fork 的子进程 ID
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;
    
    // 1 .若仍然有正在生成RDB的子进程,则返回
    if (hasActiveChildProcess()) return C_ERR;
    //2 .保存当前的脏数据量 和 更新时间戳
    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL);
    //  .fork 子进程
    if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
        int retval;

        /* Child */
        redisSetProcTitle("redis-rdb-bgsave");
        //子进程绑定到用户配置的CPU列表上,减少不必要的进程上下文切换
        redisSetCpuAffinity(server.bgsave_cpulist); 
        //3  .生成RDB文件
        retval = rdbSave(filename,rsi);
        if (retval == C_OK) {
            //将子进程修改的内容,通过pipe发送给主进程
            sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
        }
        exitFromChild((retval == C_OK) ? 0 : 1);    //退出子进程
    } else {
        // 4 .父进程执行的
        if (childpid == -1) {
            server.lastbgsave_status = C_ERR;
            return C_ERR;
        }
            server.rdb_save_time_start = time(NULL);
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        return C_OK;
    }
    return C_OK; /* unreached */
}

 rdbSave函数

子进程执行的就是rdbSave函数:

  1. 根据pid生成临时文件名,然后打开该文件
  2. 初始化rio对象,并根据用户配置设置rio对象的参数
  3. 使用rdbSaveRio函数把内存数据写入到临时文件
  4. 刷盘,确保把数据写到磁盘;之后重命名,替换旧文件;更新server中关于RDB的属性
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSave(char *filename, rdbSaveInfo *rsi) {
    char tmpfile[256];
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
    FILE *fp = NULL;
    rio rdb;
    int error = 0;
    // 1 根据 pid 生成临时文件名,然后打开文件
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    //省略了对文件打开失败的处理...............
    // 2 初始化 rio 变量
    rioInitWithFile(&rdb,fp);
    startSaving(RDBFLAGS_NONE);
    // 根据用户配置设定 autosync
    if (server.rdb_save_incremental_fsync)
        rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
    // 3 将内存数据写入临时文件
    if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
        errno = error;
        goto werr;
    }

    // 4  刷盘,确保数据写到磁盘
    if (fflush(fp)) goto werr;
    if (fsync(fileno(fp))) goto werr;
    if (fclose(fp)) { fp = NULL; goto werr; }
    fp = NULL;
    
    // 5 重命名,替换旧文件
    if (rename(tmpfile,filename) == -1) {
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        unlink(tmpfile);
        stopSaving(0);
        return C_ERR;
    }
    //更新server中的属性
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = C_OK;
    stopSaving(1);
    return C_OK;

werr:
    if (fp) fclose(fp);
    unlink(tmpfile);
    stopSaving(0);
    return C_ERR;
}

RDB 文件生成期间会频繁进行磁盘 I/O 操作。Redis 封装了 操作I/O的抽象层,叫做结构体rio,用来统一处理不同的存储介质(aof文件也是使用rio的)。上面代码中的rio对象就是这个。其相关的结构体如下:

struct _rio {
    // Backend functions.
    size_t (*read)(struct _rio *, void *buf, size_t len);
    size_t (*write)(struct _rio *, const void *buf, size_t len);
    off_t (*tell)(struct _rio *);
    int (*flush)(struct _rio *);
    void (*update_cksum)(struct _rio *, const void *buf, size_t len);

    /* The current checksum and flags (see RIO_FLAG_*) */
    uint64_t cksum, flags;

    /* number of bytes read or written */
    size_t processed_bytes;

    /* maximum single read or write chunk size */
    size_t max_processing_chunk;

    /* Backend-specific vars. */ //联合体
    union {
        /* In-memory buffer target. */
        struct {
            sds ptr;
            off_t pos;
        } buffer;
        /* Stdio file pointer target. */
        struct {
            FILE *fp;
            off_t buffered; /* Bytes written since last fsync. */
            off_t autosync; /* fsync after 'autosync' bytes written. */
        } file;
        /* Connection object (used to read from socket) */
        struct {
            connection *conn;   /* Connection */
            off_t pos;    /* pos in buf that was returned */
            sds buf;      /* buffered data */
            size_t read_limit;  /* don't allow to buffer/read more than that */
            size_t read_so_far; /* amount of data read from the rio (not buffered) */
        } conn;
        /* FD target (used to write to pipe). */
        struct {
            int fd;       /* File descriptor. */
            off_t pos;
            sds buf;
        } fd;
    } io;
};

typedef struct _rio rio;
  • readwriteflush:对底层存储介质的读、写、刷盘操作函数指针;
  • io:底层存储介质句柄,支持 file 文件、内存 buffer、conn 连接和管道文件描述符 fd。

rdbSaveRio函数写数据到rdb文件

该函数就是按照上面所讲的RDB文件格式和顺序把数据写入到文件中。

  1. 写入RDB_VERSION版本
  2. 写入辅助字段InfoAuxFields,写入模块数据
  3. 遍历所有的db(默认是16个db,默认是使用编号为0的db),把每个db的数据写入到文件中
    • 写入OPCODE_SELECT标志和数据库id
    •  写入 OPCODE_RESIZEDB 标志和数据库大小、过期字典大小
    •   遍历所有键值对,写入 rdb文件
  4. 把lua脚本内容写入rdb文件
  5. 写入模块内容
  6. 写入 EOF 文件结束标志
  7.  写入 CRC64 校验码
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
    dictIterator *di = NULL;
    dictEntry *de;
    char magic[10];
    uint64_t cksum;
    size_t processed = 0;
    int j;
    long key_count = 0;
    long long info_updated_time = 0;
    char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" :  "RDB";
    
    // 开启了 RDB 文件校验码功能
    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum;
    // 1  写入RDB_VERSION 版本
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
    // 2  写入辅助字段
    if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
    //触发模块回调,    这些模块,不明白
    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
    // 3  遍历所有编号的 db
    for (j = 0; j < server.dbnum; j++) {
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);

       // 4  写入 OPCODE_SELECTDB 标志,数据库 id
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
        if (rdbSaveLen(rdb,j) == -1) goto werr;

        // 5  写入 OPCODE_RESIZEDB 标志和数据库大小、过期字典大小
        uint64_t db_size, expires_size;
        db_size = dictSize(db->dict);
        expires_size = dictSize(db->expires);
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
        if (rdbSaveLen(rdb,db_size) == -1) goto werr;
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr;

        //  6  遍历所有键值对,写入 rdb
        while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;

            initStaticStringObject(key,keystr);
            expire = getExpire(db,&key);
            if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
            .....................................
        }
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    }

    //  所有 db 写完后,持久化 lua 脚本内容
    if (rsi && dictSize(server.lua_scripts)) {
        di = dictGetIterator(server.lua_scripts);
        while((de = dictNext(di)) != NULL) {
            robj *body = dictGetVal(de);
            if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                goto werr;
        }
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    }

    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;

      // 写入 EOF 文件结束标志
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

    //  写入 CRC64 校验码
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return C_OK;
    ........................
}

该函数内部有个核心函数rdbSaveKeyValuePair(在while循环中的),负责将内存数据键值对写入磁盘文件。

/* Save a key-value pair, with expire time, type, key, value. */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
    int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
    int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;

    /* Save the expire time */
    if (expiretime != -1) {
        if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }

    /* Save the LRU info. */
    if (savelru) {
        uint64_t idletime = estimateObjectIdleTime(val);
        idletime /= 1000; /* Using seconds is enough and requires less space.*/
        if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
        if (rdbSaveLen(rdb,idletime) == -1) return -1;
    }

    /* Save the LFU info. */
    if (savelfu) {
        uint8_t buf[1];
        buf[0] = LFUDecrAndReturn(val);
        if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
    }

    /* Save type, key, value */
    if (rdbSaveObjectType(rdb,val) == -1) return -1; // 写入键值对类型标志
    if (rdbSaveStringObject(rdb,key) == -1) return -1; //写入key
    if (rdbSaveObject(rdb,val,key) == -1) return -1;  //写入value

    /* Delay return if required (for testing) */
    if (server.rdb_key_save_delay)
        debugDelay(server.rdb_key_save_delay);

    return 1;
}

可以看到,Redis写入RDB文件时,是调用了很多rdbSaveXXX函数的。不同的后缀就是用于写入不同类型的数据。以rdbSaveType为例:

int rdbSaveType(rio *rdb, unsigned char type) {
    return rdbWriteRaw(rdb,&type,1);
}

static ssize_t rdbWriteRaw(rio *rdb, void *p, size_t len) {
    if (rdb && rioWrite(rdb,p,len) == 0)
        return -1;
    return len;
}

static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
    if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
    while (len) {
        size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
        if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
        if (r->write(r,buf,bytes_to_write) == 0) {
            r->flags |= RIO_FLAG_WRITE_ERROR;
            return 0;
        }
        buf = (char*)buf + bytes_to_write;
        len -= bytes_to_write;
        r->processed_bytes += bytes_to_write;
    }
    return 1;
}

5. Redis重启,加载RDB 文件

Redis 在启动过程中调用 loadDataFromDisk() 来完成 RDB 的加载。调用的函数链路:loadDataFromDisk——>rdbLoad——>rdbLoadRio

int main(int argc, char **argv) {
    .......................
    if (!server.sentinel_mode) {
        ..........
        loadDataFromDisk();
    }
}

/* Function called at startup to load RDB or AOF file in memory. */
void loadDataFromDisk(void) {
    long long start = ustime();
    if (server.aof_state == AOF_ON) {
        loadAppendOnlyFile(server.aof_filename)    //加载AOF文件  
    } else {
        rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
        errno = 0; /* Prevent a stale value from affecting error checking */
        //加载RDB文件    ======================
        if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {
             ..................
        } else if (errno != ENOENT) {
            //写错误日志............
            exit(1);
        }
    }
}

int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
    FILE *fp;
    rio rdb;
    int retval;

    if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
    startLoadingFile(fp, filename,rdbflags);
    rioInitWithFile(&rdb,fp);
    retval = rdbLoadRio(&rdb,rdbflags,rsi);
    fclose(fp);
    stopLoading(retval==C_OK);
    return retval;
}

rdbLoadRio函数就是根据前面讲的RDB文件格式来解析数据。

/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
 * otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
    uint64_t dbid;
    int type, rdbver;
    redisDb *db = server.db+0;
    char buf[1024];
    int error;
    long long empty_keys_skipped = 0, expired_keys_skipped = 0, keys_loaded = 0;

    rdb->update_cksum = rdbLoadProgressCallback;
    rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
    //读取 RDB 开头的九个字节
    if (rioRead(rdb,buf,9) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        serverLog(LL_WARNING,"Wrong signature trying to load DB from file");
        errno = EINVAL;
        return C_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver < 1 || rdbver > RDB_VERSION) {
        serverLog(LL_WARNING,"Can't handle RDB format version %d",rdbver);
        errno = EINVAL;
        return C_ERR;
    }

    /* Key-specific attributes, set by opcodes before the key type. */
    long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
    long long lru_clock = LRU_CLOCK();

    while(1) {
        sds key;
        robj *val;

        /* Read type. */
        if ((type = rdbLoadType(rdb)) == -1) goto eoferr;

        /* Handle special types. */
        if (type == RDB_OPCODE_EXPIRETIME) {
            expiretime = rdbLoadTime(rdb);
            expiretime *= 1000;
            if (rioGetReadError(rdb)) goto eoferr;
            continue; /* Read next opcode. */
        } else if (type == RDB_OPCODE_EXPIRETIME_MS) {
            expiretime = rdbLoadMillisecondTime(rdb,rdbver);
            if (rioGetReadError(rdb)) goto eoferr;
            continue; /* Read next opcode. */
        } else if (type == RDB_OPCODE_FREQ) {

            uint8_t byte;
            if (rioRead(rdb,&byte,1) == 0) goto eoferr;
            lfu_freq = byte;
            continue; /* Read next opcode. */
        } else if (type == RDB_OPCODE_IDLE) {
            /* IDLE: LRU idle time. */
            uint64_t qword;
            if ((qword = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
            lru_idle = qword;
            continue; /* Read next opcode. */
        } else if (type == RDB_OPCODE_EOF) {
            /* EOF: End of file, exit the main loop. */
            break;
        } else if (type == RDB_OPCODE_SELECTDB) {
            /* SELECTDB: Select the specified database. */
            if ((dbid = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                exit(1);
            }
            db = server.db+dbid;
            continue; /* Read next opcode. */
        }
        //还有很多else if,判断了很多情况..............................
    }

     //当 RDB 文件版本大于等于 5 时,检验 CRC64
    if (rdbver >= 5) {
        uint64_t cksum, expected = rdb->cksum;
        if (rioRead(rdb,&cksum,8) == 0) goto eoferr;
        if (server.rdb_checksum && !server.skip_checksum_validation) {
            memrev64ifbe(&cksum);
            .........................
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/613525.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Measurement and Analysis of Large-Scale Network File System Workloads——论文泛读

ATC 2008 Paper 分布式元数据论文阅读笔记整理 问题 网络文件系统在当今的数据存储中发挥着越来越重要的作用。使用网络文件系统可以降低管理成本&#xff0c;从多个位置可靠地访问的数据。这些系统的设计通常以对文件系统工作负载和用户行为的理解为指导[12&#xff0c;19&a…

STL——deque容器【双端动态数组】

deque容器的基本概念&#xff1a; 功能&#xff1a;双端数组&#xff0c;可以对头端进行插入删除操作 deque与vector的区别&#xff1a; vector队友头部的插入删除效率低&#xff0c;数据量越大&#xff0c;效率越低deque相对而言&#xff0c;对头部的插入删除速度会比vecto…

Type-C接口声卡OTG线

随着科技的飞速发展&#xff0c;移动设备已成为我们日常生活中不可或缺的一部分。而在这些设备中&#xff0c;Type-C接口凭借其高速、高效、易用等优点&#xff0c;逐渐成为主流的数据传输和充电接口。与此同时&#xff0c;OTG&#xff08;On-The-Go&#xff09;技术的兴起&…

Java刷题-基础篇

目录 题目1&#xff1a;打印1~100内奇数和、偶数和 题目2&#xff1a;计算5的阶乘 题目3&#xff1a;计算 1!2!3!4!5! 的和 题目4&#xff1a;找1~100之间即能被3整除&#xff0c;又能被5整除的数字&#xff0c;要求必须使用break/continue 题目5&#xff1a;实现猜数字小…

毕业论文答辩PPT该怎么做?推荐3个ai工具帮你做答辩ppt

在我原本的认知里面&#xff0c;答辩PPT是要包含论文各个章节的&#xff0c;在答辩时需要方方面面都讲到的&#xff0c;什么摘要、文献综述、实证分析、研究结果样样不落。但是&#xff0c;这大错特错&#xff01; 答辩PPT环节时长一般不超过5分钟&#xff0c;老师想要的答辩P…

HarmonyOS NEXT星河版之美团外卖点餐功能实战(下)

文章目录 一、购物车逻辑1.1 购物车及加减菜1.2 菜品的加减---方案一1.3 菜品的加减---方案二1.4 购物车View完善1.5 清空购物车1.5 购物车数量和价格 二、小结 一、购物车逻辑 1.1 购物车及加减菜 在utils目录下新建CartStore.ets文件&#xff0c;如下&#xff1a; import …

IndexedDB解密:打开Web应用的数据存储之门

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 IndexedDB解密&#xff1a;打开Web应用的数据存储之门 前言IndexedDB简介数据库操作数据检索与索引异步操作与事件处理 前言 在Web的世界里&#xff0c;数据就像是一群旅行者&#xff0c;它们来自各个…

AGV混合型电机驱动器|伺服控制器CNS-MI50H系列对电机的要求

混合型电机驱动器 CNS-MI50H系列涵盖CNS-MI50HB-A、CNS-MI50HBN-A、CNS-MI50HDN-A、CNS-MI50HSN-A型号&#xff0c;专为 AGV 舵轮控制需求设计&#xff0c;集成舵轮转向角度控制和驱动电机闭环控制。支持增量式编码器&#xff0c;霍尔传感器&#xff0c; 角度电位计&#xff0c…

利用106短信群发平台能否提升沟通效率?

利用106短信群发平台确实能够显著提升沟通效率&#xff0c;具体体现在以下几个方面&#xff1a; 1.快速传递信息&#xff1a;106短信群发平台能够实现信息的快速传递。一旦设置好发送内容和接收群体&#xff0c;短信便能在瞬间发送至大量用户。这种即时性确保了信息的迅速传达…

Linux开发--Bootloader应用分析

Bootloader应用分析 一个嵌入式 Linux 系统从软件的角度看通常可以分为四个层次&#xff1a; 引导加载程序。包括固化在固件( firmware )中的 boot 代码(可选)&#xff0c;和 Boot Loader 两大部分。 Linux 内核。特定于嵌入式板子的定制内核以及内核的启动参数。 文件系统…

idea-自我快捷键-2

1. 书签 创建书签&#xff1a; 创建书签&#xff1a;F11创建特色标记书签&#xff1a;Ctrl F11快速添加助记符书签&#xff1a;ctrl shift 数字键 查看书签&#xff1a; shift F11快速定位到助记符书签&#xff1a;Ctrl 数字键 删除书签&#xff1a; delete 2. 自动…

2024年第四届电子信息工程与计算机科学国际会议(EIECS 2024)

2024年第四届电子信息工程与计算机科学国际会议(EIECS 2024) 2024 4th International Conference on Electronic Information Engineering and Computer Science 中国延吉 | 2024年9月27-29日 投稿截止日期&#xff1a;2023年7月15日 收录检索&#xff1a;EI Compendex和Sc…

Remix Client/Server 架构

Remix 框架是服务端渲染架构&#xff0c;当路由请求时生成 HTML 并返回浏览器。这种 SSR 是如何实现的呢&#xff1f;如果不使用 Remix 这种框架&#xff0c;可以在服务器段启动一个无头浏览器进行页面渲染并返回&#xff0c;代价就是要在服务器上启动一个 Chrome 服务&#xf…

微信小程序按钮去除边框线

通常我们去掉按钮边框直接设置 border:0 但是在小程序中无效&#xff0c;设置outline:none也没用&#xff0c;当然可能你会说加权重无效 实际上该样式是在伪元素::after内&#xff0c;主要你检查css 还看不到有这个关系&#xff0c;鹅厂就是坑多 类样式::after {border: non…

【北京迅为】《iTOP-3588从零搭建ubuntu环境手册》-第3章 Ubuntu20.04系统设置

RK3588是一款低功耗、高性能的处理器&#xff0c;适用于基于arm的PC和Edge计算设备、个人移动互联网设备等数字多媒体应用&#xff0c;RK3588支持8K视频编解码&#xff0c;内置GPU可以完全兼容OpenGLES 1.1、2.0和3.2。RK3588引入了新一代完全基于硬件的最大4800万像素ISP&…

论文精读-存内计算芯片研究进展及应用

文章目录 论文精读-存内计算芯片研究进展及应用概述背景介绍前人工作 存内计算3.1 SRAM存内计算3.2 DRAM存内计算3.3 ReRAM/PCM存内计算3.4 MRAM存内计算3.5 NOR Flash存内计算3.6 基于其他介质的存内计算3.7 存内计算芯片应用场景 总结QA 论文精读-存内计算芯片研究进展及应用…

Echarts旭日图的配置项,强大的层级关系展示图表。

ECharts中的旭日图&#xff08;Sunburst Chart&#xff09;是一种数据可视化图表&#xff0c;用于展示层级关系数据。它通常用于呈现树状结构或层级结构的数据&#xff0c;例如组织结构、文件目录结构、地理区域层级等。 旭日图通过圆形的方式展示数据的层级结构&#xff0c;每…

动手学深度学习16 Pytorch神经网络基础

动手学深度学习16 Pytorch神经网络基础 1. 模型构造2. 参数管理1. state_dict()2. normal_() zeros_()3. xavier初始化共享参数的好处 3. 自定义层4. 读写文件net.eval() 评估模式 QA 1. 模型构造 定义隐藏层–模型结构定义前向函数–模型结构的调用 import torch from torch…

Windows密码破解常见手段

mimikatz导出lsass破解 如果域管在成员机器上登录过&#xff0c;那么密码机会保存到lsass.exe进程当中&#xff0c;可以通过mimikatz读取密码 用本地管理员登录本地机器 导出hash reg add HKLM\SYSTEM\CurrentControlSet\Control\SecurityProviders\WDigest /v UseLogonCred…

大模型,阿里云不做选择题 | 最新快讯

什么加速了云的发展&#xff1f; 自 2006 年云计算诞生之日算起&#xff0c;互联网和移动应用、云原生技术普及、企业向先进技术架构演进、中企全球化等因素&#xff0c;先后塑造了云计算的内在与外表&#xff0c;造就了一个数万亿规模的行业。 毋庸置疑的是&#xff0c;生成式…