redis到底是怎么样进行渐进式Rehash的

Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库、缓存和消息中间件。那么redis的底层是如何来存储数据的呢?

一、redis如何在存储大量的key时候,查询速度还能接近O(1)呢?

查询速度接近O(1)的数据结构通常让我们想到的就是HashMap结构,那下面我从源码来追踪下redis到底是不是使用的HashMap结构呢?生成的全局hashTable的大小为4
redis的数据最外层的结构是redisDb(server.h文件) ,其定义如下:

typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;                     /* Database ID */
    long long avg_ttl;          /* Average TTL, just for stats */
    unsigned long expires_cursor; /* Cursor of the active expire cycle. */
    list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

从上面定义我们可以看出redisDb 的保存数据的结构是dict(dict.h),那么我们从文件中获取

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
} dict;
/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;

dict 包含了两个hash表(dictht ht[2]),这里使用两个hash表就是为了后续给渐进式rehash来进行服务的.属性rehashidx == -1时候代表不是处于reshaing中。
dictht 就一个hashtable,其包含dictEntry 的数组。然后我们继续看下

   typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;

dictEntry 的就是hash表中的一个键值对,那么根据上面的代码我们可以绘出redis中内存结构图。
在这里插入图片描述

redis的rehash过程怎么处理呢?

随着redis中key的数据量增多,随着key的增多,那么dictEntry 连越来越长,这个时候查询出来的性能将会越来越慢。这个时候就需要对hashTable进行扩容,在数据量大的时候如果等到所有的扩容完成,那么必然会导致redis长时间等待,那么这个时候我们就采用渐进式rehash方式来进行扩容。

什么是渐进式rehash呢?

Redis 默认使用了两个全局哈希表:dictht[0]和哈希表 dictht[1],一开始,当你刚插入数据时,默认使用dictht[0],此时的dictht[1] 并没有被分配空间。随着数据逐步增多,Redis 开始执行 rehash,这个过程分为三步:

1、给dictht[1]分配更大的空间,一般是当前dictht[0]已使用大小的2倍,但是必须满足是2的幂次倍!
2、把哈希表0 中的数据重新映射并拷贝到哈希表1 中(在hash表1下进行重新计算hash值);
3、释放哈希表 0 的空间
4、把dictht[0]指向刚刚创建好的dictht[1]

什么时候进行hash

  • 1、在没有fork子进程进行RDS或者AOF数据备份的时候且ht[0] .used >= ht[0].size时
  • 2、 在有fork子进程进行RDS或者AOF数据备份的时候且ht[0] .used > ht[0].size * 5时
    扩容,肯定是在添加数据的时候才会扩容,所以我们找一个添加数据的入口,我们从源码层面进行下验证:
int dictReplace(dict *d, void *key, void *val)
{
    dictEntry *entry, *existing, auxentry;

    /* Try to add the element. If the key
     * does not exists dictAdd will succeed. */
    entry = dictAddRaw(d,key,&existing);
    if (entry) {
        dictSetVal(d, entry, val);
        return 1;
    }

    /* Set the new value and free the old one. Note that it is important
     * to do that in this order, as the value may just be exactly the same
     * as the previous one. In this context, think to reference counting,
     * you want to increment (set), and then decrement (free), and not the
     * reverse. */
    auxentry = *existing;
    dictSetVal(d, existing, val);
    dictFreeVal(d, &auxentry);
    return 0;
}

然后继续查看dictAddRaw方法

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    long index;
    dictEntry *entry;
    dictht *ht;

    if (dictIsRehashing(d)) _dictRehashStep(d);

    /* Get the index of the new element, or -1 if
     * the element already exists. */
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;

    /* Allocate the memory and store the new entry.
     * Insert the element in top, with the assumption that in a database
     * system it is more likely that recently added entries are accessed
     * more frequently. */
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}

然后继续往下看_dictKeyIndex方法

static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
    unsigned long idx, table;
    dictEntry *he;
    if (existing) *existing = NULL;

    /* Expand the hash table if needed */
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
    for (table = 0; table <= 1; table++) {
        idx = hash & d->ht[table].sizemask;
        /* Search if this slot does not already contain the given key */
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key)) {
                if (existing) *existing = he;
                return -1;
            }
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;
    }
    return idx;
}

