目录
1. struct connection
ConnectionType属性
创建connection
2. struct client
3. 绑定客户端回调函数的流程
3.1. 读事件回调函数的设置
3.2. 写事件回调函数的设置
3.3. connSocketEventHandler函数
3.4. Redis5版本的设置回调函数
3.5. 个人的一些想法,修改源码
4. 总结设置客户端回调函数的流程
读事件回调函数的设置
写事件回调函数的设置流程
5. 回调函数的调用流程
客户端
服务器端
本文章主要内容是下面两部分:
- 两个结构体:connection、client
- 设置回调函数的注意点和函数connSocketEventHandler
1. struct connection
什么时候使用了connection?在创建一个客户端时候,会同时创建一个connection来绑定该fd。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
.............
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
//创建connection
connection *connCreateAcceptedSocket(int fd) {
connection *conn = connCreateSocket();
conn->fd = fd;
conn->state = CONN_STATE_ACCEPTING;
return conn;
}
该结构是一个完成的连接,客户端的fd封装成一个connection。
struct connection {
ConnectionType *type;
ConnectionState state; //表示该客户端当前的连接状态
short int flags;
short int refs; //该连接被引用的数量
int last_errno; //该连接的最终错误
void *private_data; //在网络这部分,可以认为是结构体client
//一些对应的回调函数
ConnectionCallbackFunc conn_handler;
ConnectionCallbackFunc write_handler;
ConnectionCallbackFunc read_handler;
int fd; //该客户端对应的fd
};
//回调函数的类型
typedef void (*ConnectionCallbackFunc)(struct connection *conn);
//connection的flags的值
#define CONN_FLAG_CLOSE_SCHEDULED (1<<0) /* Closed scheduled by a handler */
#define CONN_FLAG_WRITE_BARRIER (1<<1) /* Write barrier requested */
//一般是先执行读事件,之后再执行写事件,但是想要置换顺序的话,其flags置为CONN_FLAG_WRITE_BARRIER,就可以换顺序了
//表示客户端当前的连接状态
typedef enum {
CONN_STATE_NONE = 0,
CONN_STATE_CONNECTING,
CONN_STATE_ACCEPTING,
CONN_STATE_CONNECTED,
CONN_STATE_CLOSED,
CONN_STATE_ERROR
} ConnectionState;
ConnectionType属性
connection有个ConnectionType属性,这里是一堆接口(函数的第一个参数都是connection)
,而struct connection
是操作对象。
那么该结构与ConnectionType
配合使用。不同ConnectionType的connection就会有不同的接口。
typedef struct ConnectionType {
void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
int (*write)(struct connection *conn, const void *data, size_t data_len);
int (*read)(struct connection *conn, void *buf, size_t buf_len);
void (*close)(struct connection *conn);
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
.........................
int (*get_type)(struct connection *conn);
} ConnectionType;
//这里有个要点需要留意:所有的函数类型的参数都是struct connecton* 开头的,
//但是只有ae_handler类型不是,其参数没有connection的,其是在参数clientData位置上
为什么要有这个类型ConnectionType呢?是因为Redis中默认有两种类型的connection。感觉像是面向对象的,继承,不同类型的connection会有不同的方法。
Redis从版本6开始支持SSL / TLS,这是一项可选功能,需要在编译时启用。所以才弄了两种类型。
//要对这些接口有印象,后续就是使用这些接口的
ConnectionType CT_Socket = {
//这些都是函数,比如把函数connSocketEventHandler赋值给ae_hander
.ae_handler = connSocketEventHandler,
.close = connSocketClose,
.write = connSocketWrite,
.read = connSocketRead,
.accept = connSocketAccept,
.connect = connSocketConnect,
.set_write_handler = connSocketSetWriteHandler,
.set_read_handler = connSocketSetReadHandler,
............................................
.get_type = connSocketGetType
};
//tls.c
#ifdef USE_OPENSSL
//需要定义了USE_OPENSSL,这个CT_TLS才会生效
ConnectionType CT_TLS = {
.ae_handler = tlsEventHandler,
.accept = connTLSAccept,
.connect = connTLSConnect,
.blocking_connect = connTLSBlockingConnect,
.read = connTLSRead,
.write = connTLSWrite,
.close = connTLSClose,
.set_write_handler = connTLSSetWriteHandler,
.set_read_handler = connTLSSetReadHandler
.........................................................
.get_type = connTLSGetType
};
创建connection
那编译时候不使用TLS的,那创建的connection的type就是CT_Socket类型。从源码可知,所以后面我们就关注CT_Socket的接口就行。
connection *connCreateAcceptedSocket(int fd) {
connection *conn = connCreateSocket();
conn->fd = fd; //设置对应的fd
conn->state = CONN_STATE_ACCEPTING; //设置状态
return conn;
}
connection *connCreateSocket() {
connection *conn = zcalloc(sizeof(connection));
conn->type = &CT_Socket; //这个重点,是CT_Socket类型
conn->fd = -1;
return conn;
}
//tls.c
static connection *createTLSConnection(int client_side) {
SSL_CTX *ctx = redis_tls_ctx;
if (client_side && redis_tls_client_ctx)
ctx = redis_tls_client_ctx;
tls_connection *conn = zcalloc(sizeof(tls_connection));
conn->c.type = &CT_TLS; //这个就是CT_TLS类型的
conn->c.fd = -1;
conn->ssl = SSL_new(ctx);
return (connection *) conn;
}
2. struct client
什么时候使用到client?还是从创建一个客户端acceptTcpHandler开始。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
........................
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
..............................
client *c = createClient(conn); //创建client
}
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
if (conn) {
.........................
connSetReadHandler(conn, readQueryFromClient); //设置读事件回调函数
connSetPrivateData(conn, c); //把client变量c赋值给conection->privateData
}
//初始化client的一些变量
................
if (conn) linkClient(c); //把该客户端添加到服务器server.client链表中保存
}
Redis使用结构体client存储客户端连接的所有信息。这里面就包括了客户端对应的connection。
//redis5版本的是有fd,而redis6版本的用connection替代了fd
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
connection *conn; //客户对应的connection
int resp; /* RESP protocol version. Can be 2 or 3. */
redisDb *db; //select命令选择的数据库对象
//从客户端读取的数据存储的位置,即输入缓冲区
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
// 命令和命令参数
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
struct redisCommand *cmd; //待执行的命令
int reqtype; /* Request protocol type: PROTO_REQ_* */
int multibulklen; /* Number of multi bulk arguments left to read. */
long bulklen; /* Length of bulk argument in multi bulk request. */
//回复客户端数据的链表
list *reply; /* List of reply objects to send to the client. */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
size_t sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
uint64_t flags; /* 客户端标识,Client flags: CLIENT_* macros. */
...............
/* Response buffer */ //回复客户端数据的地方,即输出缓冲区,若是不够空间,就存放在reply中
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
3. 绑定客户端回调函数的流程
3.1. 读事件回调函数的设置
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
..........
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
........................
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
..............................
/* Create connection and client */
client *c = createClient(conn);
}
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
if (conn) {
.........................
connSetReadHandler(conn, readQueryFromClient); //设置读事件回调函数
connSetPrivateData(conn, c); //把client变量c赋值给conection->privateData
}
}
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
return conn->type->set_read_handler(conn, func);
}
调用函数connSetReadHandler设置读事件回调函数。看到该函数的实现,可能会比较疑惑。所以这时就需要关联上面讲的ConnectionType属性,其是CT_Socket。所以我们查看到CT_Socket的set_read_handler是函数 connSocketSetReadHandler。
那么connSetReadHandler的实现就变成如下。那么其最终也是调用aeCreateFileEvent来创建一个FileEvent,并且绑定func给对应的fileProc。
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
//return conn->type->set_read_handler(conn, func);
//就是调用connSocketSetReadHandler
return connSocketSetReadHandler(conn, func);
}
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
if (func == conn->read_handler) return C_OK;
conn->read_handler = func;
if (!conn->read_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
else
if (aeCreateFileEvent(server.el,conn->fd,
AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
aeFileEvent *fe = &eventLoop->events[fd];
....................
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
}
那么又有疑惑了,不是说绑定func的吗?怎么函数aeCreateFileEvent中的参数是conn->type->ae_handler?我们先保留这个疑问,看完写事件回调函数的设置。
3.2. 写事件回调函数的设置
void beforeSleep(struct aeEventLoop *eventLoop) {
.................
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
}
int handleClientsWithPendingWritesUsingThreads(void) {
.....................
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
//设置写事件回调函数
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
..................
}
}
static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
return conn->type->set_write_handler(conn, func, 0);
}
conn->type->set_write_handler绑定的是connSocketSetWriteHandler。那么connSetWriteHandler的实现即是:
static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
//return conn->type->set_write_handler(conn, func, 0);
return connSocketSetWriteHandler(conn, func, 0);
}
static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
if (func == conn->write_handler) return C_OK;
conn->write_handler = func;
if (barrier)
conn->flags |= CONN_FLAG_WRITE_BARRIER;
else
conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
if (!conn->write_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
else
if (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE,
conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}
设置写事件回调函数的也是使用conn->type->ae_handler。
说明读写事件的回调函数的设置都统一是使用conn->type->ae_handler。ae_handler对应的是connSocketEventHandler。
3.3. connSocketEventHandler函数
static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
connection *conn = clientData;
//创建connection时候,设置了state=CONN_STATE_ACCEPTING,所以这个判断不成立
if (conn->state == CONN_STATE_CONNECTING && (mask & AE_WRITABLE) && conn->conn_handler) {
int conn_error = connGetSocketError(conn);
if (conn_error) {
conn->last_errno = conn_error;
conn->state = CONN_STATE_ERROR;
} else {
conn->state = CONN_STATE_CONNECTED;
}
if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
if (!callHandler(conn, conn->conn_handler)) return;
conn->conn_handler = NULL;
}
//位全为1结果才是1,初始时候flags是0,所以&CONN_FLAG_WRITE_BARRIER后也是0
//只有后续设置flags=CONN_FLAG_WRITE_BARRIER后,再&结果才是1
int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
int call_write = (mask & AE_WRITABLE) && conn->write_handler;
int call_read = (mask & AE_READABLE) && conn->read_handler;
//执行对应的回调函数
/* Handle normal I/O flows */
if (!invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
/* Fire the writable event. */
if (call_write) {
if (!callHandler(conn, conn->write_handler)) return;
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
}
static inline int callHandler(connection *conn, ConnectionCallbackFunc handler) {
connIncrRefs(conn); //增加refs值以保护连接
if (handler) handler(conn); //这里就是执行回调函数,即是执行readQueryFromClient等
connDecrRefs(conn); //回调函数执行后,refs--
if (conn->flags & CONN_FLAG_CLOSE_SCHEDULED) {
if (!connHasRefs(conn)) connClose(conn); //如果refs==0,执行延迟关闭
return 0;
}
return 1;
}
static inline void connIncrRefs(connection *conn) {
conn->refs++;
}
到这里终于知道设置conn->type->ae_handler作为回调函数的原因了。
前面的设置读写回调时候,把readQueryFromClient绑定给conn->read_handler,把sendReplyToClient绑定给conn->write_handler。
所以,connSocketEventHandler
函数中既有读事件回调函数,也有写事件回调函数。所以,我们可以这样认为,connSocketEventHandler
是connection的处理中心。在主框架的epoll中并不会直接调用客户端的读写回调函数,而是统一调用connSocketEventHandler
,这样一来相当于是框架与connection解耦了。
3.4. Redis5版本的设置回调函数
我回头查看了Redis5.0.10版本的,发现,其是直接设置readQueryFromClient作为回调函数的,这个版本也是没有结构体connection,其是直接使用client的。
//Reids5.0.10版本
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
...........
acceptCommonHandler(cfd,0,cip);
}
}
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c = createClient(fd)
.................
}
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
if (fd != -1) {
aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c)
......................
}
}
Redis6后,开始支持SSL / TLS,添加了conneciton,这个connection就是有两种类型。所以才这样弄吧。
3.5. 个人的一些想法,修改源码
当事件就绪时,那都是执行connSocketEventHandler,而其都有读/写事件的回调函数。
我认为,那结构体aeFileEvent可以只拥有一个aeFileProc即可。
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
//可以改写成如下
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *fileProc; //只用一个回调函数就行
void *clientData;
} aeFileEvent;
而在函数aeProcessEvents中不再需要判别是读事件还是写事件了。可以改写成如下:
//展示主体,主要是修改了for循环内部,不管是哪种类型,统一是使用fe->fileProc(....)
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
...............................
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
struct timeval tv, *tvp;
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
for (int j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
if (fe->mask) //表示该fd是有关注的事件类型的,就可以执行对应的读或写
//该函数就是调用connSocketEventHandler
fe->fileProc(eventLoop,fd,fe->clientData,mask);
}
}
}
可以这样写的原因,是因为connSocketEventHandler中有写回调和读回调,只要传事件类型进去就知道是使用读回调还是写回调。这种写法就更加统一了。
那为什么Redis作者不这样做呢?是为了兼容之前版本的,或者是我漏了什么细节是不能这样操作的呢?若有见解,欢迎在评论区讨论指出。
4. 总结设置客户端回调函数的流程
读事件回调函数的设置
创建连接时候,需要设置客户端的读回调。
createClient--->connSetReadHandler--->(conn->type->set_read_handler)--->(connSocketReadHandler,其内部把读回调函数readQueryFromClient赋值给read_handler)--->(aeCreateFileEvent,把conn->type->ae_handler赋值给rfileProc)。
写事件回调函数的设置流程
单线程的情况:
aeProcessEvents--->beforesleep--->handleClientsWithPendingWritesUsingThreads--->handleClientsWithPendingWrites--->connSetWriteHandlerWithBarrier--->(connSocketSetWriteHandler,把sendReplyToClient赋值给write_handler)--->(aeCreateFileEvent,把conn->type->ae_handler赋值给wfileProc)。
5. 回调函数的调用流程
客户端
当事件就绪(假设是读事件),就会执行fe->rfileProc函数,那该函数就是执行connSocketEventHandler,接着其内部会调用callHandler函数,callHandler就会调用conn->read_handler。
其实不管是读事件还是写事件,都是执行connSocketEventHandler。
即是aeProcessEvents--->rfileProc--->connSocketEventHandler--->callHandler--->(conn->read_handler,即是readQueryFromClient)。
服务器端
aeProcessEvents--->rfileProc--->acceptTcpHandler。
对比:
服务器端的是直接调用回调函数acceptTcpHandler。
而客户端的是调用connSocketEventHandler,再在connSocketEventHandler内部判断若是读事件,才执行回调函数readQueryFromClient。