【Redis】AOF 基础

因为 Redis AOF 的实现有些绕, 就分成 2 篇进行分析, 本篇主要是介绍一下 AOF 的一些特性和依赖的其他函数的逻辑,为下一篇 (Redis AOF 源码) 源码分析做一些铺垫。

AOF 全称: Append Only File, 是 Redis 提供了一种数据保存模式, Redis 默认不开启。
AOF 采用日志的形式来记录每个写操作, 并追加到文件。开启后, 执行更改 Redis 数据的命令时, 就会把命令写入到 AOF 文件中。
Redis 重启时会根据日志文件的内容把写指令从前到后执行一次以完成数据的恢复工作。

1 AOF 相关的配置

appendonly no                   # AOF 开关, 默认为关闭
appendfilename "appendonly.aof" # 保存的文件名
appendfsync everysec            # AOF 持久化策略 (硬盘缓存写入到硬盘) 

AOF 简单使用的话就这 3 个配置, 前 2 个就是字面的意思, 理解其他比较简单。
我们说明一下第三个配置的使用。

开启 AOF 后, 每次修改的命令都会存到 Redis 的一个缓存区
缓存区的数据最终是需要写入到磁盘的, 而 Redis 是通过 write 函数, 将缓存中的数据写入到磁盘中。
但是 write 函数实际是先将数据先保存到系统层级的缓存, 后续由系统自身将数据保存到磁盘, 系统默认为 30 秒保存一次。这样的话, 可能有风险, 如果系统直接宕机了
可能会丢失 30 秒左右的数据, 所以系统提供了一个 fsync 函数, 可以把系统层级的缓存立即写入到磁盘中, 但是这是一个阻塞且缓慢的操作, 会影响到执行的线程。

所以上面的配置的第 3 项就是控制这个 Redis 缓存到磁盘的行为

  1. everysec: AOF 默认的持久化策略。每秒执行一次 fsync, 可能导致丢失 1s 数据, 这种策略兼顾了安全性和效率
  2. no: 表示不执行 fsync, 由操作系统保证数据同步到磁盘, 速度最快, 但是不太安全
  3. always: 表示每次写入到执行 fsync, 保证数据同步到磁盘, 效率很低

除了上面的 3 个基础配置, 还有几个关于 AOF 执行中的行为配置

# 默认为 100
# 当目前 AOF 文件大小超过上次重写的 AOF 文件的百分之多少进行重写 (重写的含义可以看下面的重写机制), 即当 AOF 文件增长到一定大小的时候, Redis 能够调用 bgrewriteaof 对日志文件进行重写
auto-aof-rewrite-percentag  100

# 默认为 64m
# 设置允许重写的最小 AOF 文件大小, 避免达到约定百分比但占用的容量仍然很小的情况就重写
auto-aof-rewrite-min-size  64mb

# 默认为 no
# 在 AOF 重写时, 是否不要执行 fsync, 将缓存写入到磁盘, 默认为 no。
# 如果对低延迟要求很高的应用, 这里可以设置为 yes, 否则设置为 no, 这样对持久化特性来说这是更安全的选择
# 设置为 yes 表示重写期间对新的写操作不 fsync, 暂时存在内存中, 等重新操作完成后再写入
# 默认为 no, 建议改为 yes, 因为 Linux 的默认 fsync 策略为 30 秒, 所以可能丢失 30 秒数据
no-appendfsync-on-rewrite  no

# 默认为 yes
# 当 Redis 启动的时候, AOF 文件的数据会被重新载入内存
# 但是 AOF 文件可能在尾部是不完整的, 比如突然的断电宕机什么的, 可能导致 AOF 文件数据不完整
# 对于不完整的 AOF 文件如何处理
# 配置为 yes, 当截断的 AOF 文件被导入的时候, 会自动发布一个 log 给客户端, 然后继续加载文件中的数据
# 配置为 no, 用户必须手动 redis-check-aof 修复 AOF 文件才可以
aof-load-truncated yes