从上面代码注释可以看出来,_dictExpandIfNeeded就是用来进行扩容的

   /* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    if (!dictTypeExpandAllowed(d))
        return DICT_OK;
    if ((dict_can_resize == DICT_RESIZE_ENABLE &&
         d->ht[0].used >= d->ht[0].size) ||
        (dict_can_resize != DICT_RESIZE_FORBID &&
         d->ht[0].used / d->ht[0].size > dict_force_resize_ratio))
    {
        return dictExpand(d, d->ht[0].used + 1);
    }
    return DICT_OK;
}
  • 1、在hashtable扩容的时候,如果正在扩容的时将不会出发扩容操作
  • 2、DICT_HT_INITIAL_SIZE的大小为4,即默认创建的hashtable大小为4
  • 3、dict_force_resize_ratio的值为5
    *这里需要关注dict_can_resize 这个字段什么时候被赋值了,

如何进行扩容?

对hashtable真正扩容的方法是dictExpand(d, d->ht[0].used + 1)

/* return DICT_ERR if expand was not performed */
int dictExpand(dict *d, unsigned long size) {
    return _dictExpand(d, size, NULL);
}
int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
{
    if (malloc_failed) *malloc_failed = 0;

    /* the size is invalid if it is smaller than the number of
     * elements already inside the hash table */
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    dictht n; /* the new hash table */
    unsigned long realsize = _dictNextPower(size);

    /* Detect overflows */
    if (realsize < size || realsize * sizeof(dictEntry*) < realsize)
        return DICT_ERR;

    /* Rehashing to the same table size is not useful. */
    if (realsize == d->ht[0].size) return DICT_ERR;

    /* Allocate the new hash table and initialize all pointers to NULL */
    n.size = realsize;
    n.sizemask = realsize-1;
    if (malloc_failed) {
        n.table = ztrycalloc(realsize*sizeof(dictEntry*));
        *malloc_failed = n.table == NULL;
        if (*malloc_failed)
            return DICT_ERR;
    } else
        n.table = zcalloc(realsize*sizeof(dictEntry*));

    n.used = 0;

    /* Is this the first initialization? If so it's not really a rehashing
     * we just set the first hash table so that it can accept keys. */
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    /* Prepare a second hash table for incremental rehashing */
    d->ht[1] = n;
    d->rehashidx = 0;
    return DICT_OK;
}

1、先定义一个新的全局表,大小2^2 到 2的n次幂跟size来进行比较,取第一次满足的时候的条件,_dictNextPower(size)的代码如下:

   while(1) {
        if (i >= size)
            return i;
        i *= 2;
    }

2、设置dictht 的size等于刚刚计算好的realSize,掩码等于realsize-1
3、给dictht 的table分配地址和做初始化操作
4、接下来就判断ht[0].table是否为null,如果为null说明是第一次进行初始存放数据,而不是真正的进行rehash。此时只需要将ht[0] = n,即把刚刚创建的全局hashtable赋值给ht[0]就可以了
5、如果不是那么把刚刚创建的全局hashtable赋值给ht[1],然后dict对应的rehashidx 值修改为0
至此我们完成了hash表的扩容

那redis的数据如何进行迁移的

答案就是我们刚刚说到的使用渐进式rehash方法,那具体是如何做的?
假如一次性把数据迁移会很耗时间,会让单条指令等待很久很久,会形成阻塞。所以,Redis采用的是渐进式Rehash,所谓渐进式,就是慢慢的,不会一次性把所有数据迁移。
那什么时候会进行渐进式数据迁移?
1.每次进行key的crud操作都会进行一个hash桶的数据迁移
2.定时任务,进行部分数据迁移

  • 执行crud时候对数据的操作,进行rehash操作
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
  • 定时任务执行
    源码来源于server.c
/* This function handles 'background' operations we are required to do
 * incrementally in Redis databases, such as active key expiring, resizing,
 * rehashing. */
void databasesCron(void) {
    /* Expire keys by random sampling. Not required for slaves
     * as master will synthesize DELs for us. */
    if (server.active_expire_enabled) {
        if (iAmMaster()) {
            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
        } else {
            expireSlaveKeys();
        }
    }

    /* Defrag keys gradually. */
    activeDefragCycle();

    /* Perform hash tables rehashing if needed, but only if there are no
     * other processes saving the DB on disk. Otherwise rehashing is bad
     * as will cause a lot of copy-on-write of memory pages. */
    if (!hasActiveChildProcess()) {
        /* We use global counters so if we stop the computation at a given
         * DB we'll be able to start from the successive in the next
         * cron loop iteration. */
        static unsigned int resize_db = 0;
        static unsigned int rehash_db = 0;
        int dbs_per_call = CRON_DBS_PER_CALL;
        int j;

        /* Don't test more DBs than we have. */
        if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;

        /* Resize */
        for (j = 0; j < dbs_per_call; j++) {
            tryResizeHashTables(resize_db % server.dbnum);
            resize_db++;
        }

        /* Rehash */
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                int work_done = incrementallyRehash(rehash_db);
                if (work_done) {
                    /* If the function did some work, stop here, we'll do
                     * more at the next cron loop. */
                    break;
                } else {
                    /* If this db didn't need rehash, we'll try the next one. */
                    rehash_db++;
                    rehash_db %= server.dbnum;
                }
            }
        }
    }
}

