本文紧接着 源码分析 RocketMQ DLedger(多副本) 之日志追加流程 ,继续 Leader 处理客户端 append 的请求流程中最至关重要的一环:日志复制。
温馨提示:由于微信单篇文章的字数限制,RocketMQ DLedger 日志复制分为两篇文章介绍。
DLedger 多副本的日志转发由 DLedgerEntryPusher 实现,接下来将对其进行详细介绍。
温馨提示:由于本篇幅较长,为了更好的理解其实现,大家可以带着如下疑问来通读本篇文章: 1、raft 协议中有一个非常重要的概念:已提交日志序号,该如何实现。 2、客户端向 DLedger 集群发送一条日志,必须得到集群中大多数节点的认可才能被认为写入成功。 3、raft 协议中追加、提交两个动作如何实现。
日志复制(日志转发)由 DLedgerEntryPusher 实现,具体类图如下:
主要由如下4个类构成:
DLedgerEntryPusher DLedger 日志转发与处理核心类,该内会启动如下3个对象,其分别对应一个线程。
EntryHandler 日志接收处理线程,当节点为从节点时激活。
QuorumAckChecker 日志追加ACK投票处理线程,当前节点为主节点时激活。
EntryDispatcher 日志转发线程,当前节点为主节点时追加。
EntryHandler
日志接收处理线程,当节点为从节点时激活。
EntryDispatcher
日志转发线程,当前节点为主节点时追加。
接下来我们将详细介绍上述4个类,从而揭晓日志复制的核心实现原理。
1、DLedgerEntryPusher
1.1 核心类图
DLedger 多副本日志推送的核心实现类,里面会创建 EntryDispatcher、QuorumAckChecker、EntryHandler 三个核心线程。其核心属性如下:
DLedgerConfig dLedgerConfig 多副本相关配置。
DLedgerStore dLedgerStore 存储实现类。
MemberState memberState 节点状态机。
DLedgerRpcService dLedgerRpcService RPC 服务实现类,用于集群内的其他节点进行网络通讯。
Map peerWaterMarksByTerm 每个节点基于投票轮次的当前水位线标记。键值为投票轮次,值为 ConcurrentMap/, Long/* 节点对应的日志序号*/。
Map pendingAppendResponsesByTerm 用于存放追加请求的响应结果(Future模式)。
EntryHandler entryHandler 从节点上开启的线程,用于接收主节点的 push 请求(append、commit、append)。
QuorumAckChecker quorumAckChecker 主节点上的追加请求投票器。
Map dispatcherMap 主节点日志请求转发器,向从节点复制消息等。
DLedgerStore dLedgerStore
存储实现类。
DLedgerRpcService dLedgerRpcService
RPC 服务实现类,用于集群内的其他节点进行网络通讯。
Map pendingAppendResponsesByTerm
用于存放追加请求的响应结果(Future模式)。
QuorumAckChecker quorumAckChecker
主节点上的追加请求投票器。
接下来介绍一下其核心方法的实现。
1.2 构造方法
1public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore,
2 DLedgerRpcService dLedgerRpcService) {
3 this.dLedgerConfig = dLedgerConfig;
4 this.memberState = memberState;
5 this.dLedgerStore = dLedgerStore;
6 this.dLedgerRpcService = dLedgerRpcService;
7 for (String peer : memberState.getPeerMap().keySet()) {
8 if (!peer.equals(memberState.getSelfId())) {
9 dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
10 }
11 }
12}
构造方法的重点是会根据集群内的节点,依次构建对应的 EntryDispatcher 对象。
1.3 startup
DLedgerEntryPusher#startup
1public void startup() {
2 entryHandler.start();
3 quorumAckChecker.start();
4 for (EntryDispatcher dispatcher : dispatcherMap.values()) {
5 dispatcher.start();
6 }
7}
依次启动 EntryHandler、QuorumAckChecker 与 EntryDispatcher 线程。
备注:DLedgerEntryPusher 的其他核心方法在详细分析其日志复制原理的过程中会一一介绍。
接下来将从 EntryDispatcher、QuorumAckChecker、EntryHandler 来阐述 RocketMQ DLedger(多副本)的实现原理。
2、EntryDispatcher 详解
2.1 核心类图
其核心属性如下。
AtomicReference type 向从节点发送命令的类型,可选值:PushEntryRequest.Type.COMPARE、TRUNCATE、APPEND、COMMIT,下面详细说明。
long lastPushCommitTimeMs = -1 上一次发送提交类型的时间戳。
String peerId 目标节点ID。
long compareIndex = -1 已完成比较的日志序号。
long writeIndex = -1 已写入的日志序号。
int maxPendingSize = 1000 允许的最大挂起日志数量。
long term = -1 Leader 节点当前的投票轮次。
String leaderId = null Leader 节点ID。
long lastCheckLeakTimeMs 上次检测泄漏的时间,所谓的泄漏,就是看挂起的日志请求数量是否查过了 maxPendingSize 。
ConcurrentMap pendingMap 记录日志的挂起时间,key:日志的序列(entryIndex),value:挂起时间戳。
Quota quota = new Quota(dLedgerConfig.getPeerPushQuota()) 配额。
long lastPushCommitTimeMs = -1
上一次发送提交类型的时间戳。
long compareIndex = -1
已完成比较的日志序号。
int maxPendingSize = 1000
允许的最大挂起日志数量。
String leaderId = null
Leader 节点ID。
ConcurrentMap pendingMap
记录日志的挂起时间,key:日志的序列(entryIndex),value:挂起时间戳。
2.2 Push 请求类型
DLedger 主节点向从从节点复制日志总共定义了4类请求类型,其枚举类型为 PushEntryRequest.Type,其值分别为 COMPARE、TRUNCATE、APPEND、COMMIT。
COMPARE 如果 Leader 发生变化,新的 Leader 需要与他的从节点的日志条目进行比较,以便截断从节点多余的数据。
TRUNCATE 如果 Leader 通过索引完成日志对比,则 Leader 将发送 TRUNCATE 给它的从节点。
APPEND 将日志条目追加到从节点。
COMMIT 通常,leader 会将提交的索引附加到 append 请求,但是如果 append 请求很少且分散,leader 将发送一个单独的请求来通知从节点提交的索引。
TRUNCATE
如果 Leader 通过索引完成日志对比,则 Leader 将发送 TRUNCATE 给它的从节点。
COMMIT
通常,leader 会将提交的索引附加到 append 请求,但是如果 append 请求很少且分散,leader 将发送一个单独的请求来通知从节点提交的索引。
对主从节点的请求类型有了一个初步的认识后,我们将从 EntryDispatcher 的业务处理入口 doWork 方法开始讲解。
2.3 doWork 方法详解
1public void doWork() {
2 try {
3 if (!checkAndFreshState()) { // @1
4 waitForRunning(1);
5 return;
6 }
7
8 if (type.get() == PushEntryRequest.Type.APPEND) { // @2
9 doAppend();
10 } else {
11 doCompare(); // @3
12 }
13 waitForRunning(1);
14 } catch (Throwable t) {
15 DLedgerEntryPusher.logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", peerId, getName(), writeIndex, compareIndex, t);
16 DLedgerUtils.sleep(500);
17 }
18}
代码@1:检查状态,是否可以继续发送 append 或 compare。
代码@2:如果推送类型为APPEND,主节点向从节点传播消息请求。
代码@3:主节点向从节点发送对比数据差异请求(当一个新节点被选举成为主节点时,往往这是第一步)。
2.3.1 checkAndFreshState 详解
EntryDispatcher#checkAndFreshState
1private boolean checkAndFreshState() {
2 if (!memberState.isLeader()) { // @1
3 return false;
4 }
5 if (term != memberState.currTerm() || leaderId == null || !leaderId.equals(memberState.getLeaderId())) { // @2
6 synchronized (memberState) {
7 if (!memberState.isLeader()) {
8 return false;
9 }
10 PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);
11 term = memberState.currTerm();
12 leaderId = memberState.getSelfId();
13 changeState(-1, PushEntryRequest.Type.COMPARE);
14 }
15 }
16 return true;
17}
代码@1:如果节点的状态不是主节点,则直接返回 false。则结束 本次 doWork 方法。因为只有主节点才需要向从节点转发日志。
代码@2:如果当前节点状态是主节点,但当前的投票轮次与状态机轮次或 leaderId 还未设置,或 leaderId 与状态机的 leaderId 不相等,这种情况通常是集群触发了重新选举,设置其term、leaderId与状态机同步,即将发送COMPARE 请求。
接下来看一下 changeState (改变状态)。
1private synchronized void changeState(long index, PushEntryRequest.Type target) {
2 logger.info("[Push-{}]Change state from {} to {} at {}", peerId, type.get(), target, index);
3 switch (target) {
4 case APPEND: // @1
5 compareIndex = -1;
6 updatePeerWaterMark(term, peerId, index);
7 quorumAckChecker.wakeup();
8 writeIndex = index + 1;
9 break;
10 case COMPARE: // @2
11 if (this.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE)) {
12 compareIndex = -1;
13 pendingMap.clear();
14 }
15 break;
16 case TRUNCATE: // @3
17 compareIndex = -1;
18 break;
19 default:
20 break;
21 }
22 type.set(target);
23}
代码@1:如果将目标类型设置为 append,则重置 compareIndex ,并设置 writeIndex 为当前 index 加1。
代码@2:如果将目标类型设置为 COMPARE,则重置 compareIndex 为负一,接下将向各个从节点发送 COMPARE 请求类似,并清除已挂起的请求。
代码@3:如果将目标类型设置为 TRUNCATE,则重置 compareIndex 为负一。
接下来具体来看一下 APPEND、COMPARE、TRUNCATE 等请求。
2.3.2 append 请求详解
EntryDispatcher#doAppend
1private void doAppend() throws Exception {
2 while (true) {
3 if (!checkAndFreshState()) { // @1
4 break;
5 }
6 if (type.get() != PushEntryRequest.Type.APPEND) { // @2
7 break;
8 }
9 if (writeIndex dLedgerStore.getLedgerEndIndex()) { // @3
10 doCommit();
11 doCheckAppendResponse();
12 break;
13 }
14 if (pendingMap.size() = maxPendingSize || (DLedgerUtils.elapsed(lastCheckLeakTimeMs) 1000)) { // @4
15 long peerWaterMark = getPeerWaterMark(term, peerId);
16 for (Long index : pendingMap.keySet()) {
17 if (index peerWaterMark) {
18 pendingMap.remove(index);
19 }
20 }
21 lastCheckLeakTimeMs = System.currentTimeMillis();
22 }
23 if (pendingMap.size() = maxPendingSize) { // @5
24 doCheckAppendResponse();
25 break;
26 }
27 doAppendInner(writeIndex); // @6
28 writeIndex++;
29 }
30}
代码@1:检查状态,已经在上面详细介绍。
代码@2:如果请求类型不为 APPEND,则退出,结束本轮 doWork 方法执行。
代码@3:writeIndex 表示当前追加到从该节点的序号,通常情况下主节点向从节点发送 append 请求时,会附带主节点的已提交指针,但如何 append 请求发不那么频繁,writeIndex 大于 leaderEndIndex 时(由于pending请求超过其 pending 请求的队列长度(默认为1w),时,会阻止数据的追加,此时有可能出现 writeIndex 大于 leaderEndIndex 的情况,此时单独发送 COMMIT 请求。
代码@4:检测 pendingMap(挂起的请求数量)是否发送泄漏,即挂起队列中容量是否超过允许的最大挂起阀值。获取当前节点关于本轮次的当前水位线(已成功 append 请求的日志序号),如果发现正在挂起请求的日志序号小于水位线,则丢弃。
代码@5:如果挂起的请求(等待从节点追加结果)大于 maxPendingSize 时,检查并追加一次 append 请求。
代码@6:具体的追加请求。
2.3.2.1 doCommit 发送提交请求
EntryDispatcher#doCommit
1private void doCommit() throws Exception {
2 if (DLedgerUtils.elapsed(lastPushCommitTimeMs) 1000) { // @1
3 PushEntryRequest request = buildPushRequest(null, PushEntryRequest.Type.COMMIT); // @2
4 //Ignore the results
5 dLedgerRpcService.push(request); // @3
6 lastPushCommitTimeMs = System.currentTimeMillis();
7 }
8}
代码@1:如果上一次单独发送 commit 的请求时间与当前时间相隔低于 1s,放弃本次提交请求。
代码@2:构建提交请求。
代码@3:通过网络向从节点发送 commit 请求。
接下来先了解一下如何构建 commit 请求包。
EntryDispatcher#buildPushRequest
1private PushEntryRequest buildPushRequest(DLedgerEntry entry, PushEntryRequest.Type target) {
2 PushEntryRequest request = new PushEntryRequest();
3 request.setGroup(memberState.getGroup());
4 request.setRemoteId(peerId);
5 request.setLeaderId(leaderId);
6 request.setTerm(term);
7 request.setEntry(entry);
8 request.setType(target);
9 request.setCommitIndex(dLedgerStore.getCommittedIndex());
10 return request;
11}
提交包请求字段主要包含如下字段:DLedger 节点所属组、从节点 id、主节点 id,当前投票轮次、日志内容、请求类型与 committedIndex(主节点已提交日志序号)。
2.3.2.2 doCheckAppendResponse 检查并追加请求
EntryDispatcher#doCheckAppendResponse
1private void doCheckAppendResponse() throws Exception {
2 long peerWaterMark = getPeerWaterMark(term, peerId); // @1
3 Long sendTimeMs = pendingMap.get(peerWaterMark + 1);
4 if (sendTimeMs != null && System.currentTimeMillis() - sendTimeMs dLedgerConfig.getMaxPushTimeOutMs()) { // @2
5 logger.warn("[Push-{}]Retry to push entry at {}", peerId, peerWaterMark + 1);
6 doAppendInner(peerWaterMark + 1);
7 }
8}
该方法的作用是检查 append 请求是否超时,其关键实现如下:
从挂起的请求队列中获取下一条的发送时间,如果不为空并去超过了 append 的超时时间,则再重新发送 append 请求,最大超时时间默认为 1s,可以通过 maxPushTimeOutMs 来改变默认值。
2.3.2.3 doAppendInner 追加请求
向从节点发送 append 请求。
EntryDispatcher#doAppendInner
1private void doAppendInner(long index) throws Exception {
2 DLedgerEntry entry = dLedgerStore.get(index); // @1
3 PreConditions.check(entry != null, DLedgerResponseCode.UNKNOWN, "writeIndex=%d", index);
4 checkQuotaAndWait(entry); // @2
5 PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.APPEND); // @3
6 CompletableFuturePushEntryResponse responseFuture = dLedgerRpcService.push(request); // @4
7 pendingMap.put(index, System.currentTimeMillis()); // @5
8 responseFuture.whenComplete((x, ex) - {
9 try {
10 PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
11 DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
12 switch (responseCode) {
13 case SUCCESS: // @6
14 pendingMap.remove(x.getIndex());
15 updatePeerWaterMark(x.getTerm(), peerId, x.getIndex());
16 quorumAckChecker.wakeup();
17 break;
18 case INCONSISTENT_STATE: // @7
19 logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, x.getIndex(), x.getTerm());
20 changeState(-1, PushEntryRequest.Type.COMPARE);
21 break;
22 default:
23 logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());
24 break;
25 }
26 } catch (Throwable t) {
27 logger.error("", t);
28 }
29 });
30 lastPushCommitTimeMs = System.currentTimeMillis();
31}
代码@1:首先根据序号查询出日志。
代码@2:检测配额,如果超过配额,会进行一定的限流,其关键实现点:
每秒追加的日志超过 20m(可通过 peerPushQuota 配置),则会 sleep 1s中后再追加。
代码@3:构建 PUSH 请求日志。
代码@4:通过 Netty 发送网络请求到从节点,从节点收到请求会进行处理(本文并不会探讨与网络相关的实现细节)。
代码@5:用 pendingMap 记录待追加的日志的发送时间,用于发送端判断是否超时的一个依据。
代码@6:请求成功的处理逻辑,其关键实现点如下:
更新已成功追加的日志序号(按投票轮次组织,并且每个从服务器一个键值对)。
代码@7:Push 请求出现状态不一致情况,将发送 COMPARE 请求,来对比主从节点的数据是否一致。
日志转发 append 追加请求类型就介绍到这里了,接下来我们继续探讨另一个请求类型 compare。
2.3.3 compare 请求详解
COMPARE 类型的请求有 doCompare 方法发送,首先该方法运行在 while (true) 中,故在查阅下面代码时,要注意其退出循环的条件。
EntryDispatcher#doCompare
1if (!checkAndFreshState()) {
2 break;
3}
4if (type.get() != PushEntryRequest.Type.COMPARE
5 && type.get() != PushEntryRequest.Type.TRUNCATE) {
6 break;
7}
8if (compareIndex == -1 && dLedgerStore.getLedgerEndIndex() == -1) {
9 break;
10}
Step1:验证是否执行,有几个关键点如下:
如果是请求类型不是 COMPARE 或 TRUNCATE 请求,则直接跳出。
EntryDispatcher#doCompare
1if (compareIndex == -1) {
2 compareIndex = dLedgerStore.getLedgerEndIndex();
3 logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", peerId);
4} else if (compareIndex dLedgerStore.getLedgerEndIndex() || compareIndex dLedgerStore.getLedgerBeginIndex()) {
5 logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", peerId, compareIndex, dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex());
6 compareIndex = dLedgerStore.getLedgerEndIndex();
7}
Step2:如果 compareIndex 为 -1 或compareIndex 不在有效范围内,则重置待比较序列号为当前已已存储的最大日志序号:ledgerEndIndex。
1DLedgerEntry entry = dLedgerStore.get(compareIndex);
2PreConditions.check(entry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);
3PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.COMPARE);
4CompletableFuturePushEntryResponse responseFuture = dLedgerRpcService.push(request);
5PushEntryResponse response = responseFuture.get(3, TimeUnit.SECONDS);
Step3:根据序号查询到日志,并向从节点发起 COMPARE 请求,其超时时间为 3s。
EntryDispatcher#doCompare
1long truncateIndex = -1;
2if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) { // @1
3 if (compareIndex == response.getEndIndex()) {
4 changeState(compareIndex, PushEntryRequest.Type.APPEND);
5 break;
6 } else {
7 truncateIndex = compareIndex;
8 }
9
10} else if (response.getEndIndex() dLedgerStore.getLedgerBeginIndex()
11 || response.getBeginIndex() dLedgerStore.getLedgerEndIndex()) { // @2
12 truncateIndex = dLedgerStore.getLedgerBeginIndex();
13} else if (compareIndex response.getBeginIndex()) { // @3
14 truncateIndex = dLedgerStore.getLedgerBeginIndex();
15} else if (compareIndex response.getEndIndex()) { // @4
16 compareIndex = response.getEndIndex();
17} else { // @5
18 compareIndex--;
19}
20
21if (compareIndex dLedgerStore.getLedgerBeginIndex()) { // @6
22 truncateIndex = dLedgerStore.getLedgerBeginIndex();
23}
Step4:根据响应结果计算需要截断的日志序号,其主要实现关键点如下:
代码@2:如果从节点存储的最大日志序号小于主节点的最小序号,或者从节点的最小日志序号大于主节点的最大日志序号,即两者不相交,这通常发生在从节点崩溃很长一段时间,而主节点删除了过期的条目时。truncateIndex 设置为主节点的 ledgerBeginIndex,即主节点目前最小的偏移量。
代码@4:如果已比较的日志序号大于从节点的最大日志序号,则已比较索引设置为从节点最大的日志序号,触发数据的继续同步。
代码@6:如果比较出来的日志序号小于主节点的最小日志需要,则设置为主节点的最小序号。
1if (truncateIndex != -1) {
2 changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);
3 doTruncate(truncateIndex);
4 break;
5}
Step5:如果比较出来的日志序号不等于 -1 ,则向从节点发送 TRUNCATE 请求。
2.3.3.1 doTruncate 详解
1private void doTruncate(long truncateIndex) throws Exception {
2 PreConditions.check(type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);
3 DLedgerEntry truncateEntry = dLedgerStore.get(truncateIndex);
4 PreConditions.check(truncateEntry != null, DLedgerResponseCode.UNKNOWN);
5 logger.info("[Push-{}]Will push data to truncate truncateIndex={} pos={}", peerId, truncateIndex, truncateEntry.getPos());
6 PushEntryRequest truncateRequest = buildPushRequest(truncateEntry, PushEntryRequest.Type.TRUNCATE);
7 PushEntryResponse truncateResponse = dLedgerRpcService.push(truncateRequest).get(3, TimeUnit.SECONDS);
8 PreConditions.check(truncateResponse != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", truncateIndex);
9 PreConditions.check(truncateResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(truncateResponse.getCode()), "truncateIndex=%d", truncateIndex);
10 lastPushCommitTimeMs = System.currentTimeMillis();
11 changeState(truncateIndex, PushEntryRequest.Type.APPEND);
12}
该方法主要就是构建 truncate 请求到从节点。
关于服务端的消息复制转发就介绍到这里了,主节点负责向从服务器PUSH请求,从节点自然而然的要处理这些请求,接下来我们就按照主节点发送的请求,来具体分析一下从节点是如何响应的。
由于微信单篇文章字数的限制,从服务器接收到主节点的 PUSH 请求后如何处理、以及主服务根据所有从服务器的响应后进行仲裁(需要集群内半数以上节点追加成功后才认为是有效数据)等实现细节,则在下一篇文章中给出。
笔者推荐阅读 RocketMQ DLedger 多副本系列文章:
1、
2、
3、
更多文章请关注中间件兴趣圈公众号:
原文始发于微信公众号(中间件兴趣圈):