2 AOF 重写机制

上面的配置中有好几个提示到重写的概念, 那么什么是重写呢?

由于 AOF 持久化是 Redis 不断将写命令记录到 AOF 文件中, 随着 Redis 不断的运行, AOF 文件将会越来越大, 占用服务器磁盘越来越大, 同时 AOF 恢复要求时间越长。

为了解决这个问题, Redis 新增了重写机制, 当 AOF 文件的大小超过了所设定的阈值时, Redis 就会自动启动 AOF 文件的内容压缩, 只保留可以恢复数据的最小指令集。
AOF 文件不是对原文件进行整理, 而是直接读取服务器现有的键值对, 然后用一条命令去代替之前记录这个键值对的多条命令, 生成一个新的文件替换原来的 AOF 文件。

用户可以通过 bgrewriteaof 命令来手动触发 AOF 文件的重写, 这个重写的过程也是通过子进程实现的。
在子进程进行 AOF 重写时, 主线程需要保证

  1. 处理客户端的请求
  2. 将新增和更新命令追加到现有的 AOF 文件中
  3. 将新增和更新命令追加到 AOF 重写缓存中

3 AOF 文件的优势和劣势

优势

  1. AOF 持久化的方法提供了多种的同步频率, 即使使用默认的同步频率每秒同步一次, Redis 最多也就丢失 1 秒的数据而已
  2. AOF 日志文件以 append-only 模式写入, 所以没有任何磁盘寻址的开销, 写入性能非常高, 而且文件不容易受损, 即使文件尾部受损, 也能很容易恢复, 打开文件, 把后面损坏的数据删除即可

劣势

  1. 对于具有相同数据的的 Redis, AOF 文件通常会比 RDF 文件体积更大 (RDB 存的是数据快照)
  2. 虽然 AOF 提供了多种同步的频率, 默认情况下, 每秒同步一次的频率也具有较高的性能。但是在高并发的情况下, RDB 比 AOF 具好更好的性能保证

4 AOF 和 RDB 两种方案比较

如果可以忍受一小段时间内数据的丢失, 使用 RDB 是最好的, 定时生成 RDB 快照 (snapshot) 非常便于进行数据库备份, 并且 RDB 恢复数据集的速度也要比 AOF 恢复的速度要快, 否则使用 AOF。

但是一般情况下建议不要单独使用某一种持久化机制, 而是应该两种一起用, 在这种情况下, 当 Redis 重启的时候会优先载入 AOF 文件来恢复原始的数据, 因为在通常情况下 AOF 文件保存的数据集要比 RDB 文件保存的数据集要完整。

在 Redis 4.0 带来了一个新的持久化选项 —— 混合持久化。将 RDB 文件的内容和增量的 AOF 日志文件存在一起。
这里的 AOF 日志不再是全量的日志, 而是自持久化开始到持久化结束的这段时间发生的增量 AOF 日志, 通常这部分 AOF 日志很小。

在 Redis 重启的时候, 可以先加载 RDB 的内容, 然后再重放增量 AOF 日志就可以完全替代之前的 AOF 全量文件重放, 重启效率因此大幅得到提升。

5 AOF 的过程

高度概括如下:

  1. 所有的修改命令会追加到 Redis 的一个 AOF 缓存区
  2. AOF 缓存区根据配置的策略向硬盘做同步操作
  3. 随着 AOF 文件越来越大, 达到配置的条件, 对 AOF 文件进行重写, 达到压缩的目的

到此, AOF 的理论知识就没了, 下面是介绍几个比较重要的函数的逻辑。

6 AOF 文件结构

如果现在向 Redis 中写入一个 key 为 redis-key, value 为 redis-value 的字符串键值对后, 这对键值对会以下面的格式保存在 AOF 文件中:

*3\r\n$3\r\nset\r\n$9\r\nredis-key\r\n$11\r\nredis-value\r\n

*数字 的格式开始, 表示后面的命令的参数个数, 然后通过 $数字 表示后面参数的长度, 然后各个分隔之间通过 \r\n 进行分隔。

