简介
dledger是openmessaging的一个组件, raft算法实现,用于分布式日志,本系列分析dledger如何实现raft概念,以及dledger在rocketmq的应用
本系列使用dledger v0.40
本文分析dledger的日志,包括写入,复制,共识点推进
关键词
Raft
Openmessaging
参考资料
In Search of an Understandable Consensus Algorithm raft论文简版
技术架构
- 应用/client client是dledger提供给应用访问节点的组件
以下是节点内组件
- rpc服务
rpc服务内置rpc client/rpc server,对外接收外部rpc访问,包括client和节点间通讯;对内,解释rpc请求,转发给Server;对外,发送rpc请求到其他节点
- Server
主程序,负责节点启动,其他组件的启动;写入日志请求初步处理等
- Elector
选举类,负责集群主节点选举
- EntryPusher
日志写入器,内置分发器和处理器,分发器主节点用于复制日志到跟随者;处理器跟随者使用,写入日志
- 存储
存储日志条木,有两个实现,基于内存和基于文件
- 快照/状态机
新版本的dledger提供状态机,dledger成为通用的raft组件,不再是转为rocketmq使用
日志
技术架构
日志组件核心是维护共识日志,所有节点都认可的日志记录点。应用发起写入rpc请求,需领导者处理,写入本地存储,领导者复制到其他节点,不断推进日志共识点。集群领导者下线,其他节点使用共识点恢复集群
- EntryPusher 统筹日志复制全局的角色,为领导者构建跟随者对应的EntryDispatcher,负责跟随者的同步,为每个跟随者构建EntryHandler处理同步请求
- 集群领导者通过比对/截取/写入/提交 同步日志
- QuorumAckChecker全局提交,推进共识
下面详细分析各参与组件
领导者写入
应用写入日志需要由领导者处理,领导者写入消息到本地存储后,复制到跟随者
写入请求DLedgerServer的handleAppend方法处理,然后appendAsLeader写入存储,最后DLedgerEntryPusher的appendClosure方法,登记本节点的日志水位线,pending返回
复制
领导者写入日志,揭开集群同步的序幕
领导者为每个跟随者构建EntryDispatcher,负责该跟随者的同步,QuorumAckChecker全局提交,确定共识index
EntryDispatcher是有状态,定时驱动
上图展示EntryDispatcher状态变更,状态变更执行的动作,相对应的EntryHandler处理方法,当然,实际不是直接交互的,EntryHandler缓存请求,定时处理,后面章节详细分析
其中,
INSTALL_SNAPSHOT 放到后续存储/快照中分析,在本版本,快照可选,没有该功能不影响日志复制
COMMIT EntryDispatcher没有处在该状态,只是一个操作
关键属性
很大程度上,理解dledger的日志复制就是理解日志存储的位置点和位置点动态变化,本周介绍一下关键属性
下图展示日志关键index
DLedgerEntryPusher
- peerWaterMarksByTerm
定义:Map<Long/*term*/, ConcurrentMap<String/*peer id*/, Long/*match index*/>>
term--->peerId--->matchIndex
按term记录每个节点的日志水位(复制进度)
- pendingClosure
定义:Map<Long /*term*/, ConcurrentMap<Long /*index*/, Closure /*upper callback*/>>
term--->index---> closure
按term存放消息索引点的返回
EntryDispatcher
- writeIndex
跟随者同步的下一个写入点
- matchIndex
比对和截取状态:比对匹配的日志index,用于后面截取
写入状态:
- pendingMap
定义:ConcurrentMap<Long/*index*/, Pair<Long/*send timestamp*/, Integer/*entries count in req*/>>
记录写入请求,以写入点位key,
- batchAppendEntryRequest
缓存写入消息请求,批量推送到跟随者
MemberState
- committedIndex
集群日志写入位置共识,过半节点日志index
- ledgerEndIndex
最近写入日志index
- ledgerEndTerm
最近写入日志index所在的term
- appliedIndex
用于状态机,后续状态机/快照分析
DLedgerStore
- beforeBeginIndex
存储启动/恢复,未写入新消息前的位置,该index指向启动时最新一个消息
- ledgerEndIndex
与MemberState相同
比对
领导者上任或与跟随者同步写入出现不一致时,EntryDispatcher进入比对状态,找到跟随者与领导者日志的匹配点,即,最大的term/index一致
-
比对请求
1 检查是否为领导者,是否变更为领导者
2 ledgerEndIndex=-1,主节点没有写入日志
严格来说不是没有写入,是只有beforeBegin写入
3 从跟随者最近已写入点开始比对
注意:这个writeIndex是代表跟随者,初始是领导者本地的ledgerEndIndex,经过一轮比对,返回跟随者的ledgerEndIndex
4 compareIndex < beforeBeginIndex
说明跟随者日志存在较大落后,通过快照恢复到beforeBeginIndex,比对操作才介入
-----------------------------------------分割线----------------------------------------
5 compareIndex = beforeBeginIndex
这是特殊的index,dLedgerStore存有index和term,其他的需要从存储获取消息条目(Entry),但存储只支持获取大于beforeBeginIndex的消息
6 compareIndex > beforeBeginIndex 正常
正常消息从存储获取,从而获得term,构建比对请求
7 推送比对请求到跟随者
8 等待返回
9 比对成功
9.1 设置matchIndex,更新跟随者的水位线
9.2 比对完成,转到截取
10 返回不一致状态,需继续比对,writeIndex设置为跟随者返回的可比较点
10.1 term不一致
10.2 跟随者返回的最近写入index,用于下一轮比对
参看处理对比
-
处理比对
0 获取比对的日志index,term
1 检查,排除刚开始没有任何日志的情况
2 本地最近写入点ledgerEndIndex>=preLogIndex
若ledgerEndIndex < preLogIndex,比对的存储还没写入
2.1 beforeBeginIndex是特殊的index
上面比对请求介绍过
2.2 其他index需要从存储获取消息来获取term
2.3 匹配条件是term&index
2.4 term不一样,返回本节点term的第一个写入的index
返回的index,领导者减一作为下一个比较点,实际退到上一个term最后写入index比对,为什么这样做?
我理解,另一个做法是index-1,再比对,同一个index,领导者的term与自身term不一致,说明跟随者之前是领导者,或者是写入比当前领导者多的节点,而term期限内,一般写入的消息比较多,若index减1再比较,可能经过很多轮次的比较才获得匹配点,很难预计多少轮才找到匹配点,不如退到上一个term最后写入点比较快
-----------------------------------------分割线----------------------------------------
3 这里有点偷懒的嫌疑,个人认为不应该定为不一致
截取
比对完成,得到匹配点matchIndex,matchIndex后面截掉,同步写入从matchIndex+1开始
- 截取请求
截取请求比较简单,不多解释
- 处理截取
1 截取存储
1.1 截取存储后,返回index是ledgerEndIndex,truncateIndex是第一个截取点,因此截取后正常情况index=truncateIndex-1
2 修改committedIndex,1.1 截取更新了ledgerEndIndex
committedIndex/ledgerEndIndex 取小的更新committedIndex,初看起来感到困惑,这样意味着共识点会后退,但回想 5.3.3 比对的处理比对 “2.4 term不一样,返回本节点term的第一个写入的index”, 回退到上一个term,可能出现大量的消息截取,出现共识点回退,但这个也是取舍。我个人不太认可,成为共识,继续往前推
写入(append)
经过比对和截取,领导者的EntryDispatcher与跟随者找到日志匹配点(mathIndex),EntryDispatcher从匹配点推送日志给跟随者
- 写入请求
1 是否leader, 是否变更为leader,leader变更转为COMPARE状态
2 检查pending写入请求是否有超时,超时清理
下面分析一下doCheckAppendResponse
检查pending写入请求是否超时,检查第一个,最有可能超时的请求
2.1/2.2 最近写入水位线+1,就是第一个pending请求key
2.3 未发送,当前batchAppendEntryRequest是第一个pending写入请求
2.4 超过推送返回时间,需重新发送
此时批量请求存放的是下一批(可能是第二,第三批)的待发送请求,需要重发前一批,清理,重置writeIndex为第一批index,
注意:这里调整了writeIndex,pendingMap没有改变,后面将从writerIndex构建请求,形成两条请求线,这对整个写入理解很关键,将在5中详细分析
----------------------------------------------分割线------------------------------------------------
3 writeIndex>ledgerEndIndex, 跟随者写入index>领导者最后写入index,没有消息可复制
writeIndex等于ledgerEndIndex+1,复制到最新的写入,下一个复制点加1
3.1 写入文档有未推送,推送后批量请求的第一个index为key,缓存请求到pendingMap,并清理请求,
推送请求的异步处理只处理SUCCESS,INCONSISTENT_STATE两个返回状态, 也就是说,其他网络超时,参数重复不对pendingMap处理
3.2 空闲,提交
4 到这里,writeIndex <= ledgerEndIndex
进一步细分 writeIndex <= beforeBeginIndex
存储只能获取大于beforeBeginIndex,只有通过快照恢复到大于beforeBeginIndex
----------------------------------------------分割线------------------------------------------------
下面分析清理pengdingMap中的请求,首先分析一下pendingMap的状态
在2.4 分析过,pengding请求超时,writeIndex调回到水位线+1作为写入点,
>上图的上部分是初始状态
writeIndex1 第一个批次请求;writeIndex2第二个批次请求,并已发送,pendingMap有两个数据
>上图的下部分,writeIndex1超时,调和writeIndex,请求重新发送,但批次数量会有所不同,形成新的writeIndex1(覆盖原有),writeIndex2-x,而且writeIndex1的批量比原writeIndex1大,writeIndex2-x比原writeIndex2大,pendingMap有3个数据
此时有两条发送线,不能说writeIndex2已废弃,可能跟随者已处理了原writeIndex1,只是回复失败,处理写入分析跟随者怎样处理
通过上面pendingMap的状态分析,很容易理解清理的逻辑,
第一个条目index+条目数-1 = 最后条目的index
最后条目的index < 写入水位线 就是上面pendingMap分析图,上部分的writeIndex2
当跟随者写入的是第二条线,writeIndex2-x完成后
----------------------------------------------分割线------------------------------------------------
6 pengding请求数量是否超限
本人觉得这里再做一个过期清理无必要,下一轮开始就会过期清理
7 pengdingMap缓存请求没有超,writeIndex消息条目加入批量请求,条件达到可发送批量请求
8 下一个写入点
- 处理写入
下图是doWork 写入部分
写入请求与其他TRUNCATE/COMPARE/COMMIT请求不同,其他的请求是缓存再队列,写入请求是map,已写入的index为key,这也是保证写入完整性的关键
1 写入点,ledgerEndIndex+1
2/3 下一个请求,请求空,请求未到或者写入了旧的请求线,参看pengdingMap分析
接着是检查请求Map
----------------------------------------------分割线------------------------------------------------
3.1.1 处理有点不解,按pendingMap分析,writeIndex2批量可能被返回不一致,处理没错,但本人认为更好的处理应该是保证写入点连续性即好,过期的请求删去就可以,返回不一致,领导者进入比对,降低效率
----------------------------------------------分割线------------------------------------------------
3.4 我理解,3.1处返回不一致,领导者转入COMPARE,跟随者也会转到COMPARE,继而TRUNCATE状态,writeRequestMap被清理
3.5 纵观整个清理逻辑,核心是及早发现请求的不连续性
minFastForwardIndex是不连续的请求写入点,因为出现写入点在3.2已退出
----------------------------------------------分割线------------------------------------------------
跟随者日志写入,这里值得注意的是,请求带了cimmittedIndex,主要作用孜孜不倦地推进写入落后的跟随者靠近或者达到共识点,非常好的细节!
提交
提交是EntryDispatcher的一个状态,但EntryDispatcher没有被赋予该状态,因此提交不是定时执行,而是写入(append)在空闲时调用提交,推进跟随者的日志
- 提交请求
提交比较简单,从MemberState获取committedIndex,构建请求,推送到跟随者
- 处理写入
committedIndex/ledgerEndIndex取小的,参看5.3.7 Quorum检查分析,committedIndex是过半数的日志水位线,有些跟随者的ledgerEndIndex比committedIndex,同步写入的进度慢
Quorum检查
Quorum法定人数,这个数值应该是可以设置,超过半数都可以,甚至可以设为总数,但dledger实际操作按过半数,我们这里也按过半数理解
前面的比对,截取,写入不断复制日志到跟随者,QuorumChecker定时检查,记录阶段的共识点,然后通过”5.3.6提交”, 提交共识点到跟随者,集群就是这样持续的向前推进
QuorumChecker前面清理工作不详细分析
5 检查是否未领导者
6 更新自身(领导者)的水位线
7 计算集群共识点
8 更新到MemberState
新领导者
新领导者上任首要是恢复共识日志,dledger使用Raft Log Read恢复集群日志共识,Raft Log Read是客户端一致性读取分布式日志的技术,其原理是client伪装成集群节点,走一遍Raft Log,经过前面分析,很容易想到实现原理, 3板斧-比对,截取,写入,其中,如果比对index < beforeBeginIndex先恢复快照
线性一致读
dledger实现了Raft Log Read,目前用于新领导者恢复共识日志,但其性能开销应用不能接受,ReadIndex Read/Lease Read预留了方法但还未实现, 因此dledger未能用于开发对外应用,例如,kv。
jraft实现了ReadIndex Read/Lease Read,如想了解线性一致读可以了解一下jraft