目录
- 1 交互方式 pipline
- 2 广播机制
- 2.1 概念演示
- 2.2 使用场景
- 3 redis事物
- 3.1 概念
- 3.2 使用场景
- 3.3 解决的问题
- 3.3.1 背景:多线程竞争出现问题
- 3.3.2 事务
- 3.3.3 安全性事务
- 3.4两种类型的“事务”
- 3.4.1 watch ... multi exec
- 3.4.2 lua 脚本实现“原子”执行,重点掌握
- 3.4.3 watch multi exec 与 lua 脚本 的区别
- 4 redis联通,通过hiredis压缩协议,解析协议,扩展事件处理
- 4.1 hiredis 安装
- 4.2 hiredis : 如何引入自己项目中
- 1 局限性: 一定要使用reactor模型。因为在hiredis中,具体的IO操作是hiredis实现的,
- 2 项目中要访问redis
- 2.1 \redis\hiredis\examples example-libevent.c
- 2.2 \redis\hiredis\adapters libevent.h 适配libevent网络库
- 2.3 流程备注
1 交互方式 pipline
pipeline 一次性发送多个命令,可以节约时间 (相对于客户端一次发一个命令,然后服务端回复一条命令)
异步连接 不需要规划, 带一个回调函数
时间窗口限流 ------------> 测试代码 github 地址 找mark 老师要
客户端缓存,然后一起发出(这里不是redis缓存)
2 广播机制
2.1 概念演示
发布者与注册者之间是1:n的关系,类似于观察者模式。
下图中两个客户端监听 “n1”, 第三个客户端向"n1"发布消息:
还可以是监听 n1.*, 用命令psubscibe
实际使用redis的时候,会与redis server建立两条连接,一条专门订阅(一直处于等待状态,相当于阻塞了),另一条用来处理种命令。
2.2 使用场景
缺陷:不保证消息一定到达(比如节点宕机),限制了发布订阅的使用场景
哪些场景可以允许丢失?-----用发布订阅模式
比如:游戏服务端发公告
反之:用kafka: 分布式消息队列: 能确保消息一定到达 (kafka 是工业级的,后续再写学习心得)
redis: stream 也可以,但是不推荐
3 redis事物
3.1 概念
用户定义一系列数据库操作,这些操作视为一个完整的逻辑处理工作单元,要么全部执行,要么全部不执行,是不可分割的工作单元。
工作中没有使用过很复杂的数据库场景,从而就用不到事物
3.2 使用场景
在什么场景下探讨事物? ----> 并发连接的场景:多条连接处理相同的功能
单条连接,多条连接处理不同的功能: 不需要考虑事物
3.3 解决的问题
3.3.1 背景:多线程竞争出现问题
client1: 1 get count : 1
2 set count 2
client2: 也要同时去操作count
3 set count 3
eg1 不希望 eg2影响它
希望的结果是 3 12 或者 12 3, 而不是 1 3 2。
1和2要作为一个整体执行,由此可以联想到 C/C++ 的原子性,但是redis中对应的是“事务”,事务可以将12绑定在一起。
3.3.2 事务
为解决并发连接互相干扰的问题,“事务”来了
事务是整体同生共死,假如有ABCD四个节点与服务器连接,每一个节点中都可能有事物,其中任意一个节点都有可能宕机,为了保证事物的安全性,怎么办?==> 安全性事务。
3.3.3 安全性事务
安全性事务具备 ACID特性:(引用3.3.1中的 命令1 2 3 )
A 原子性:要么都成功,要么都失败。
遇到失败了,需要回退到执行之前的状态。
C 完整检测一致:key 已经是一个string类型,不能当作另一个类型(如list, set)去操作。
逻辑上的一致 1 3 2 就破坏了逻辑性(相对于 3 12 或者 12 3)。
I 隔离性:client1 client2应该隔离,需要加锁,串行性执行。
锁事务(原子变量是 锁总线)。
后续:mysql: MVCC 非一致锁定夺。
D 持久性:数据刷到磁盘。
lua脚本满足原子性和隔离性,一致性和持久性不满足
3.4两种类型的“事务”
3.4.1 watch … multi exec
语法:
watch
语句
multi #开启事务
n个语句
# 告诉redis视为一个整体(n个语句依次入队,这里是redis服务端的行为,与pipeline不一样),保证里面的key不会被修改,否则不提交事务,直接返回nil
exec # 提交事务 redis服务端,依次执行n个语句的队列,因为是服务端的行为,所以可以控制不会被打断
eg:
WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC
说明:
multi 与 exec之间 : redis 将事物进行缓存 (acid) , 然后通过缓存命令统一执行。
用的就是 redis pipeline 技术: 客户端缓存命令统一发送。
但这些可以通过lua脚本去执行,因为redis中含有lua虚拟机,lua可以实时的拿到redis内存数据,引出下一节。
3.4.2 lua 脚本实现“原子”执行,重点掌握
redis 中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;redislua 脚本的执行是原子性的;当某个脚本正在执行的时候,不会有其他命令或者脚本被执行;
lua 脚本当中的命令会直接修改数据状态;
lua 脚本 mysql 存储区别:MySQL存储过程不具备事务性,所以也不具备原子性;
注意:如果项目中使用了 lua 脚本,不需要使用上面的事务命令;
eg: eval后面的就是lua脚本
27.0.0.1:6379> set score:10001 100
OK
127.0.0.1:6379> eval 'local key = KEYS[1]; local val = redis.call("get", key); redis.call("set", key, 2*val); return 2*val;' 1 score:10001
(integer) 200
127.0.0.1:6379> eval 'local key = KEYS[1]; local val = redis.call("get", key); redis.call("set", key, 2*val); return 2*val;' 1 score:10001
(integer) 400
127.0.0.1:6379> eval 'local key = KEYS[1]; local val = redis.call("get", key); redis.call("set", key, 2*val); return 2*val;' 1 score:10001
(integer) 800
语法:EVAL script numkeys key [key ...] arg [arg ...]
numkeys 对应上面例子中的1,KEYS[1]表示 score:10001, 可选参数arg没有使用到。
redis.call 相当于调用redis命令
EVAL的第二个参数是参数的个数,后面的参数(从第三个参数),表示在脚本中所用到的那些 Redis 键(key),这些键名参数可以在 Lua 中通过全局变量 KEYS 数组,用 1 为基址的形式访问( KEYS[1] , KEYS[2] ,以此类推)。
3.4.3 watch multi exec 与 lua 脚本 的区别
1 multi exec 中间的事物语句拿不到结果
语句之间不能构成依赖逻辑关系 ===> 工作中几乎不会用到
2 相对的 lua都可以 。但是redis 没有回滚机制, lua脚本中有一部分语句执行成功,已经生效,后面的语句报错,与3.3.3.A里面提到的“遇到失败了,需要回退到执行之前的状态”不符。
要实现完整的原子性,lua需要自己加代码,在报错的地方加上类似 if err 写代码回退 ,回滚到事物之前的状态。
eg:
local res, err = redis.pcall(...)
if (err) {...自己写回滚的代码}
4 redis联通,通过hiredis压缩协议,解析协议,扩展事件处理
4.1 hiredis 安装
进入到 redis/deps/hiredis
make
sudo make install
4.2 hiredis : 如何引入自己项目中
1 局限性: 一定要使用reactor模型。因为在hiredis中,具体的IO操作是hiredis实现的,
也就说读数据read, 写数据write, 建立连接connect都是在hiredis中完成.
不可以使用proactor网络模型是因为 read write connect等操作都是在再proactor中完成,不是在hiredis中完成。
2 项目中要访问redis
参考
2.1 \redis\hiredis\examples example-libevent.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <hiredis.h>
#include <async.h>
#include <adapters/libevent.h>
void getCallback(redisAsyncContext *c, void *r, void *privdata) {
redisReply *reply = r;
if (reply == NULL) {
if (c->errstr) {
printf("errstr: %s\n", c->errstr);
}
return;
}
printf("argv[%s]: %s\n", (char*)privdata, reply->str);
/* Disconnect after receiving the reply to GET */
redisAsyncDisconnect(c);
}
void connectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
return;
}
printf("Connected...\n");
}
void disconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
return;
}
printf("Disconnected...\n");
}
int main (int argc, char **argv) {
#ifndef _WIN32
signal(SIGPIPE, SIG_IGN);
#endif
struct event_base *base = event_base_new();
redisOptions options = {0};
REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379);
struct timeval tv = {0};
tv.tv_sec = 1;
options.connect_timeout = &tv;
redisAsyncContext *c = redisAsyncConnectWithOptions(&options);
if (c->err) {
/* Let *c leak for now... */
printf("Error: %s\n", c->errstr);
return 1;
}
redisLibeventAttach(c,base);
redisAsyncSetConnectCallback(c,connectCallback);
redisAsyncSetDisconnectCallback(c,disconnectCallback);
redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
event_base_dispatch(base);
return 0;
}
2.2 \redis\hiredis\adapters libevent.h 适配libevent网络库
typedef struct redisLibeventEvents {
redisAsyncContext *context;
struct event *ev;
struct event_base *base;
struct timeval tv;
short flags;
short state;
} redisLibeventEvents;
。。。
2.3 流程备注
1 客户端:与redis建立连接,把指令压缩成redis协议发给redis服务器;
2 redis服务器: 解析执行,返回结果(也要压缩成redis协议的数据)。
3 客户端:再把 redis协议数据解析出来
客户端与服务端的连接 通过 hiredis 来管理,包含:
a 压缩协议
b 解析协议
c 扩展: 事件处理,可以用来适配reactor模型
reactor 回顾:
1 事件对象
2 注册事件
3 事件循环
4 触发事件 —> 处理 IO
参考\redis\hiredis\adapters libevent.h适配libevent网络库,关键部分如下:
static void redisLibeventUpdate(void *privdata, short flag, int isRemove) {
redisLibeventEvents *e = (redisLibeventEvents *)privdata;
const struct timeval *tv = e->tv.tv_sec || e->tv.tv_usec ? &e->tv : NULL;
if (isRemove) {
if ((e->flags & flag) == 0) {
return;
} else {
e->flags &= ~flag;
}
} else {
if (e->flags & flag) {
return;
} else {
e->flags |= flag;
}
}
event_del(e->ev);
event_assign(e->ev, e->base, e->context->c.fd, e->flags | EV_PERSIST,
redisLibeventHandler, privdata);
event_add(e->ev, tv);
}
static void redisLibeventAddRead(void *privdata) {
redisLibeventUpdate(privdata, EV_READ, 0);
}
static void redisLibeventDelRead(void *privdata) {
redisLibeventUpdate(privdata, EV_READ, 1);
}
static void redisLibeventAddWrite(void *privdata) {
redisLibeventUpdate(privdata, EV_WRITE, 0);
}
static void redisLibeventDelWrite(void *privdata) {
redisLibeventUpdate(privdata, EV_WRITE, 1);
}
/* Register functions to start/stop listening for events */
ac->ev.addRead = redisLibeventAddRead;
ac->ev.delRead = redisLibeventDelRead;
ac->ev.addWrite = redisLibeventAddWrite;
ac->ev.delWrite = redisLibeventDelWrite;
ac->ev.cleanup = redisLibeventCleanup;
ac->ev.scheduleTimer = redisLibeventSetTimeout;
文章参考与<零声教育>的C/C++linux服务期高级架构系统教程学习:链接