整体的格式就是 Redis 自定义的 RESP 协议, 具体的 RESP 介绍, 可以看一下这篇文章。

可以看到这种文本格式具有很高的可读性, 同时可以直接进行修改。

注: Redis 中有多个数据库, 写入的数据是保存在哪个数据库的?
在写入对应的数据库数据时, 内部会自动插入一条 select 数据库的编号 的命令到 AOF 文件, 表明对应的数据库, 解析时也是通过这条命令切换到对应的数据库。

源码中将 key 和 value 转换为上面的文件格式的实现是由 2 个函数实现的: catAppendOnlyGenericCommand 和 catAppendOnlyExpireAtCommand, 前者处理的是正常的命令, 而后者处理的是命令的过期时间。

6.1 catAppendOnlyGenericCommand - 没有过期时间的命令

/**
 * 将入参的参数转为 RESP 格式写入到入参的 dst
 * @param dst 当前未写入到文件的命令文本, 新的命令会追加到这个的后面
 * @param argc 命令参数的个数
 * @param argv 命令参数, 比如 set key value
 */
sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {

    char buf[32];
    int len, j;
    robj *o;

    // 上面的 set redis-key redis-value, 按照 RESP 协议转换的内容如下
    // *3\r\n$3\r\nset\r\n$9\r\nredis-key\r\n$11\r\nredis-value\r\n
    // 这里面可以拆为 2 部分处理 
    // 1. *3\r\n  --> 命令的参数个数
    // 2. $3\r\nset\r\n$9\r\nredis-key\r\n$11\r\nredis-value\r\n  --> 具体的命令

    // 1. 处理命令的参数个数部分
    
    // 命令开始的前缀为 *
    buf[0] = '*';
    // argc 表示的是写入命令的个数, 经过这一步 buf = *参数个数
    len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
    // 追加 \r\n, 到了这一步 经过这一步 buf = *参数个数\r\n
    buf[len++] = '\r';
    buf[len++] = '\n';

    // 先将处理的文本第一步拼接到 dst 的后面, 此时 dst = *参数个数\r\n
    dst = sdscatlen(dst,buf,len);

    // 2. 处理具体的命令

    // 拼接参数列表
    for (j = 0; j < argc; j++) {
        
        // 将对应的参数转为字符串类型
        o = getDecodedObject(argv[j]);
        buf[0] = '$';
        // 将命令的长度写入到 buf 中
        len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
        // 继续在后面拼接 \r\n, 到这一步 buf = $命令的长度\r\n
        buf[len++] = '\r';
        buf[len++] = '\n';

        // 同样将 $命令的长度\r\n 写入到 dst
        dst = sdscatlen(dst,buf,len);
        // 将当前的命令具体的内容 写入到 dst
        dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
        // 在 dist 后面追加一个 \r\n
        dst = sdscatlen(dst,"\r\n",2);
        // 经过一次循环, dist 里面多了一段 $命令的长度\r\n命令\r\n 的内容

        // 引用此数 - 1
        decrRefCount(o);
    }
    return dst;
}

/**
 * 如果入参的对象是 raw 或者 embstr 编码, 引用次数 + 1
 * 如果为 int 编码, 根据这个整数创建出一个字符串, 同时返回这个字符串
 * 其他类型不会处理
 */
robj *getDecodedObject(robj *o) {
    robj *dec;

    // 判断一个对象的编码是否为 OBJ_ENCODING_EMBSTR 或者 OBJ_ENCODING_RAW
    if (sdsEncodedObject(o)) {
        // 对象的引用次数还没达到最大值时, 进行引用次数 + 1
        incrRefCount(o);
        return o;
    }
    // 是字符串类型同时编码为 OBJ_ENCODING_INT
    if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_INT) {

        char buf[32];
        // 整形转为 char 数组
        ll2string(buf,32,(long)o->ptr);
        // 转为字符串
        dec = createStringObject(buf,strlen(buf));
        return dec;
    } else {
        serverPanic("Unknown encoding type");
    }
}

