Redis
- 安装
- redis-cli
- 记录
- 单线程+多路IO复用
- Redis字符串
- Redis列表
- 事务
- Redis悲观锁和乐观锁
- AOF
- 主从
- 集群
- 概念
- slots
- Redis应用问题解决
- 缓存穿透
- 缓存击穿
- 缓存雪崩
- 分布式锁
- 重启和停止redis server
- 配置登陆密码
- 配置外网访问
- Redis源码学习
- server守护进程实现
- server处理信号
- redis object
- redis 事件循环
- redis db
- redis rehash
- IO
- syncWrite
- RDB
- 分布式锁
- 延时队列
- Redis连环问
- Redis优缺点?
- Redis为什么这么快?
- Redis应用场景有哪些?
- Redis 数据类型有哪些?
- Redis事务支持隔离性吗?
- Redis事务保证原子性吗,支持回滚吗?
- Redis有哪些部署方案?
- 主从架构
- 哨兵Sentinel
- Redis cluster
- 过期键的删除策略?
- 内存淘汰策略有哪些?
- Redis大key怎么处理?
- 为什么 Redis 集群的最大槽数是 16384 个?
- rdb和aof的优势与劣势
- **RDB文件保存过程**
- **优势**
- **劣势**
- **AOF文件保存过程**
- **优势**
- **劣势**
- Redis windows客户端
- Redis Mac客户端
- docker启动redis
安装
https://www.cnblogs.com/cthon/p/9357464.html
其中第4步可能报错,执行 sudo apt-get install gcc-multilib
再执行 make distclean && make
redis-cli
登陆: redis-cli -a [password] -h [host] -p [port]
记录
单线程+多路IO复用
简单理解就是:一个服务端进程可以同时处理多个套接字描述符。
- 多路:多个客户端连接(连接就是套接字描述符)
- 复用:使用单进程就能够实现同时处理多个客户端的连接
以上是通过增加进程和线程的数量来并发处理多个套接字,免不了上下文切换的开销,而 IO 多路复用只需要一个进程就能够处理多个套接字,从而解决了上下文切换的问题。
Redis字符串
Redis列表
字符串列表
双向链表
事务
和MySQL不一致。
串行化,保证绝对的隔离性。按顺序执行。
Multi Exec discard
Exec类似MySQL提交事务。discard类似MySQL回滚。
Redis悲观锁和乐观锁
乐观锁通过“版本号”来区分。当一个操作改变数据库时产生新的版本号,另一个操作更改数据库时需要首先校验自己的版本号和数据库的版本号是否一致,如果不一致需要首先将手上的数据与数据库保持一致。
AOF
记录写操作指令。启动之初读取文件后重建数据。
主从
怎么配置主从?
从机执行:slave of [ip] [port] (后面是master的ip + port)
从服务器挂了重启是master状态,需要先salveof变成从服务器,此时master会把所有数据复制过来。
master挂了,slave不会变master,master重启还是master。
主从可以“薪火相传”,像“串”的主从结构。
哨兵模式:拿一个“哨兵”去监控master,master挂了哨兵在slave中选主。可以多哨兵。
集群
概念
解决问题:
- 容量不够,redis如何进行扩容?
- 并发写操作,redis如何分摊?
无中心化集群:任何一台服务器都可以作为集群入口。(而不是反向代理请求代理)
集群和主从的区别:主从存的都是一致的,集群是水平扩容,每个集群存所有数据的1/N,N为集群数量。
slots
一个Redis集群包含16384个插槽,数据库中每个键都属于这些插槽中的一个。
集群使用CRC16(key)%16384来计算键key属于那个槽,其中CRC16(key)语句用于计算key的CRC16校验和。
集群中的每个节点负责处理一部分插槽。
Redis应用问题解决
缓存穿透
缓存击穿
缓存雪崩
分布式锁
redis中用setnx实现:setnx key val
,val随便。之后若再次setnx相同key会失败,只有当del掉key后才能再次setnx,从而实现集群分布式锁。
重启和停止redis server
如果是用apt-get或者yum install安装的redis,可以直接通过下面的命令停止/启动/重启redis
/etc/init.d/redis-server stop
/etc/init.d/redis-server start
/etc/init.d/redis-server restart
如果是通过源码安装的redis,则可以通过redis的客户端程序redis-cli的shutdown命令来重启redis
1.redis关闭
redis-cli -h 127.0.0.1 -p 6379 shutdown
2.redis启动
redis-server
配置登陆密码
自己买的公网服务器redis最好还是配密码。
打开配置文件搜 requirepass 把注释取消,后面换成自己的登陆密码。
用redis-cli登陆的时候需要加 -a 参数,跟密码,直接登陆也可以,只是没有任何权限。
!https://s3-us-west-2.amazonaws.com/secure.notion-static.com/b8935a1a-57be-41a2-bd57-7f1bffc9cdf3/rId50.png
登陆:redis-cli -a [password]
配置外网访问
- 设置password
- 把bind的ip注释掉
- 重启redis
- 防火墙打开6379端口,或者直接关闭防火墙
注意:如果是用的云服务器(例如腾讯云),还需要在腾讯云的控制台开放6379端口,这里可以理解为两层防火墙,外层是云服务器厂商的。
Redis源码学习
版本:6.2.10
server守护进程实现
void daemonize(void) {
int fd;
if (fork() != 0) exit(0); /* parent exits */
setsid(); /* create a new session */
/* Every output goes to /dev/null. If Redis is daemonized but
* the 'logfile' is set to 'stdout' in the configuration file
* it will not log at all. */
if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
dup2(fd, STDIN_FILENO);
dup2(fd, STDOUT_FILENO);
dup2(fd, STDERR_FILENO);
if (fd > STDERR_FILENO) close(fd);
}
}
这里先fork,把主进程退出了,然后setsid创建新会话,关于setsid:
当进程是会话的领头进程时setsid()调用失败并返回(-1)。
setsid()调用成功后,返回新的会话的ID,调用setsid函数的进程成为新的会话的领头进程,并与其父进程的会话组和进程组脱离。
由于会话对控制终端的独占性,进程同时与控制终端脱离。
这里可以理解为终端启动程序(父进程),父进程创建子进程,但是当终端退出时子进程也会退出,如果setsid了,那么终端退出子进程依旧在。
参考:https://blog.51cto.com/u_3078781/3291624
再看下面:
if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
dup2(fd, STDIN_FILENO);
dup2(fd, STDOUT_FILENO);
dup2(fd, STDERR_FILENO);
if (fd > STDERR_FILENO) close(fd);
}
通过dup2将标准输入输出等定向到黑洞文件,可以理解为丢弃。
server处理信号
void initServer(void) {
// ....
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
// ...
}
在对会话的概念有所了解之后,我们现在开始正式介绍一下SIGHUP信号,SIGHUP 信号在用户终端连接(正常或非正常)结束时发出, 通常是在终端的控制进程结束时, 通知同一session内的各个作业, 这时它们与控制终端不再关联. 系统对SIGHUP信号的默认处理是终止收到该信号的进程。所以若程序中没有捕捉该信号,当收到该信号时,进程就会退出。
在网络编程中,SIGPIPE这个信号是很常见的。当往一个写端关闭的管道或socket连接中连续写入数据时会引发SIGPIPE信号,引发SIGPIPE信号的写操作将设置errno为EPIPE。在TCP通信中,当通信的双方中的一方close一个连接时,若另一方接着发数据,根据TCP协议的规定,会收到一个RST响应报文,若再往这个服务器发送数据时,系统会发出一个SIGPIPE信号给进程,告诉进程这个连接已经断开了,不能再写入数据。
再看:
void setupSignalHandlers(void) {
struct sigaction act;
/* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
* Otherwise, sa_handler is used. */
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
act.sa_handler = sigShutdownHandler;
sigaction(SIGTERM, &act, NULL);
sigaction(SIGINT, &act, NULL);
sigemptyset(&act.sa_mask);
act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO;
act.sa_sigaction = sigsegvHandler;
if(server.crashlog_enabled) {
sigaction(SIGSEGV, &act, NULL);
sigaction(SIGBUS, &act, NULL);
sigaction(SIGFPE, &act, NULL);
sigaction(SIGILL, &act, NULL);
sigaction(SIGABRT, &act, NULL);
}
return;
}
sa_flags 用于指定信号处理的行为,通常设为0,表默认属性,它可以是以下值的按位与组合
SA_RESART 使被信号打断的系统调用自动重新发起(己废弃)
SA_NOCLDSTOP 使父进程在它的子进程暂停或继续运行时不会收到SIGCHLD信号
SA_NOCLDWAIT 使父进程在它的子进程退出时不会收到SIGCHLD信号,这时子进程如果退出也不
会成为僵尸进程
SA_NODEFER 使对信号的屏蔽无效,即在信号处理函数执行期间仍能发出这个信号
SA_RESETHAND 信号处理之后重新设置为默认处理方式
SA_SIGINFO 使用 sa_rigaction 成员而不是 sa_handler 作为信号处理函数
redis object
redis 非常注重内存消耗的,有些常用的对象,采用引用计数的方式进行复用:
void createSharedObjects(void) {
int j;
/* Shared command responses */
shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
shared.emptyarray = createObject(OBJ_STRING,sdsnew("*0\r\n"));
shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
shared.space = createObject(OBJ_STRING,sdsnew(" "));
shared.colon = createObject(OBJ_STRING,sdsnew(":"));
shared.plus = createObject(OBJ_STRING,sdsnew("+"));
// .........
}
robj *createObject(int type, void *ptr) {
robj *o = zmalloc(sizeof(*o));
o->type = type;
o->encoding = OBJ_ENCODING_RAW;
o->ptr = ptr;
o->refcount = 1;
/* Set the LRU to the current lruclock (minutes resolution), or
* alternatively the LFU counter. */
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
} else {
o->lru = LRU_CLOCK();
}
return o;
}
基本思想就是开辟一个堆空间分配好内容后,之后使用就不用反复在栈上分配了,直接使用。
redis 事件循环
bind + listen:
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,&server.ipfd) == C_ERR) {
/* Note: the following log text is matched by the test suite. */
serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
exit(1);
}
accept:
通过下面这句注册listen的套接字到epoll,并设置listen套接字的回调
if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
serverPanic("Unrecoverable error creating TCP socket accept handler.");
}
之后进main中的 aeMain(server.el);
,再进aeProcessEvents
。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
// ........
numevents = aeApiPoll(eventLoop, tvp); // epoll wait
for (j = 0; j < numevents; j++) {
// .....
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
}
}
在之前我们已经往对应的文件描述符所在的aeFileEvent上注册了可读和可写的回调,这里会被调用。
在aeApiPoll中:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd; // 将活动的文件描述符添加到eventLoop fired中
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
redis db
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
Redis启动有多个db可切换,其中由id属性区分。
redis rehash
/* Performs N steps of incremental rehashing. Returns 1 if there are still
* keys to move from the old to the new hash table, otherwise 0 is returned.
*
* Note that a rehashing step consists in moving a bucket (that may have more
* than one key as we use chaining) from the old to the new hash table, however
* since part of the hash table may be composed of empty spaces, it is not
* guaranteed that this function will rehash even a single bucket, since it
* will visit at max N*10 empty buckets in total, otherwise the amount of
* work it does would be unbound and the function may block for a long time. */
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}
/* More to rehash... */
return 1;
}
rehash每次停止条件为:
- 访问到空bucket次数总和超过某个值;
- hash1已经全部rehash;
rehash中rehashidx为核心变量,记录当前rehash到的位置(ht[1])。
IO
syncWrite
同步写模型:
ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout) {
ssize_t nwritten, ret = size;
long long start = mstime();
long long remaining = timeout;
while(1) {
long long wait = (remaining > SYNCIO__RESOLUTION) ?
remaining : SYNCIO__RESOLUTION;
long long elapsed;
/* Optimistically try to write before checking if the file descriptor
* is actually writable. At worst we get EAGAIN. */
nwritten = write(fd,ptr,size);
if (nwritten == -1) {
if (errno != EAGAIN) return -1;
} else {
ptr += nwritten;
size -= nwritten;
}
if (size == 0) return ret;
/* Wait */
aeWait(fd,AE_WRITABLE,wait);
elapsed = mstime() - start;
if (elapsed >= timeout) {
errno = ETIMEDOUT;
return -1;
}
remaining = timeout - elapsed;
}
}
这里比较值得注意的是,write的调用采用了循环写的形式。也就是write可能一次写不完数据,出现这种情况的原因有两种:
- 数据量过大:如果一次写入的数据量超过了操作系统缓冲区的大小,操作系统可能会将数据分成多次写入。在这种情况下,write 函数返回的写入字节数可能小于请求的字节数。
- 信号中断:当进程被信号中断时,write 函数可能会返回 -1 并设置 errno 为 EINTR。此时,应用程序可以再次调用 write 函数,将剩余的数据写入文件中。
因此,当使用 write 函数时,应该检查返回值以确定写入了多少字节,并在需要的情况下重复调用 write 函数,直到所有数据都被写入文件中为止。同时,需要确保对写入操作进行适当的错误处理,以避免出现潜在的问题。
RDB
着重看一下rdb.c中rdbSave函数:
int rdbSave(char *filename, rdbSaveInfo *rsi)
- 初始化rdb
fp = fopen(tmpfile,"w");
//....
rioInitWithFile(&rdb,fp);
// rioInitWithFile内部指定了IO模型并将fp绑定到rdb
rdb用的IO模型:
static const rio rioFileIO = {
rioFileRead,
rioFileWrite,
rioFileTell,
rioFileFlush,
NULL, /* update_checksum */
0, /* current checksum */
0, /* flags */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
我看了下,是基于C库的读写函数(fwrite和fread)。
- 进rdbSaveRio函数
if (rdbSaveRio(&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
errno = error;
goto werr;
}
- 执行核心rdb备份逻辑(保存每个db的哈希表dict内的kv,调用rdbSaveKeyValuePair)
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
uint64_t cksum;
size_t processed = 0;
int j;
long key_count = 0;
long long info_updated_time = 0;
char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
/* Write the SELECT DB opcode */
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(rdb,j) == -1) goto werr;
/* Write the RESIZE DB opcode. */
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
expires_size = dictSize(db->expires);
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
/* Update child info every 1 second (approximately).
* in order to avoid calling mstime() on each iteration, we will
* check the diff every 1024 keys */
if ((key_count++ & 1023) == 0) {
long long now = mstime();
if (now - info_updated_time >= 1000) {
sendChildInfo(CHILD_INFO_TYPE_CURRENT_INFO, key_count, pname);
info_updated_time = now;
}
}
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
/* If we are storing the replication information on disk, persist
* the script cache as well: on successful PSYNC after a restart, we need
* to be able to process any EVALSHA inside the replication backlog the
* master will send us. */
if (rsi && dictSize(server.lua_scripts)) {
di = dictGetIterator(server.lua_scripts);
while((de = dictNext(di)) != NULL) {
robj *body = dictGetVal(de);
if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
goto werr;
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
/* EOF opcode */
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
/* CRC64 checksum. It will be zero if checksum computation is disabled, the
* loading code skips the check in this case. */
cksum = rdb->cksum;
memrev64ifbe(&cksum);
if (rioWrite(rdb,&cksum,8) == 0) goto werr;
return C_OK;
werr:
if (error) *error = errno;
if (di) dictReleaseIterator(di);
return C_ERR;
}
分布式锁
应用场景:
Redis 的 SET 命令有个 NX 参数可以实现「key不存在才插入」,所以可以用它来实现分布式锁:
- 如果 key 不存在,则显示插入成功,可以用来表示加锁成功;
- 如果 key 存在,则会显示插入失败,可以用来表示加锁失败。
基于 Redis 节点实现分布式锁时,对于加锁操作,我们需要满足三个条件。
- 加锁包括了读取锁变量、检查锁变量值和设置锁变量值三个操作,但需要以原子操作的方式完成,所以,我们使用 SET 命令带上 NX 选项来实现加锁;
- 锁变量需要设置过期时间,以免客户端拿到锁后发生异常,导致锁一直无法释放,所以,我们在 SET 命令执行时加上 EX/PX 选项,设置其过期时间;
- 锁变量的值需要能区分来自不同客户端的加锁操作,以免在释放锁时,出现误释放操作,所以,我们使用 SET 命令设置锁变量值时,每个客户端设置的值是一个唯一值,用于标识客户端;
满足这三个条件的分布式命令如下:
SET lock_key unique_value NX PX 10000
- lock_key 就是 key 键;
- unique_value 是客户端生成的唯一的标识,区分来自不同客户端的锁操作;
- NX 代表只在 lock_key 不存在时,才对 lock_key 进行设置操作;
- PX 10000 表示设置 lock_key 的过期时间为 10s,这是为了避免客户端发生异常而无法释放锁。
而解锁的过程就是将 lock_key 键删除(del lock_key),但不能乱删,要保证执行操作的客户端就是加锁的客户端。所以,解锁的时候,我们要先判断锁的 unique_value 是否为加锁客户端,是的话,才将 lock_key 键删除。
可以看到,解锁是有两个操作,这时就需要 Lua 脚本来保证解锁的原子性,因为 Redis 在执行 Lua 脚本时,可以以原子性的方式执行,保证了锁释放操作的原子性。
// 释放锁时,先比较 unique_value 是否相等,避免锁的误释放
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
这样一来,就通过使用 SET 命令和 Lua 脚本在 Redis 单节点上完成了分布式锁的加锁和解锁。
java下实现分布式锁的案例:https://www.cnblogs.com/Howinfun/p/11803747.html
延时队列
ZSet 多了一个分值(score)属性,用它来存储时间戳,用多个线程轮询Zset获取到期的任务进行处理,以此来实现延迟消息队列等,步骤如下:
- 利用
zadd
向集合中插入元素,以元素的时间戳(超时时间)作为 score - 利用
zrangebyscore
以0 < score <= 当前时间戳
进行获取需要处理的元素 - 当有满足的条件的元素, 先删除
zrem
该元素(保证不被其他进程取到),再进行业务逻辑处理;
**问题一:**延迟队列满足条件的元素为空(或者集合为空)时候,进程会频繁不断向 redis 服务获取满足条件元素,这样会造成 redis 服务资源占用和浪费 ->可以在没有取到满足的条件时候让程序阻塞一段时间unsleep(10000)
,这种方法实际上就是用时间换取资源,注意控制阻塞时间长短,不宜太短,也不宜太长(影响即时性)
**问题二:**有可能出现 zrangebyscore
和 zrem
非一个客户端,即原子性问题 -> 采用 lua 脚本解决
缺点:
- 消息没有持久化,如果服务器宕机或重启,消息将会丢失
- 没有ACK机制,如果消费失败,消息会丢失
- 没有队列监控、出入对性能差
Redis连环问
Redis优缺点?
优点:
- 基于内存操作,内存读写速度快。
- 支持多种数据类型,包括String、Hash、List、Set、ZSet等。
- 支持持久化。Redis支持RDB和AOF两种持久化机制,持久化功能可以有效地避免数据丢失问题。
- 支持事务。Redis的所有操作都是原子性的,同时Redis还支持对几个操作合并后的原子性执行。
- 支持主从复制。主节点会自动将数据同步到从节点,可以进行读写分离。
- Redis命令的处理是单线程的。Redis6.0引入了多线程,需要注意的是,多线程用于处理网络数据的读写和协议解析,Redis命令执行还是单线程的。
缺点:
- 对结构化查询的支持比较差。
- 数据库容量受到物理内存的限制,不适合用作海量数据的高性能读写,因此Redis适合的场景主要局限在较小数据量的操作。
- Redis 较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。
Redis为什么这么快?
- 基于内存:Redis是使用内存存储,没有磁盘IO上的开销。数据存在内存中,读写速度快。
- IO多路复用模型:Redis 采用 IO 多路复用技术。Redis 使用单线程来轮询描述符,将数据库的操作都转换成了事件,不在网络I/O上浪费过多的时间。
- 高效的数据结构:Redis 每种数据类型底层都做了优化,目的就是为了追求更快的速度。
Redis应用场景有哪些?
- 缓存热点数据,缓解数据库的压力。
- 利用 Redis 原子性的自增操作,可以实现计数器的功能,比如统计用户点赞数、用户访问数等。
- 分布式锁。在分布式场景下,无法使用单机环境下的锁来对多个节点上的进程进行同步。可以使用 Redis 自带的 SETNX 命令实现分布式锁,除此之外,还可以使用官方提供的 RedLock 分布式锁实现。
- 简单的消息队列,可以使用Redis自身的发布/订阅模式或者List来实现简单的消息队列,实现异步操作。
- 限速器,可用于限制某个用户访问某个接口的频率,比如秒杀场景用于防止用户快速点击带来不必要的压力。
- 好友关系,利用集合的一些命令,比如交集、并集、差集等,实现共同好友、共同爱好之类的功能。
Redis 数据类型有哪些?
基本数据类型:
1、String:最常用的一种数据类型,String类型的值可以是字符串、数字或者二进制,但值最大不能超过512MB。
2、Hash:Hash 是一个键值对集合。
3、Set:无序去重的集合。Set 提供了交集、并集等方法,对于实现共同好友、共同关注等功能特别方便。
4、List:有序可重复的集合,底层是依赖双向链表实现的。
5、SortedSet:有序Set。内部维护了一个score
的参数来实现。适用于排行榜和带权重的消息队列等场景。
特殊的数据类型:
1、Bitmap:位图,可以认为是一个以位为单位数组,数组中的每个单元只能存0或者1,数组的下标在 Bitmap 中叫做偏移量。Bitmap的长度与集合中元素个数无关,而是与基数的上限有关。
2、Hyperloglog。HyperLogLog 是用来做基数统计的算法,其优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定的、并且是很小的。典型的使用场景是统计独立访客。
3、Geospatial :主要用于存储地理位置信息,并对存储的信息进行操作,适用场景如定位、附近的人等。
Redis事务支持隔离性吗?
Redis 是单进程程序,并且它保证在执行事务时,不会对事务进行中断,事务可以运行直到执行完所有事务队列中的命令为止。因此,Redis 的事务是总是带有隔离性的。
Redis事务保证原子性吗,支持回滚吗?
Redis单条命令是原子性执行的,但事务不保证原子性,且没有回滚。事务中任意命令执行失败,其余的命令仍会被执行。
Redis有哪些部署方案?
单机版:单机部署,单机redis能够承载的 QPS 大概就在上万到几万不等。这种部署方式很少使用。存在的问题:1、内存容量有限 2、处理能力有限 3、无法高可用。
主从模式:一主多从,主负责写,并且将数据复制到其它的 slave 节点,从节点负责读。所有的读请求全部走从节点。这样也可以很轻松实现水平扩容,支撑读高并发。master 节点挂掉后,需要手动指定新的 master,可用性不高,基本不用。
哨兵模式:主从复制存在不能自动故障转移、达不到高可用的问题。哨兵模式解决了这些问题。通过哨兵机制可以自动切换主从节点。master 节点挂掉后,哨兵进程会主动选举新的 master,可用性高,但是每个节点存储的数据是一样的,浪费内存空间。数据量不是很多,集群规模不是很大,需要自动容错容灾的时候使用。
Redis cluster:服务端分片技术,3.0版本开始正式提供。Redis Cluster并没有使用一致性hash,而是采用slot(槽)的概念,一共分成16384个槽。将请求发送到任意节点,接收到请求的节点会将查询请求发送到正确的节点上执行。主要是针对海量数据+高并发+高可用的场景,如果是海量数据,如果你的数据量很大,那么建议就用Redis cluster,所有主节点的容量总和就是Redis cluster可缓存的数据容量。
主从架构
单机的 redis,能够承载的 QPS 大概就在上万到几万不等。对于缓存来说,一般都是用来支撑读高并发的。因此架构做成主从(master-slave)架构,一主多从,主负责写,并且将数据复制到其它的 slave 节点,从节点负责读。所有的读请求全部走从节点。这样也可以很轻松实现水平扩容,支撑读高并发。
Redis的复制功能是支持多个数据库之间的数据同步。主数据库可以进行读写操作,当主数据库的数据发生变化时会自动将数据同步到从数据库。从数据库一般是只读的,它会接收主数据库同步过来的数据。一个主数据库可以有多个从数据库,而一个从数据库只能有一个主数据库。
主从复制的原理?
- 当启动一个从节点时,它会发送一个
PSYNC
命令给主节点; - 如果是从节点初次连接到主节点,那么会触发一次全量复制。此时主节点会启动一个后台线程,开始生成一份
RDB
快照文件; - 同时还会将从客户端 client 新收到的所有写命令缓存在内存中。
RDB
文件生成完毕后, 主节点会将RDB
文件发送给从节点,从节点会先将RDB
文件写入本地磁盘,然后再从本地磁盘加载到内存中; - 接着主节点会将内存中缓存的写命令发送到从节点,从节点同步这些数据;
- 如果从节点跟主节点之间网络出现故障,连接断开了,会自动重连,连接之后主节点仅会将部分缺失的数据同步给从节点。
哨兵Sentinel
主从复制存在不能自动故障转移、达不到高可用的问题。哨兵模式解决了这些问题。通过哨兵机制可以自动切换主从节点。
客户端连接Redis的时候,先连接哨兵,哨兵会告诉客户端Redis主节点的地址,然后客户端连接上Redis并进行后续的操作。当主节点宕机的时候,哨兵监测到主节点宕机,会重新推选出某个表现良好的从节点成为新的主节点,然后通过发布订阅模式通知其他的从服务器,让它们切换主机。
工作原理
- 每个
Sentinel
以每秒钟一次的频率向它所知道的Master
,Slave
以及其他Sentinel
实例发送一个PING
命令。 - 如果一个实例距离最后一次有效回复
PING
命令的时间超过指定值, 则这个实例会被Sentine
标记为主观下线。 - 如果一个
Master
被标记为主观下线,则正在监视这个Master
的所有Sentinel
要以每秒一次的频率确认Master
是否真正进入主观下线状态。 - 当有足够数量的
Sentinel
(大于等于配置文件指定值)在指定的时间范围内确认Master
的确进入了主观下线状态, 则Master
会被标记为客观下线 。若没有足够数量的Sentinel
同意Master
已经下线,Master
的客观下线状态就会被解除。 若Master
重新向Sentinel
的PING
命令返回有效回复,Master
的主观下线状态就会被移除。 - 哨兵节点会选举出哨兵 leader,负责故障转移的工作。
- 哨兵 leader 会推选出某个表现良好的从节点成为新的主节点,然后通知其他从节点更新主节点信息。
Redis cluster
哨兵模式解决了主从复制不能自动故障转移、达不到高可用的问题,但还是存在主节点的写能力、容量受限于单机配置的问题。而cluster模式实现了Redis的分布式存储,每个节点存储不同的内容,解决主节点的写能力、容量受限于单机配置的问题。
Redis cluster集群节点最小配置6个节点以上(3主3从),其中主节点提供读写操作,从节点作为备用节点,不提供请求,只作为故障转移使用。
Redis cluster采用虚拟槽分区,所有的键根据哈希函数映射到0~16383个整数槽内,每个节点负责维护一部分槽以及槽所映射的键值数据。
工作原理:
- 通过哈希的方式,将数据分片,每个节点均分存储一定哈希槽(哈希值)区间的数据,默认分配了16384 个槽位
- 每份数据分片会存储在多个互为主从的多节点上
- 数据写入先写主节点,再同步到从节点(支持配置为阻塞同步)
- 同一分片多个节点间的数据不保持一致性
- 读取数据时,当客户端操作的key没有分配在该节点上时,redis会返回转向指令,指向正确的节点
- 扩容时时需要需要把旧节点的数据迁移一部分到新节点
过期键的删除策略?
1、被动删除。在访问key时,如果发现key已经过期,那么会将key删除。
2、主动删除。定时清理key,每次清理会依次遍历所有DB,从db随机取出20个key,如果过期就删除,如果其中有5个key过期,那么就继续对这个db进行清理,否则开始清理下一个db。
3、内存不够时清理。Redis有最大内存的限制,通过maxmemory参数可以设置最大内存,当使用的内存超过了设置的最大内存,就要进行内存释放, 在进行内存释放的时候,会按照配置的淘汰策略清理内存。
内存淘汰策略有哪些?
当Redis的内存超过最大允许的内存之后,Redis 会触发内存淘汰策略,删除一些不常用的数据,以保证Redis服务器正常运行。
Redisv4.0前提供 6 种数据淘汰策略:
- volatile-lru:LRU(
Least Recently Used
),最近使用。利用LRU算法移除设置了过期时间的key - allkeys-lru:当内存不足以容纳新写入数据时,从数据集中移除最近最少使用的key
- volatile-ttl:从已设置过期时间的数据集中挑选将要过期的数据淘汰
- volatile-random:从已设置过期时间的数据集中任意选择数据淘汰
- allkeys-random:从数据集中任意选择数据淘汰
- no-eviction:禁止删除数据,当内存不足以容纳新写入数据时,新写入操作会报错
Redisv4.0后增加以下两种:
- volatile-lfu:LFU,Least Frequently Used,最少使用,从已设置过期时间的数据集中挑选最不经常使用的数据淘汰。
- allkeys-lfu:当内存不足以容纳新写入数据时,从数据集中移除最不经常使用的key。
内存淘汰策略可以通过配置文件来修改,相应的配置项是maxmemory-policy
,默认配置是noeviction
。
Redis大key怎么处理?
通常我们会将含有较大数据或含有大量成员、列表数的Key称之为大Key。
以下是对各个数据类型大key的描述:
- value是STRING类型,它的值超过5MB
- value是ZSET、Hash、List、Set等集合类型时,它的成员数量超过1w个
上述的定义并不绝对,主要是根据value的成员数量和大小来确定,根据业务场景确定标准。
怎么处理:
- 当vaule是string时,可以使用序列化、压缩算法将key的大小控制在合理范围内,但是序列化和反序列化都会带来更多时间上的消耗。或者将key进行拆分,一个大key分为不同的部分,记录每个部分的key,使用multiget等操作实现事务读取。
- 当value是list/set等集合类型时,根据预估的数据规模来进行分片,不同的元素计算后分到不同的片。
为什么 Redis 集群的最大槽数是 16384 个?
Redis Cluster 采用数据分片机制,定义了 16384个 Slot槽位,集群中的每个Redis 实例负责维护一部分槽以及槽所映射的键值数据。
Redis每个节点之间会定期发送ping/pong消息(心跳包包含了其他节点的数据),用于交换数据信息。
Redis集群的节点会按照以下规则发ping消息:
- 每秒会随机选取5个节点,找出最久没有通信的节点发送ping消息
- 每100毫秒都会扫描本地节点列表,如果发现节点最近一次接受pong消息的时间大于cluster-node-timeout/2 则立刻发送ping消息
心跳包的消息头里面有个myslots的char数组,是一个bitmap,每一个位代表一个槽,如果该位为1,表示这个槽是属于这个节点的。
接下来,解答为什么 Redis 集群的最大槽数是 16384 个,而不是65536 个。
1、如果采用 16384 个插槽,那么心跳包的消息头占用空间 2KB (16384/8);如果采用 65536 个插槽,那么心跳包的消息头占用空间 8KB (65536/8)。可见采用 65536 个插槽,发送心跳信息的消息头达8k,比较浪费带宽。
2、一般情况下一个Redis集群不会有超过1000个master节点,太多可能导致网络拥堵。
3、哈希槽是通过一张bitmap的形式来保存的,在传输过程中,会对bitmap进行压缩。bitmap的填充率越低,压缩率越高。其中bitmap 填充率 = slots / N (N表示节点数)。所以,插槽数越低, 填充率会降低,压缩率会提高。
rdb和aof的优势与劣势
RDB文件保存过程
- redis调用fork,现在有了子进程和父进程。
- 父进程继续处理client请求,子进程负责将内存内容写入到临时文件。由于os的写时复制机制(copy on write)父子进程会共享相同的物理页面,当父进程处理写请求时os会为父进程要修改的页面创建副本,而不是写共享的页面。所以子进程的地址空间内的数 据是fork时刻整个数据库的一个快照。
- 当子进程将快照写入临时文件完毕后,用临时文件替换原来的快照文件,然后子进程退出。
client 也可以使用save或者bgsave命令通知redis做一次快照持久化。save操作是在主线程中保存快照的,由于redis是用一个主线程来处理所有 client的请求,这种方式会阻塞所有client请求。所以不推荐使用。
另一点需要注意的是,每次快照持久化都是将内存数据完整写入到磁盘一次,并不 是增量的只同步脏数据。如果数据量大的话,而且写操作比较多,必然会引起大量的磁盘io操作,可能会严重影响性能。
优势
- 一旦采用该方式,那么你的整个Redis数据库将只包含一个文件,这样非常方便进行备份。比如你可能打算没1天归档一些数据。
- 方便备份,我们可以很容易的将一个一个RDB文件移动到其他的存储介质上
- RDB 在恢复大数据集时的速度比 AOF 的恢复速度要快。
- RDB 可以最大化 Redis 的性能:父进程在保存 RDB 文件时唯一要做的就是 fork 出一个子进程,然后这个子进程就会处理接下来的所有保存工作,父进程无须执行任何磁盘 I/O 操作。
劣势
- 如果你需要尽量避免在服务器故障时丢失数据,那么 RDB 不适合你。 虽然 Redis 允许你设置不同的保存点(save point)来控制保存 RDB 文件的频率, 但是, 因为RDB 文件需要保存整个数据集的状态, 所以它并不是一个轻松的操作。 因此你可能会至少 5 分钟才保存一次 RDB 文件。 在这种情况下, 一旦发生故障停机, 你就可能会丢失好几分钟的数据。
- 每次保存 RDB 的时候,Redis 都要 fork() 出一个子进程,并由子进程来进行实际的持久化工作。 在数据集比较庞大时, fork() 可能会非常耗时,造成服务器在某某毫秒内停止处理客户端; 如果数据集非常巨大,并且 CPU 时间非常紧张的话,那么这种停止时间甚至可能会长达整整一秒。 虽然 AOF 重写也需要进行 fork() ,但无论 AOF 重写的执行间隔有多长,数据的耐久性都不会有任何损失。
AOF文件保存过程
redis会将每一个收到的写命令都通过write函数追加到文件中(默认是 appendonly.aof)。
当redis重启时会通过重新执行文件中保存的写命令来在内存中重建整个数据库的内容。当然由于os会在内核中缓存 write做的修改,所以可能不是立即写到磁盘上。这样aof方式的持久化也还是有可能会丢失部分修改。不过我们可以通过配置文件告诉redis我们想要 通过fsync函数强制os写入到磁盘的时机。有三种方式如下(默认是:每秒fsync一次)
appendonly yes //启用aof持久化方式#
appendfsync always //每次收到写命令就立即强制写入磁盘,最慢的,但是保证完全的持久化,不推荐使用
appendfsync everysec //每秒钟强制写入磁盘一次,在性能和持久化方面做了很好的折中,推荐#
appendfsync no //完全依赖os,性能最好,持久化没保证
复制aof 的方式也同时带来了另一个问题。持久化文件会变的越来越大。例如我们调用incr test命令100次,文件中必须保存全部的100条命令,其实有99条都是多余的。因为要恢复数据库的状态其实文件中保存一条set test 100就够了。
为了压缩aof的持久化文件。redis提供了bgrewriteaof命令。收到此命令redis将使用与快照类似的方式将内存中的数据 以命令的方式保存到临时文件中,最后替换原来的文件。具体过程如下
- redis调用fork ,现在有父子两个进程
- 子进程根据内存中的数据库快照,往临时文件中写入重建数据库状态的命令
- 父进程继续处理client请求,除了把写命令写入到原来的aof文件中。同时把收到的写命令缓存起来。这样就能保证如果子进程重写失败的话并不会出问题。
- 当子进程把快照内容写入已命令方式写到临时文件中后,子进程发信号通知父进程。然后父进程把缓存的写命令也写入到临时文件。
- 现在父进程可以使用临时文件替换老的aof文件,并重命名,后面收到的写命令也开始往新的aof文件中追加。
需要注意到是重写aof文件的操作,并没有读取旧的aof文件,而是将整个内存中的数据库内容用命令的方式重写了一个新的aof文件,这点和快照有点类似。
优势
- 使用 AOF 持久化会让 Redis 变得非常耐久(much more durable):你可以设置不同的 fsync 策略,比如无 fsync ,每秒钟一次 fsync ,或者每次执行写入命令时 fsync 。 AOF 的默认策略为每秒钟 fsync 一次,在这种配置下,Redis 仍然可以保持良好的性能,并且就算发生故障停机,也最多只会丢失一秒钟的数据( fsync 会在后台线程执行,所以主线程可以继续努力地处理命令请求)。
- AOF 文件是一个只进行追加操作的日志文件(append only log), 因此对 AOF 文件的写入不需要进行 seek , 即使日志因为某些原因而包含了未写入完整的命令(比如写入时磁盘已满,写入中途停机,等等), redis-check-aof 工具也可以轻易地修复这种问题。
Redis 可以在 AOF 文件体积变得过大时,自动地在后台对 AOF 进行重写: 重写后的新 AOF 文件包含了恢复当前数据集所需的最小命令集合。 整个重写操作是绝对安全的,因为 Redis 在创建新 AOF 文件的过程中,会继续将命令追加到现有的 AOF 文件里面,即使重写过程中发生停机,现有的 AOF 文件也不会丢失。 而一旦新 AOF 文件创建完毕,Redis 就会从旧 AOF 文件切换到新 AOF 文件,并开始对新 AOF 文件进行追加操作。 - AOF 文件有序地保存了对数据库执行的所有写入操作, 这些写入操作以 Redis 协议的格式保存, 因此 AOF 文件的内容非常容易被人读懂, 对文件进行分析(parse)也很轻松。 导出(export) AOF 文件也非常简单: 举个例子, 如果你不小心执行了 FLUSHALL 命令, 但只要 AOF 文件未被重写, 那么只要停止服务器, 移除 AOF 文件末尾的 FLUSHALL 命令, 并重启 Redis , 就可以将数据集恢复到 FLUSHALL 执行之前的状态。
劣势
- 对于相同的数据集来说,AOF 文件的体积通常要大于 RDB 文件的体积。
- 根据所使用的 fsync 策略,AOF 的速度可能会慢于 RDB 。 在一般情况下, 每秒 fsync 的性能依然非常高, 而关闭 fsync 可以让 AOF 的速度和 RDB 一样快, 即使在高负荷之下也是如此。 不过在处理巨大的写入载入时,RDB 可以提供更有保证的最大延迟时间(latency)。
- AOF 在过去曾经发生过这样的 bug : 因为个别命令的原因,导致 AOF 文件在重新载入时,无法将数据集恢复成保存时的原样。 (举个例子,阻塞命令 BRPOPLPUSH 就曾经引起过这样的 bug 。) 测试套件里为这种情况添加了测试: 它们会自动生成随机的、复杂的数据集, 并通过重新载入这些数据来确保一切正常。 虽然这种 bug 在 AOF 文件中并不常见, 但是对比来说, RDB 几乎是不可能出现这种 bug 的。
Redis windows客户端
RedisClient:
https://github.com/caoxinyu/RedisClient/issues/61
Redis Mac客户端
推荐QuickRedis,免费开源。
https://gitee.com/quick123official/quick_redis_blog/
docker启动redis
- docker pull redis
- mkdir /docker-data/redis
- wget http://download.redis.io/redis-stable/redis.conf
- chmod 777 redis.conf
- vi /docker-data/redis/redis.conf
bind 127.0.0.1 # 这行要注释掉,解除本地连接限制
protected-mode no # 默认yes,如果设置为yes,则只允许在本机的回环连接,其他机器无法连接。
daemonize no # 默认no 为不守护进程模式,docker部署不需要改为yes,docker run -d本身就是后台启动,不然会冲突
requirepass 123456 # 设置密码
appendonly yes # 持久化
docker run --name redis \
-p 6379:6379 \
-v /docker-data/redis/redis.conf:/etc/redis/redis.conf \
-v /docker-data/redis:/data \
-d redis redis-server /etc/redis/redis.conf --appendonly yes
说明:
- p 6379:6379:端口映射,前面是宿主机,后面是容器。
- –name redis:指定该容器名称。
- v 挂载文件或目录:前面是宿主机,后面是容器。
- d redis redis-server /etc/redis/redis.conf:表示后台启动redis,以配置文件启动redis,加载容器内的conf文件。
- appendonly yes:开启redis 持久化。