那接下来我们真正看下真正执行rehash操作的方法:

static void _dictRehashStep(dict *d) {
    if (d->pauserehash == 0) dictRehash(d,1);
}

/* Performs N steps of incremental rehashing. Returns 1 if there are still
 * keys to move from the old to the new hash table, otherwise 0 is returned.
 *
 * Note that a rehashing step consists in moving a bucket (that may have more
 * than one key as we use chaining) from the old to the new hash table, however
 * since part of the hash table may be composed of empty spaces, it is not
 * guaranteed that this function will rehash even a single bucket, since it
 * will visit at max N*10 empty buckets in total, otherwise the amount of
 * work it does would be unbound and the function may block for a long time. */
int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    if (dict_can_resize == DICT_RESIZE_FORBID || !dictIsRehashing(d)) return 0;
    if (dict_can_resize == DICT_RESIZE_AVOID && 
        (d->ht[1].size / d->ht[0].size < dict_force_resize_ratio))
    {
        return 0;
    }

    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            uint64_t h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }

    /* Check if we already rehashed the whole table... */
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    }

    /* More to rehash... */
    return 1;
}
  • 1、基于rehashidx从0开始把数据从ht[0]转移到ht[1]中
  • 2、当整个ht[0]中已使用的数量为0时,就会把原来ht[0]中所占用的内存进行释放,然后ht[0]指向ht[1],最后重置rehashIndex为-1
    根据上面的分析我们可以知道rehash过程中,在redis中设置值的操作大致如下:
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/29450.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

DAY 77 [ Ceph ] 基本概念、原理及架构

前言 在实现容器化的初期&#xff0c;计划使用 Ceph 作为容器的存储。都说存储是虚拟化之母&#xff0c;相对容器来说&#xff0c;存储也起到了至关重要的作用。 选用 Ceph 作为容器化存储理由如下&#xff1a; 方便后期横向扩展&#xff1b;Ceph能够同时支持快存储、对象存…

SF授权系统源码 V3.7全开源无加密版本

&#x1f389; 有需要的朋友记得关赞评&#xff0c;文章底部来交流&#xff01;&#xff01;&#xff01; &#x1f389; ✨ 源码介绍 2023全新SF授权系统源码 V3.7全开源无加密版本。网站搭建很简单&#xff0c;大致看来一下应该域名解析后上传源码解压&#xff0c;访问域名/i…

全志V3S嵌入式驱动开发(u盘写读)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 对于现在的soc来说&#xff0c;基本上usb是标配的。它一般需要支持usb host、device和otg三种模式。简单解释下&#xff0c;host模式&#xff0c;就…

STM32F407移植1588v2(ptpd)

硬件&#xff1a; STM32F407ZGT6开发板 软件&#xff1a; VSCode arm-none-eabi-gcc openOCD st-link 在github搜到一个在NUCLEO-F429ZI开发板上移植ptpd的example&#xff0c;因为和F407差别很小&#xff0c;所以就打算用这个demo移植到手头的开发板上。因为目前只需要…

ASP.NET MVC下的四种验证编程方式

ASP.NET MVC采用Model绑定为目标Action生成了相应的参数列表&#xff0c;但是在真正执行目标Action方法之前&#xff0c;还需要对绑定的参数实施验证以确保其有效性&#xff0c;我们将针对参数的验证成为Model绑定。总地来说&#xff0c;我们可以采用4种不同的编程模式来进行针…

DevExpress WPF功能区控件,更轻松创建应用工具栏!(上)

DevExpress WPF的Ribbon、Toolbar和Menus组件以Microsoft Office为灵感&#xff0c;针对WPF开发人员进行了优化&#xff0c;可帮助您在段时间内模拟当今最流行的商业生产力应用程序。 DevExpress WPF拥有120个控件和库&#xff0c;将帮助您交付满足甚至超出企业需求的高性能业…

百度CDN配置TLS

概述 为了保障您互联网通信的安全性和数据完整性&#xff0c;百度智能云CDN提供TLS版本控制功能。您可以根据不同域名的需求&#xff0c;灵活地配置TLS协议版本。 TLS&#xff08;Transport Layer Security&#xff09;即安全传输层协议&#xff0c;在两个通信应用程序之间提…

LeetCode·每日一题·1177. 构建回文串检测·前缀和

作者&#xff1a;小迅 链接&#xff1a;https://leetcode.cn/problems/can-make-palindrome-from-substring/solutions/2309940/qian-zhui-he-zhu-shi-chao-ji-xiang-xi-by-n3ps/ 来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 著作权归作者所有。商业转载请联系作者获…

全景浏览技术在虚拟现实中的应用

随着虚拟现实技术的发展&#xff0c;全景浏览技术越来越受到人们的关注。全景浏览技术是一种可以将场景全方位呈现给用户的虚拟现实技术&#xff0c;可以为用户带来身临其境的视觉和听觉体验。本文将介绍全景浏览技术在虚拟现实中的应用以及如何利用代码实现这些应用。 一、全…