2.7.2 catAppendOnlyExpireAtCommand - 带过期时间的命令

/**
 * 将入参的过期时间转为 RESP 格式的字符串并存入 buf
 * @param buf 当前未写入到文件的命令文本, 新的命令会追加到这个的后面, 同时将命令修改为 pexpireat 的格式
 * @param cmd 执行的命令
 * @param key redis 的 key 值
 * @param second 过期的时间, 单位秒
 */
sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {

    long long when;
    robj *argv[3];

    // 转为字符串类型, 以便使用 strtoll 函数
    seconds = getDecodedObject(seconds);
    // 根据指定的进制将入参的 char 数组转为一个整数, 10 --> 10 进制
    // 得到过期的时间
    when = strtoll(seconds->ptr,NULL,10);

    // 当前执行的命令为 expire, setex expireat 将参数的秒转换成毫秒
    if (cmd->proc == expireCommand || cmd->proc == setexCommand || cmd->proc == expireatCommand) {
        when *= 1000;
    }

    // 将 expire, setex expireat 命令的参数,从相对时间设置为绝对时间
    // 以前可能是 10s 后过期, 经过这一步,得到的是 xxx 年 yyy 月 的具体时间
    if (cmd->proc == expireCommand || cmd->proc == pexpireCommand || cmd->proc == setexCommand || cmd->proc == psetexCommand) {
        when += mstime();
    }

    // 减少 second 的引用次数, 便于回收
    decrRefCount(seconds);

    // 拼接为 pexpireat key 超时时间 的命令格式
    argv[0] = createStringObject("PEXPIREAT",9);
    argv[1] = key;
    argv[2] = createStringObjectFromLongLong(when);

    // 将上面的 pexpireat key 过期时间 的命令通过 catAppendOnlyGenericCommand 转为 RESP 格式的字符串
    buf = catAppendOnlyGenericCommand(buf, 3, argv);
    decrRefCount(argv[0]);
    decrRefCount(argv[2]);
    // 返回buf
    return buf;

}

7 代码中涉及的几个模块

在 Redis 的 AOF 中涉及了几个模块功能, 这些功能辅助着整个 AOF 的功能, 这里对这些功能进行一个简单的讲解, 需要说明的时, 这些功能具体的实现可以不用去深入理解, 到了 AOF 源码时, 知道对应的函数的功能就行了。

这里也只是简单的介绍一下, 感兴趣了解一下大体的思路而已, 可以在后面 AOF 源码分析后, 再回来看一下。

7.1 延迟统计

Redis 中会对一些比较耗时的操作做一下统计, 便于后面的性能分析。
而在 AOF 中在调用 write 函数等操作也会进行延迟操作的统计。
大体的延迟统计实现如下:

统计延迟信息的配置


#define CONFIG_DEFAULT_LATENCY_MONITOR_THRESHOLD 0

struct redisServer {

    // 延迟监视的阈值, 默认值为 0, 如果配置为大于 0 的值, 表示开启延迟监控, 同时超过了这个时间就进行延迟记录
    long long latency_monitor_threshold;

    // 字典, 也就是延迟记录的保存地方, 保存的格式是 延迟记录的事件名 和 latencyTimeSeries (一个数组)
    dict *latency_events;
}

统计延迟样本对象的定义

struct latencyTimeSeries {

    // 用于记录的下一个延迟样本的位置, 超过了数组的长度, 会重新被赋值为 0 
    int idx;

    // 最大的延时
    uint32_t max;

    // 最近的延时记录样本数组
    struct latencySample samples[LATENCY_TS_LEN];
}

struct latencySample {

    // 延时样本创建的时间
    int32_t time; 

    // 延迟样本的延迟时间, 单位毫秒
    uint32_t latency;
}

统计延迟样本的函数定义

// 下面的函数做了小改动, 逻辑一样的

