1.概述
zk是一个分布式内存数据库。既然是数据库,就需要处理客户端发送过来的读,写请求。随着请求执行,不断更改数据实体。
作为内存数据库,数据实体全部加载到内存,在内存修改。我们把客户端发来的请求叫做事务,为了满足数据库的持久性要求,我们需要先将事务写入redo日志,待redo日志刷新到磁盘后,再提交对应的事务,修改数据实体。这样,后续重启zk时,借助redo我们也可以对所有redo再次迭代,进而使数据实体恢复到上次关闭zk时的状态。
为了尽可能加快这个恢复过程,zk除了redo还引入了快照机制。所谓快照就是直接将数据实体序列化保存到文件。这样,后续恢复时,我们可以反序列化直接得到数据实体,再针对此后的redo执行迭代,就能快速的恢复zk的数据实体。
本文以单机zk为分析对象,重点分析zk的启动恢复,处理客户端接入,客户端请求处理三个关键过程。
2.zk的启动恢复
首先我们明确那个类的那个函数承担了zk启动数据实体恢复的工作:FileTxnSnapLog::restore
。
恢复过程可划分成两个阶段:
(1).从快照恢复数据实体。
(2).基于redo
对数据实体进行迭代。
2.1.从快照恢复数据实体
完成此任务的类型及函数是:FileSnap::deserialize
。
执行流程如下:
(1).在快照目录对有效快照文件按序号部分递减排序。
快照文件文件名格式为:snapshot.zxid
。这里的zxid
是快照生成时数据实体对应的zxid
。
这样排在越前面的快照表示快照生成的时间点越晚,相应的包含的数据实体就越接近最终版本。
(2).依次对快照文件执行反向序列化得到数据实体
快照文件是通过对数据实体进行序列化得到的字节流文件,我们自然也可以对其执行反序列化恢复数据实体。
实现反序列化的关键类的关键函数是SerializeUtils::deserializeSnapshot
。
反向序列化的过程无需详细叙述,重点关注反向序列化后我们得到:
a. Map<Long, Integer> sessions
这个map
存储了sid
和其超时信息。
b. DataTree dt
zk
是一个内存数据库,这个dt
就是数据库的数据实体。
(3).反向序列化完成后进行完整性检测。
通过后,结束从快照恢复数据实体过程。此时实体的lastProcessedZxid
对应快照文件名中的zxid
。
若检测失败,则认为采用的快照文件被破坏了。
这样,我们采用下一个快照文件。直到遇到一个未损坏的,或在没有有效快照文件下,啥也不做。
2.2.基于redo
对数据实体进行迭代。
对redo
执行迭代时,我们得明确从何处开始。快照恢复后实体对应的zxid
从快照文件名可取得。
我们后续只需针对redo
中zxid
之后的各个redo
执行迭代即可。
redo
文件名的格式为:log.zxid
,其中的zxid
是redo
文件中首个redo
项的zxid
。
基于以上事实,我们执行如下步骤:
(1).从日志文件路径构建一个文件集合。
a. 此文件集合按文件名中zxid
递增排序。
b.集合中存在文件名的zxid
小于等于lastProcessedZxid+1
这样的文件时,文件集合首个文件为这些中zxid
最大的一个。
(2).对(1)
中文件集合从首个开始迭代,定位到首个zxid
大于或等于lastProcessedZxid+1
的redo
项。
从该redo
项开始,依次对文件集合从后续的每个redo
项执行重放过程。(这里需要执行反序列化得到redo
项对应的结构实例对象)
处理redo
项重放过程的类及其函数是:FileTxnSnapLog::processTransaction
,实际处理重放任务的是:DataTree::processTxn
。
因为redo
项包含了如何操作数据实体的必要信息,所以重放过程就是依据redo
项信息对数据实体进行修改的过程。
处理完毕,相应的数据实体的lastProcessedZxid
也被更新。
2.3.总结
通过2.1,2.2
我们便完成了zk
启动时从快照恢复,从redo
恢复的过程。
此时数据实体恢复到上次zk
结束时状态,相应的其lastProcessedZxid
也更新到最后一个提交的事务的zxid
了。
值得注意的是,恢复过程使用的所有redo
项会被收集起来,在zk
集群下,才会用到。这里略去。
3.处理客户端接入
zk
启动阶段完成数据恢复后,将开启服务监听,允许外部客户端接入。
下面我们分析zk
单机处理外部客户端接入的流程。
外部客户端接入zk
服务端时,在连接成功时,会先向服务端发一个注册包。
所以,zk
服务端一旦执行accept
得到一个被动连接后,从此连接上收到的首个包就是注册包。
对注册包进行处理的类及其函数是:ZooKeeperServer::processConnectRequest
。
利用注册包客户端可以给服务端提供若干信息,服务端也可以用这些信息来实现一些功能,诸如连接准入,连接超时等。
我们重点关注下,通过注册包新建一个会话和通过注册包复用一个会话。
3.1.通过注册包新建一个会话
当注册包中sessionId
为0
时,服务端需要为此接入分配一个新的会话来管理。
执行过程为:
(1). 为其分配一个独一无二的会话id
。加入服务端追踪的会话集合。
服务端的会话追踪是这样一个意思,因为连接到服务端的客户端可能数量很多,服务端维持每个会话需要消耗自身一些资源。当接入客户端数量很多时,服务端存在很大的资源压力。这时,有的客户端可能和服务端的网络出现了问题,导致服务端无法及时关闭对应的会话,及时回收资源。会话追踪就是通过服务端定期检测追踪的每个会话,在某个会话满足某些条件时,比如一段事件此会话上无任何事件产生,服务端会自动关闭此会话,及时回收会话资源的一种机制。
(2). 构建一个内部请求,提交给服务端的请求处理流水线。
请求对象是Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
,其中cnxn
是对应的被动连接对象,sessionId
是分配的会话id
,to
是一个缓存区,其中包含了此会话的超期信息。
为了理解这个处理过程,我们需要一点服务端请求处理流水线的知识。
服务端请求处理流水线:
a.服务端请求处理流水线构成:PrepRequestProcessor->SyncRequestProcessor->FinalRequestProcessor
。
所谓请求处理流水线具体一点是:请求先提交给PrepRequestProcessor
实例对象处理。这个实例对象独占一个线程。
PrepRequestProcessor
并不会将请求一次处理到位,而是只做此阶段的处理。
然后,将请求提交给SyncRequestProcessor
实例对象处理。这个实例对象也独占一个线程。
SyncRequestProcessor
也只做此阶段的处理。
然后,将请求提交给FinalRequestProcessor
实例对象处理。这个实例对象做最后一个阶段的处理。该对象不独占线程。
b.请求处理流水线的好处
在保证事务提交后持久性的前提下,提高了请求处理的吞吐率。
c.各个阶段主要的工作
因为数据库的请求分解后主要就是读数据,修改数据。针对修改数据就是修改数据实体。
PrepRequestProcessor
进行请求合理性检测,预先修改。因为此阶段请求处理时,可能前面有若干请求已经经过PrepRequestProcessor
阶段处理,但尚未完成FinalRequestProcessor
阶段处理。而且有的请求处理的效果依赖于前面请求的操作,所以,为了这部分预先修改的正确,zk
为其维护一个当前数据实体基础上的提前部分集合。提前部分+数据实体合起来就是当前所有完成PrepRequestProcessor
处理但尚未完成FinalRequestProcessor
处理的请求完成FinalRequestProcessor
阶段处理后数据实体的样子。
SyncRequestProcessor
进行为请求构造redo
项,写入redo
日志。
FinalRequestProcessor
在请求对应的redo
刷新到磁盘后,对请求执行FinalRequestProcessor
阶段处理,即实际修改数据实体。
(3). 会话创建请求处理
a.PrepRequestProcessor
阶段处理
因为单机下,所有会话均是全局会话。
针对全局的会话创建请求:
a.1.zk
为其分配一个独立的zxid
(使得全局zxid
递增一次)。
a.2.开始追踪此会话id
。所谓追踪就是,一段时机此会话上无事件产生,引发会话超时,主动关闭会话。
a.3.为其分配事务,一个CreateSessionTxn
实例对象,包含会话的超时信息。提交给下一阶段处理。
b.SyncRequestProcessor
阶段处理
b.1.针对有效请求,先将请求序列化,然后写入redo
文件。请求序列化后得到的就是请求对应的redo
项。
b.2.检测此时是否应该执行一次快照。
b.2.1.若不执行快照
将请求加入toFlush
集合。检测是否应该刷新。
b.2.1.1.若不应刷新,则处理结束。
b.2.1.2.若应该刷新,执行刷新。具体为:
保证对内存中序列化内容执行写入底层文件操作。
保证对所有待刷新的底层文件将此文件位于内核的页高速缓存刷新到磁盘。
最后,检测当前使用的redo
文件尺寸是否达到限制,达到限制后,后续redo
将写入一个新的redo
文件。
刷新后,将toFlush
集合中的项依次取出,提交给下一阶段处理。
b.2.2.若执行快照,执行快照。具体为:
b.2.2.1.通过互斥锁,保证同时只进行一个快照过程。
b.2.2.2.快照时,当前redo
文件无论是否达到尺寸限制,均结束。下个redo
将写入一个新的redo
文件。
b.2.2.3.快照过程即根据发起快照时数据实体的状态,将数据实体内容序列化到快照文件中,快照文件名格式为:snapshot
.执行快照时数据实体的zxid
。
值得注意的是,由于快照采用一个独立线程进行,故,快照过程中数据实体可能也在被流水线中FinalRequestProcessor
修改。zk
通过对每个node
加锁,保证每个node
的快照和FinalRequestProcessor
中修改是互斥的。这样最终快照得到的数据实体可能比文件名中zxid
要往后走几个zxid
。不过,没关系。即使往前多走了几个zxid
,启动恢复时,快照+redo
,相当于对多走的几个zxid
执行了一个多余的redo
,这样也并无大碍(一个修改动作执行两次,对数据实体无影响。一个删除动作,执行两次,对数据实体无影响。一个添加动作执行两次,对数据实体无影响)。
c.FinalRequestProcessor
阶段处理
针对会话创建请求,此阶段:
c.1.更新关联被动连接上的一些统计信息。
c.2.构建一个回复包,回复客户端。回复包是一个ConnectResponse
类型实例对象,其中包含:协议版本(固定0
),超期时间,会话id
,基于会话id
生成的密码等信息。
这样就结束了一个会话创建的流水线处理。
3.2.通过注册包复用一个会话
(1). 通过ServerCnxnFactory
类型的closeSession
。
从NIOServerCnxnFactory
,ServerCnxnFactory
,数据实体中分别移除此sid
关联的信息。这中间一方面是解除sid
与原来的被动连接对象的关联,一方面对原来的被动连接对象执行关闭处理。
(2). 在ServerCnxnFactory
中建立sid
与新的被动连接对象的关联。
(2). 通过提供的密码信息,验证密码和sid
是否一致。
a.若验证不一致,则同样构建一个ConnectResponse
类型的回复包,其中协议版本(固定0
),超期时间(固定0
),会话id
(固定0
),密码(空字符)等信息。之后又额外发送一个空包(服务端主动断开的标志)。
b.若验证一致,构建一个回复包,回复客户端。回复包是一个ConnectResponse
类型实例对象,其中包含:协议版本(固定0
),超期时间,会话id
,基于会话id
生成的密码等信息。
4.两种超期机制
zookeeper服务端存在两种超期机制。
一种面向会话,一种面向被动连接对象。
4.1.面向被动连接对象的超期机制
被动连接建立时,连接对象加入NIOServerCnxnFactory
的被动连接集合进行管理,NIOServerCnxnFactory
会开启一个检测线程,对其所管理的每个被动连接进行超期检测。检测到被动连接超期时,通过类NIOServerCnxn
的close
实现此被动连接的主动关闭流程。
4.2.面向会话的超期机制
客户端的注册包表明创建一个新会话时,会开启对此新会话的会话追踪,当会话追踪检测到此会话超期时,通过类ZooKeeperServer
的expire
实现会话的超期处理。超期处理会构建一个Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
请求,投递给流水线处理。
流水线针对Request(null, sessionId, 0, OpCode.closeSession, null, null);
请求的处理:
a.PrepRequestProcessor
阶段处理
因为单机下,所有会话均是全局会话。
针对全局的会话关闭请求:
a.1. 依据数据实体,当前中间态信息得到一个需要移除的临时路径集合。
a.2. 为需要移除的每个临时路径构建一个ChangeRecord
实例对象加入中间态集合,以便反映对应节点的删除信息。
a.3. 在closeSession
支持事务体下,为其分配一个事务体。其中包含所有要移除的临时节点路径信息。
b.SyncRequestProcessor
阶段处理
参考会话创建请求。
c.FinalRequestProcessor
阶段处理
针对会话关闭请求,此阶段:
c.1.从会话追踪中移除对此sid
的追踪。
c.2.针对数据实体,移除此会话下所有临时节点。
c.3.针对因为会话超时而产生的会话关闭请求,移除此sid
和被动连接的关联。执行被动连接关闭逻辑。
c.4.先后向客户端发回一个ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
回复包,在发一个标志服务端主动关闭连接的空包。
5.处理客户端请求
当客户端请求被服务端收取后,首先执行的是类ZooKeeperServer
的processPacket
。
由于来自客户端的请求包,必然以一个RequestHeader
开头,所以先反序列化得到一个RequestHeader
实例对象。
然后构建一个 Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
请求实例,设置请求来源si.setOwner(ServerCnxn.me);
后,投递到流水线中。
我们考察OpCode.create
类型请求的流水线处理流程:
(1). PrepRequestProcessor
阶段
为请求分配TxnHeader
头,OpCode.create
类型请求会得到一个新的zxid
。
继续对请求执行反向序列化得到CreateRequest
实例对象。
这一步会依据数据实体及中间态对请求合理性进行检测。
在判断请求会失败时,失败时会设置请求对象,使其包含失败信息。
request.getHdr().setType(OpCode.error);
request.setTxn(new ErrorTxn(e.code().intValue()));// 错误请求
request.setException(e);
在判断请求可以成功执行时,会相应调整中间态,为请求分配Txn
,这里是CreateTxn
。
此后递交给下一级处理。
(2).SyncRequestProcessor
阶段
SyncRequestProcessor
阶段的处理参考之前。简单来说,会针对请求构建redo
项序列化后写入redo
日志并刷新到磁盘后,再将请求递交给下一级流水线处理。
(3). FinalRequestProcessor
阶段
a.针对数据实体执行请求处理
若请求被允许执行,则
CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
createNode(createTxn.getPath(), createTxn.getData(), createTxn.getAcl(), createTxn.getEphemeral() ? header.getClientId() : 0, createTxn.getParentCVersion(), header.getZxid(), header.getTime(), null);
这样将修改数据实体以反映请求操作。
更新数据实体的lastProcessedZxid。
b.针对中间态
若请求被允许执行,则
进行相应调整,以免除不必要的中间态信息存储。
b.回复阶段
针对请求出错的情况,构造回复包
err = e.code();
ReplyHeader hdr = new ReplyHeader(request.cxid, lastZxid, err.intValue());
responseSize = cnxn.sendResponse(hdr, rsp, "response");
针对请求允许的情况,构造回复包
lastOp = "CREA";
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
responseSize = cnxn.sendResponse(hdr, rsp, "response");
6.客户端主动断开
客户端主动断开时,客户端会发送一个closeSession
类型的包给服务端。
所以只需分析下closeSession
类型包的处理流程即可。
当客户端请求被服务端收取后,首先执行的是类ZooKeeperServer
的processPacket
。
由于来自客户端的请求包,必然以一个RequestHeader
开头,所以先反序列化得到一个RequestHeader
实例对象。
然后构建一个 Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
请求实例,设置请求来源si.setOwner(ServerCnxn.me);
后,投递到流水线中。
我们考察OpCode.closeSession
类型请求的流水线处理流程:
(1). PrepRequestProcessor
阶段
为请求分配TxnHeader
头,OpCode.closeSession
类型请求会得到一个新的zxid
。
依据数据实体和中间态得到因为会话关闭而所需移除的临时节点集合,修改中间态以反向临时节点的移除。
在允许closeSession
拥有Txn
时,为请求分配CloseSessionTxn
,其中包含因会话关闭而所需移除的临时节点路径集合。
会话追踪里设置会话状态为关闭中zks.sessionTracker.setSessionClosing(request.sessionId);
将请求递交给下一级处理。
(2).SyncRequestProcessor
阶段
SyncRequestProcessor
阶段的处理参考之前。简单来说,会针对请求构建redo
项序列化后写入redo
日志并刷新到磁盘后,再将请求递交给下一级流水线处理。
(3). FinalRequestProcessor
阶段
针对closeSession
请求:
a. 将指定sid
从会话追踪对象里移除。这意味着,会话追踪里不再包含此会话id
的信息,相应的也不会再触发此会话的超期处理。
b. 针对数据实体执行closeSession
处理,这一步主要是从数据实体中移除因sid
关闭而所需移除的节点。
c. 针对中间态,进行相应调整,以免除不必要的中间态信息存储。
d. 在ServerCnxnFactory
层面,移除sid
和被动连接的映射关系。
e. 在NIOServerCnxn
层面,设置连接已经过期。
f. 在NIOServerCnxnFactory
层面,从被动连接集合中移除此被动连接,移除针对此被动连接的超期检测。
g. 在服务端方面,从数据实体中移除此被动连接。主要是移除挂在此连接上的监控信息。
h. 在NIOServerCnxn
层面,取消套接字上的事件监控,关闭套接字。