第五节 利用Ogre 2.3实现雨,雪,爆炸,飞机喷气尾焰等粒子效果

本节主要学习如何使用Ogre2.3加载粒子效果。为了学习方便&#xff0c;直接将官方粒子模块Sample_ParticleFX单独拿出来编译&#xff0c;学习如何实现粒子效果。 一. 前提须知 如果参考官方示例建议用最新版的Ogre 2.3.1。否则找不到有粒子效果的示例。不要用官网Ogre2.3 scri…

6.17黄金反弹是否到顶,下周开盘如何布局

近期有哪些消息面影响黄金走势&#xff1f;下周黄金多空该如何研判&#xff1f; ​黄金消息面解析&#xff1a;黄金周五(6月16日)小幅收高&#xff0c;但在触及5月以来最低盘中水准后本周以下跌收官。美市尾盘&#xff0c;现货黄金收报1957.68美元/盎司&#xff0c;下跌0.19美…

vmware设置centos客户机和windows宿主机共享文件夹

一、安装内核 kernel-devel 包 yum install gcc yum install kernel-devel-$(uname -r) 注意&#xff0c;如果自己修改过内核版本&#xff0c;需要确保 uname -r 显示的版本和实际使用的内核版本一致。 二、安装 vmware-tools 在vmware上点击菜单&#xff1a;虚拟机->安…

使用Nextcloud搭建私人云盘,并内网穿透实现公网远程访问

文章目录 摘要视频教程1. 环境搭建2. 测试局域网访问3. 内网穿透3.1 ubuntu本地安装cpolar3.2 创建隧道3.3 测试公网访问 4 配置固定http公网地址4.1 保留一个二级子域名4.1 配置固定二级子域名4.3 测试访问公网固定二级子域名 转载自cpolar极点云的文章&#xff1a;使用Nextcl…

LabVIEW开发基于Web数字图像处理

LabVIEW开发基于Web数字图像处理 数字图像处理已在各个领域找到了应用&#xff0c;并已成为一个高度活跃的研究领域。实际实施和实验在教育和研究活动中起着不可或缺的作用。为了方便快捷地实施数字图像处理操作&#xff0c;设计了一个先进的基于Web的数字图像处理虚拟实验室&…

一文搞定C++异常机制(附代码+详细解析)

C异常 1.引文C语言传统的处理错误的方式&#xff1a; 2.C异常概念3.异常的使用3.1 异常的抛出和捕获3.2 异常的重新抛出异常捕获中的内存泄漏问题 3.3异常安全3.4异常规范 4.异常优缺点5.总结&#xff1a; 1.引文 C语言传统的处理错误的方式&#xff1a; 终止程序&#xff0c…

python---列表和元组(5)

元组的相关操作 元组的创建 创建元组的时候指定初始值 元组中的元素也可以是任意类型 通过下标访问元组中的元素 下标从0开始到len-1结束 通过切片来获取元组中的一个部分 使用for循环来遍历元组 使用in 判定元素是否存在 使用index查找元素下标 使用来拼接两个元组 元…

2023年互联网Java面试复习大纲:ZK+Redis+MySQL+Java基础+架构

多数的公司总体上面试都是以自我介绍项目介绍项目细节/难点提问基础知识点考核算法题这个流程下来的。有些公司可能还会问几个实际的场景类的问题&#xff0c;这个环节阿里是必问的&#xff0c;这种问题通常是没有正确答案的&#xff0c;就看个人的理解&#xff0c;个人的积累了…

github action 基于个人项目实践

前言: DevOps 和 Jenkins 作为一名开发&#xff0c;虽然也没有经常听到 Devops &#xff08;研发和运维一体化&#xff09;这个概念&#xff0c;但日常工作中已经无处不在地用着 DevOps 工具。自研也好&#xff0c;基于开源项目改造也好&#xff0c;互联网公司基本都会有自已的…

Django-搭建sysinfo获取系统信息

文章目录 前言一、项目搭建二、主机信息监控三、Celery定时任务和异步任务 前言 使用Django&#xff0c;搭建sysinfo&#xff0c;Linux中,sysinfo是用来获取系统相关信息的结构体 本篇基于&#xff1a;https://github.com/hypersport/sysinfo#readme项目借鉴路径: https://gi…

基于开源大模型Vicuna-13B构建私有制库问答系统

本教程专注在怎么使用已经开源的模型和项目&#xff0c;构建一个可以私有化部署的问答知识库&#xff0c;而且整体效果要有所保障。 主要工作包括&#xff1a; 选择基础模型&#xff0c;openAI&#xff0c;claude 这些商用的&#xff0c;或者其他的开源的&#xff0c;这次我们…