// 获取延迟事件的时间
void latencyStartMonitor(var) { 
    // mstime() 获取到当前的时间
    var = server.latency_monitor_threshold ? mstime() : 0;
}

void latencyEndMonitor(var) {
    if (server.latency_monitor_threshold) {
        var = mstime() - var;
    }
}

/**
 * 判断是否需要记录延迟时间 
 * @param event 事件名
 * @param var 延迟事件的耗时时间
 */
void latencyAddSampleIfNeeded(event,var) {
    if (server.latency_monitor_threshold &&  (var) >= server.latency_monitor_threshold)
        latencyAddSample((event),(var));
}

/**
 * 添加延迟事件到 redisServer 的 latency_events 字典
 */ 
void latencyAddSample(char *event, mstime_t latency) {

    // 找出 event 对应的延时事件记录结构体
    struct latencyTimeSeries *ts = dictFetchValue(server.latency_events,event);

    time_t now = time(NULL);
    int prev;

    // 没有对应事件的 latencyTimeSeries, 添加一个
    if (ts == NULL) {
        ts = zmalloc(sizeof(*ts));
        ts->idx = 0;
        ts->max = 0;
        memset(ts->samples,0,sizeof(ts->samples));
        dictAdd(server.latency_events,zstrdup(event),ts);
    }

    // 获取存储的位置
    prev = (ts->idx + LATENCY_TS_LEN - 1) % LATENCY_TS_LEN;

    // 数组对应位置的样本的创建时间等于当前时间
    if (ts->samples[prev].time == now) {
        // 当前的延迟时间大于样本里面的延迟时间, 更新为当前时间
        if (latency > ts->samples[prev].latency)
            ts->samples[prev].latency = latency;
        return;
    }

    // 修改对应位置的样本的时间信息
    ts->samples[ts->idx].time = time(NULL);
    ts->samples[ts->idx].latency = latency;

    // 如果大于当前所有样本的时间, 更新最大延迟时间为当前的延迟时间
    if (latency > ts->max) 
        ts->max = latency;

    ts->idx++;
    // 超过了上限, 重新设置为 0 
    if (ts->idx == LATENCY_TS_LEN) 
        ts->idx = 0;    

}

上面就是延迟事件的创建和保存, 至于在哪里使用的, 如何汇总分析, AOF 这里没有涉及, 就跳过了, 如果需要继续研究可以查看 latency.hlatency.c 这 2 个文件。

7.2 BIO - 后台线程

在上面的介绍中可以知道 fsync 是一个很耗时的过程, 如果把这个过程同样放在 Redis 的主线程中, 那么可能影响到整个 Redis 的性能, 所以 Redis 将 fsync 的过程交给了后台的线程处理。
Reids 将后台相关耗时的操作封装为了一个 BIO 的功能, 可以看出是一个线程池, 线程池在启动时初始了几个线程, 然后生产者向这个池中添加任务。
而 Redis 主线程在执行到 fsync 时, 会提交一个 fsync 的任务到 BIO 中, 完成结束。真正的 fsync 由后台线程处理。

大体的实现如下:

任务类型定义

// 执行 close 函数, 也就是文件的关闭
#define BIO_CLOSE_FILE    0 

// 执行 redis_fsync 函数, 也就是 fsync 函数
#define BIO_AOF_FSYNC     1

// 延迟对象释放
#define BIO_LAZY_FREE     2 

// 任务类型的总数
#define BIO_NUM_OPS       3

BIO 的初始化

// 存放声明的线程数组
static pthread_t bio_threads[BIO_NUM_OPS];

// 线程锁, 这里是多线程场景了, 所以有并发问题
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];

// 添加任务相关的 condition, 简单的理解就是, 线程会被阻塞在这个 condition, 另一个线程可以唤醒这个 condition 上的线程
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];

// 执行过程相关的 condition
static pthread_cond_t bio_step_cond[BIO_NUM_OPS];

// 任务列表
static list *bio_jobs[BIO_NUM_OPS];

// 存放对应的任务类型还有多少个任务等待执行
static unsigned long long bio_pending[BIO_NUM_OPS];


void bioInit(void) {

    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    // 初始锁, condition, 任务列表
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    //设置线程栈空间
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize); 
    if (!stacksize) 
        stacksize = 1; 
    while (stacksize < REDIS_THREAD_STACK_SIZE) 
        stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    // 创建线程, 并存放到 bio_threads 这个线程数组
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;

        // 创建线程, 线程执行的逻辑为 bioProcessBackgroundJobs
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}

线程执行的逻辑

void *bioProcessBackgroundJobs(void *arg) {

    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    // 任务类型校验
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING, "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    // 配置 thread 能够在任何时候被杀掉
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

    // 获取锁
    pthread_mutex_lock(&bio_mutex[type]);

    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    // 线程主逻辑
    while(1) {

        listNode *ln; 

        // 没有任务, 进行等待, 
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }  

        // 获取列表的第一个任务
        ln = listFirst(bio_jobs[type]);
        job = ln->value; 

        // 释放锁, 这个锁的功能主要是为了确保任务的获取
        pthread_mutex_unlock(&bio_mutex[type]);

        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            redis_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        // 获取锁
        pthread_mutex_lock(&bio_mutex[type]);
        // 删除任务
        listDelNode(bio_jobs[type],ln);
        // 需要执行的任务减 1
        bio_pending[type]--;

        // 唤醒所有等待在 bio_step_cond 上的线程
        pthread_cond_broadcast(&bio_step_cond[type]);
    }      
}

添加任务

void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {

    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    
    // 获取锁
    pthread_mutex_lock(&bio_mutex[type]);
    // 添加任务
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;

    // 通知等待在 bio_newjob_cond 的线程
    pthread_cond_signal(&bio_newjob_cond[type]);
    // 释放锁
    pthread_mutex_unlock(&bio_mutex[type]);
}

7.3 pipe - 父子进程通信

Redis 的 AOF 重写机制和 RDB 类型, 也是通过 fork 创建子进程, 将整个 AOF 重写过程交给子进程处理。
不同的时: AOF 重写过程中会不断和父进程通信获取父进程的命令缓存。 父子进程之间就是通过 pipe 进行通讯的。

这里只做一下简单的了解。

#include <unistd.h>

int Pipe(int pipefd[2]);

通过 pipe 的函数可以在 2 个文件描述符之间建立一个通道, 第一个用来读 read(fd[0]), 第二个用来写 write(fd[1])

具体的分析可以看一下这篇文章 APUE读书笔记—进程间通信(IPC)之管道和有名管道(FIFO)

而 Redis 中建立了 3 套通道

int aofCreatePipes(void) {

    int fds[6] = {-1, -1, -1, -1, -1, -1};
    int j;

    //  parent -> children data
    if (pipe(fds) == -1) goto error;

    // children -> parent ack
    if (pipe(fds+2) == -1) goto error; 

    // parent -> children ack
    if (pipe(fds+4) == -1) goto error; 

    // 同步非阻塞
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;

    // 注册一个事件, 执行的函数为 aofChildPipeReadable, 里面的逻辑就是读取 aof_pipe_read_ack_from_child 的数据到 aof_pipe_write_ack_to_child
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;

    // 6 个通道
    server.aof_pipe_write_data_to_child = fds[1];
    server.aof_pipe_read_data_from_parent = fds[0];
    server.aof_pipe_write_ack_to_parent = fds[3];
    server.aof_pipe_read_ack_from_child = fds[2];
    server.aof_pipe_write_ack_to_child = fds[5];
    server.aof_pipe_read_ack_from_parent = fds[4];
    server.aof_stop_sending_diff = 0;
    return C_OK;
}

最终形成的效果如下:
Alt 'AOF 创建的通道'

父进程写入到 aof_pipe_read_data_from_parent 的数据会自动同步到子进程的 aof_pipe_read_data_from_parent, 另外 2 个类似。

至此, AOF 的一些概念和源码中相关的一些代码就介绍完了, 下篇开始真正的 AOF 源码分析。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/252451.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Go标准包之flag命令行参数解析

1.介绍 在 Go中&#xff0c;如果要接收命令行参数&#xff0c;需要使用 flag 包进行解析。不同的参数类型可以通过不同的方法接收。 2.参数接受 2.1 接受方式 使用flag接收参数&#xff0c;可以由以下三种方式接受&#xff1a; 方式一: flag.Type(name,defaultVal,desc)方…

Linux上使用HTTP协议进行数据获取的实战示例

嗨&#xff0c;Linux爱好者们&#xff0c;今天我们要一起探讨一下如何在Linux上进行HTTP协议的数据获取。这不是一项简单的任务&#xff0c;但放心&#xff0c;我会以最简单的语言&#xff0c;结合实例来给大家讲解。 首先&#xff0c;我们需要一个工具&#xff0c;那就是curl…

Python生成器(Generator)(继续更新...)

学习网页&#xff1a; Welcome to Python.orghttps://www.python.org/https://www.python.org/ Python生成器 生成器&#xff08;Generator&#xff09;是 Python 的一种特殊类型的迭代器。生成器允许你创建自己的数据流&#xff0c;每次从数据流中获取一个元素&#xff0c;…

医保电子凭证在项目中的集成应用

随着医保电子凭证使用普及&#xff0c;医疗行业的各个场景都要求支持医保码一码通办&#xff0c;在此分享一下&#xff0c;在C#和js中集成医保电子凭证的demo 供有需要的小伙伴参考。 一、项目效果图 在c#中集成医保电子凭证效果 在js中集成医保电子凭证效果 二、主要代码 c#…

Linux_Docker图形化工具Portainer如何安装并结合内网穿透实现远程访问

文章目录 前言1. 部署Portainer2. 本地访问Portainer3. Linux 安装cpolar4. 配置Portainer 公网访问地址5. 公网远程访问Portainer6. 固定Portainer公网地址 前言 本文主要介绍如何本地安装Portainer并结合内网穿透工具实现任意浏览器远程访问管理界面。Portainer 是一个轻量级…

算法竞赛备赛进阶之树形DP训练

目录 1.树的最长路径 2.树的中心 3.数字转换 4.二叉苹果树 5.战略游戏 6.皇宫守卫 树形DP是一种动态规划方法&#xff0c;主要用于解决树形结构的问题。在树形DP中&#xff0c;通常会使用动态规划的思想来求解最优化问题。其核心在于通过不断地分解问题和优化子问题来解决…

【理论篇】SaTokenException: 非Web上下文无法获取Request问题解决 -理论篇

在我们使用sa-token安全框架的时候&#xff0c;有时候会提示&#xff1a;SaTokenException:非Web上下文无法获取Request 错误截图&#xff1a; 在官方网站中&#xff0c;查看常见问题排查&#xff1a; 错误追踪&#xff1a; 跟着源码可以看到如下代码&#xff1a; 从源码中&a…

01 整体代码运行流程

文章目录 01 整体代码运行流程1.1 运行官方 Demo1.2 变量命名规则1.3 多线程1.4 线程锁1.5 SLAM 主类 System 01 整体代码运行流程 1.1 运行官方 Demo 以 stereo_kitti 为例&#xff0c;执行 ./stereo_kitti path_to_vocabulary path_to_settings path_to_sequence./stereo_…

大创项目推荐 深度学习 python opencv 实现人脸年龄性别识别

文章目录 0 前言1 项目课题介绍2 关键技术2.1 卷积神经网络2.2 卷积层2.3 池化层2.4 激活函数&#xff1a;2.5 全连接层 3 使用tensorflow中keras模块实现卷积神经网络4 Keras介绍4.1 Keras深度学习模型4.2 Keras中重要的预定义对象4.3 Keras的网络层构造 5 数据集处理训练5.1 …

W25Q64(模拟SPI)读写数据的简单应用

文章目录 一、W25Q64是什么&#xff1f;二、使用步骤1.硬件1.引脚说明2.硬件连接3.设备ID4.内部框架5.指令集指令集1指令集2 2.软件1.W25Q64引脚定义代码如下&#xff08;示例&#xff09;&#xff1a;2.W25Q64初始化代码如下&#xff08;示例&#xff09;&#xff1a;3.W25Q64…

在排序数组中查找元素的第一个和最后一个位置(Java详解)

一、题目描述 给你一个按照非递减顺序排列的整数数组 nums&#xff0c;和一个目标值 target。请你找出给定目标值在数组中的开始位置和结束位置。 如果数组中不存在目标值 target&#xff0c;返回 [-1, -1]。 你必须设计并实现时间复杂度为 O(log n) 的算法解决此问题。 示…

Android开发——组合函数、注解与连接Android设备

1、JetPack Compose、组合函数与注解和文本修改 1、JetPack Compose&#xff1a;Jetpack Compose 是由 Google 推出的用于构建 Android 用户界面的现代化工具包。它是一个声明式的 UI 工具包&#xff0c;用于简化 Android 应用程序的用户界面设计和开发。Jetpack Compose 采用…

02.Git常用基本操作

一、基本配置 &#xff08;1&#xff09;打开Git Bash &#xff08;2&#xff09;配置姓名和邮箱 git config --global user.name "Your Name" git config --global user.email "Your email" 因为Git是分布式版本控制工具&#xff0c;所以每个用户都需要…

02-分组查询group by和having的使用

分组查询 MySQL中默认是对整张表的数据进行操作即整张表为一组, 如果想对每一组的数据进行操作,这个时候我们需要使用分组查询 分组函数的执行顺序: 先根据where条件筛选数据,然后对查询到的数据进行分组,最后也可以采用having关键字过滤取得正确的数据 group by子句 在一条…

【STM32】STM32学习笔记-EXTI外部中断(11)

00. 目录 文章目录 00. 目录01. 中断系统02. 中断执行流程03. STM32中断04. NVIC基本结构05. NVIC优先级分组06. EXTI简介07. EXTI基本结构08. AFIO复用IO口09. EXTI框图10. 计数器模块11. 旋转编码器简介12. 附录 01. 中断系统 中断&#xff1a;在主程序运行过程中&#xff0…

easy贪吃蛇

之前承诺给出一个贪吃蛇项目。 1.EasyX库认知 有关EasyX库的相关信息&#xff0c;您可以看一下官方的文档&#xff1a;EasyX官方文档。 这里我做几点总结&#xff1a; EasyX库就和名字一样&#xff0c;可以让用户调用一些简单的函数来绘制图像和几何图形利用EasyX库可以制作…

ES6 面试题 | 15.精选 ES6 面试题

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

知识付费平台选择指南:如何找到最适合你的学习平台?

在当今的知识付费市场中&#xff0c;用户面临的选择越来越多&#xff0c;如何从众多知识付费平台中正确选择属于自己的平台呢&#xff1f;下面&#xff0c;我们将为您介绍我有才知识付费平台相比同行的优势&#xff0c;帮助您做出明智的选择。 一、创新的技术架构&#xff0c;…

3.3【窗口】窗口的几何形状(二,窗口属性)

写在前面 应用程序使用窗口来显示内容。一些属性决定了窗口及其内容的大小和位置。其他属性决定了窗口内容的外观和解释。 了解窗口属性引用的两个坐标系非常重要。如果你知道你正在使用的坐标系,那么为你的窗口属性选择设置值会容易得多,并且会更有意义。 一,显示相关属…

【CMU 15-445】Lecture 11: Joins Algorithms 学习笔记

Joins Algorithms Nested Loop JoinNaive Nested Loop JoinBLock Nested Loop JoinIndex Nested Loop Join Sort-Merge JoinHash JoinBasic Hash JoinPartitioned Hash Join Conclusion 本节课主要介绍的是数据库系统中的一些Join算法 Nested Loop Join Naive Nested Loop Joi…