源码分析 RocketMQ DLedger(多副本) 之日志复制-上篇

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 源码分析 RocketMQ DLedger(多副本) 之日志复制-上篇

本文紧接着 源码分析 RocketMQ DLedger(多副本) 之日志追加流程  ,继续 Leader 处理客户端 append 的请求流程中最至关重要的一环:日志复制。

温馨提示:由于微信单篇文章的字数限制,RocketMQ DLedger 日志复制分为两篇文章介绍。

DLedger 多副本的日志转发由 DLedgerEntryPusher 实现,接下来将对其进行详细介绍。

温馨提示:由于本篇幅较长,为了更好的理解其实现,大家可以带着如下疑问来通读本篇文章: 1、raft 协议中有一个非常重要的概念:已提交日志序号,该如何实现。 2、客户端向 DLedger 集群发送一条日志,必须得到集群中大多数节点的认可才能被认为写入成功。 3、raft 协议中追加、提交两个动作如何实现。

日志复制(日志转发)由 DLedgerEntryPusher 实现,具体类图如下:

主要由如下4个类构成:

  • DLedgerEntryPusher DLedger 日志转发与处理核心类,该内会启动如下3个对象,其分别对应一个线程。

  • EntryHandler 日志接收处理线程,当节点为从节点时激活。

  • QuorumAckChecker 日志追加ACK投票处理线程,当前节点为主节点时激活。

  • EntryDispatcher 日志转发线程,当前节点为主节点时追加。

  • EntryHandler
    日志接收处理线程,当节点为从节点时激活。

    EntryDispatcher
    日志转发线程,当前节点为主节点时追加。

    接下来我们将详细介绍上述4个类,从而揭晓日志复制的核心实现原理。

    1、DLedgerEntryPusher

    1.1 核心类图

    DLedger 多副本日志推送的核心实现类,里面会创建 EntryDispatcher、QuorumAckChecker、EntryHandler 三个核心线程。其核心属性如下:

  • DLedgerConfig dLedgerConfig 多副本相关配置。

  • DLedgerStore dLedgerStore 存储实现类。

  • MemberState memberState 节点状态机。

  • DLedgerRpcService dLedgerRpcService RPC 服务实现类,用于集群内的其他节点进行网络通讯。

  • Map peerWaterMarksByTerm 每个节点基于投票轮次的当前水位线标记。键值为投票轮次,值为 ConcurrentMap/, Long/* 节点对应的日志序号*/。

  • Map pendingAppendResponsesByTerm 用于存放追加请求的响应结果(Future模式)。

  • EntryHandler entryHandler 从节点上开启的线程,用于接收主节点的 push 请求(append、commit、append)。

  • QuorumAckChecker quorumAckChecker 主节点上的追加请求投票器。

  • Map dispatcherMap 主节点日志请求转发器,向从节点复制消息等。

  • DLedgerStore dLedgerStore
    存储实现类。

    DLedgerRpcService dLedgerRpcService
    RPC 服务实现类,用于集群内的其他节点进行网络通讯。

    Map pendingAppendResponsesByTerm
    用于存放追加请求的响应结果(Future模式)。

    QuorumAckChecker quorumAckChecker
    主节点上的追加请求投票器。

    接下来介绍一下其核心方法的实现。

    1.2 构造方法

    
     1public DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore,
     2    DLedgerRpcService dLedgerRpcService) {
     3    this.dLedgerConfig = dLedgerConfig;
     4    this.memberState = memberState;
     5    this.dLedgerStore = dLedgerStore;
     6    this.dLedgerRpcService = dLedgerRpcService;
     7    for (String peer : memberState.getPeerMap().keySet()) {
     8        if (!peer.equals(memberState.getSelfId())) {
     9            dispatcherMap.put(peer, new EntryDispatcher(peer, logger));
    10        }
    11    }
    12}
    

    构造方法的重点是会根据集群内的节点,依次构建对应的 EntryDispatcher 对象。

    1.3 startup

    DLedgerEntryPusher#startup

    
    1public void startup() {
    2    entryHandler.start();
    3    quorumAckChecker.start();
    4    for (EntryDispatcher dispatcher : dispatcherMap.values()) {
    5        dispatcher.start();
    6    }
    7}
    

    依次启动 EntryHandler、QuorumAckChecker 与 EntryDispatcher 线程。

    备注:DLedgerEntryPusher 的其他核心方法在详细分析其日志复制原理的过程中会一一介绍。

    接下来将从 EntryDispatcher、QuorumAckChecker、EntryHandler 来阐述 RocketMQ DLedger(多副本)的实现原理。

    2、EntryDispatcher 详解

    2.1 核心类图

    其核心属性如下。

  • AtomicReference type 向从节点发送命令的类型,可选值:PushEntryRequest.Type.COMPARE、TRUNCATE、APPEND、COMMIT,下面详细说明。

  • long lastPushCommitTimeMs = -1 上一次发送提交类型的时间戳。

  • String peerId 目标节点ID。

  • long compareIndex = -1 已完成比较的日志序号。

  • long writeIndex = -1 已写入的日志序号。

  • int maxPendingSize = 1000 允许的最大挂起日志数量。

  • long term = -1   Leader 节点当前的投票轮次。

  • String leaderId = null Leader 节点ID。

  • long lastCheckLeakTimeMs 上次检测泄漏的时间,所谓的泄漏,就是看挂起的日志请求数量是否查过了 maxPendingSize 。

  • ConcurrentMap pendingMap 记录日志的挂起时间,key:日志的序列(entryIndex),value:挂起时间戳。

  • Quota quota = new Quota(dLedgerConfig.getPeerPushQuota()) 配额。

  • long lastPushCommitTimeMs = -1
    上一次发送提交类型的时间戳。

    long compareIndex = -1
    已完成比较的日志序号。

    int maxPendingSize = 1000
    允许的最大挂起日志数量。

    String leaderId = null
    Leader 节点ID。

    ConcurrentMap pendingMap
    记录日志的挂起时间,key:日志的序列(entryIndex),value:挂起时间戳。

    2.2 Push 请求类型

    DLedger 主节点向从从节点复制日志总共定义了4类请求类型,其枚举类型为 PushEntryRequest.Type,其值分别为 COMPARE、TRUNCATE、APPEND、COMMIT。

  • COMPARE 如果 Leader 发生变化,新的 Leader 需要与他的从节点的日志条目进行比较,以便截断从节点多余的数据。

  • TRUNCATE 如果 Leader 通过索引完成日志对比,则 Leader 将发送  TRUNCATE 给它的从节点。

  • APPEND 将日志条目追加到从节点。

  • COMMIT 通常,leader 会将提交的索引附加到 append 请求,但是如果 append 请求很少且分散,leader 将发送一个单独的请求来通知从节点提交的索引。

  • TRUNCATE
    如果 Leader 通过索引完成日志对比,则 Leader 将发送  TRUNCATE 给它的从节点。

    COMMIT
    通常,leader 会将提交的索引附加到 append 请求,但是如果 append 请求很少且分散,leader 将发送一个单独的请求来通知从节点提交的索引。

    对主从节点的请求类型有了一个初步的认识后,我们将从 EntryDispatcher 的业务处理入口 doWork 方法开始讲解。

    2.3 doWork 方法详解

    
     1public void doWork() {
     2    try {
     3        if (!checkAndFreshState()) {                                            // @1
     4            waitForRunning(1);
     5            return;
     6        }
     7
     8        if (type.get() == PushEntryRequest.Type.APPEND) {   // @2
     9            doAppend();
    10        } else {
    11            doCompare();                                                           // @3
    12        }
    13        waitForRunning(1);
    14    } catch (Throwable t) {
    15        DLedgerEntryPusher.logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", peerId, getName(), writeIndex, compareIndex, t);
    16        DLedgerUtils.sleep(500);
    17    }
    18}
    

    代码@1:检查状态,是否可以继续发送 append 或 compare。

    代码@2:如果推送类型为APPEND,主节点向从节点传播消息请求。

    代码@3:主节点向从节点发送对比数据差异请求(当一个新节点被选举成为主节点时,往往这是第一步)。

    2.3.1 checkAndFreshState 详解

    EntryDispatcher#checkAndFreshState

    
     1private boolean checkAndFreshState() {
     2    if (!memberState.isLeader()) {     // @1
     3        return false;
     4    }
     5    if (term != memberState.currTerm() || leaderId == null || !leaderId.equals(memberState.getLeaderId())) {     // @2
     6        synchronized (memberState) {
     7            if (!memberState.isLeader()) {
     8                return false;
     9            }
    10            PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);
    11            term = memberState.currTerm();
    12            leaderId = memberState.getSelfId();
    13            changeState(-1, PushEntryRequest.Type.COMPARE);
    14        }
    15    }
    16    return true;
    17}
    

    代码@1:如果节点的状态不是主节点,则直接返回 false。则结束 本次 doWork 方法。因为只有主节点才需要向从节点转发日志。

    代码@2:如果当前节点状态是主节点,但当前的投票轮次与状态机轮次或 leaderId 还未设置,或 leaderId 与状态机的 leaderId 不相等,这种情况通常是集群触发了重新选举,设置其term、leaderId与状态机同步,即将发送COMPARE 请求。

    接下来看一下 changeState (改变状态)。

    
     1private synchronized void changeState(long index, PushEntryRequest.Type target) {
     2    logger.info("[Push-{}]Change state from {} to {} at {}", peerId, type.get(), target, index);
     3    switch (target) {
     4        case APPEND:      // @1
     5            compareIndex = -1;
     6            updatePeerWaterMark(term, peerId, index);
     7            quorumAckChecker.wakeup();
     8            writeIndex = index + 1;
     9            break;
    10        case COMPARE:    // @2
    11            if (this.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE)) {
    12                compareIndex = -1;
    13                pendingMap.clear();
    14            }
    15            break;
    16        case TRUNCATE:     // @3
    17            compareIndex = -1;
    18            break;
    19        default:
    20            break;
    21    }
    22    type.set(target);
    23} 
    

    代码@1:如果将目标类型设置为 append,则重置 compareIndex ,并设置 writeIndex 为当前 index 加1。

    代码@2:如果将目标类型设置为 COMPARE,则重置 compareIndex 为负一,接下将向各个从节点发送 COMPARE 请求类似,并清除已挂起的请求。

    代码@3:如果将目标类型设置为 TRUNCATE,则重置 compareIndex 为负一。

    接下来具体来看一下 APPEND、COMPARE、TRUNCATE 等请求。

    2.3.2 append 请求详解

    EntryDispatcher#doAppend

    
     1private void doAppend() throws Exception {
     2    while (true) {
     3        if (!checkAndFreshState()) {                                                 // @1
     4            break;
     5        }
     6        if (type.get() != PushEntryRequest.Type.APPEND) {        // @2
     7            break;
     8        }
     9        if (writeIndex  dLedgerStore.getLedgerEndIndex()) {    // @3
    10            doCommit();
    11            doCheckAppendResponse();
    12            break;
    13        }
    14        if (pendingMap.size() = maxPendingSize || (DLedgerUtils.elapsed(lastCheckLeakTimeMs)  1000)) {     // @4
    15            long peerWaterMark = getPeerWaterMark(term, peerId);
    16            for (Long index : pendingMap.keySet()) {
    17                if (index  peerWaterMark) {
    18                    pendingMap.remove(index);
    19                }
    20            }
    21            lastCheckLeakTimeMs = System.currentTimeMillis();
    22        }
    23        if (pendingMap.size() = maxPendingSize) {    // @5
    24            doCheckAppendResponse();
    25            break;
    26        }
    27        doAppendInner(writeIndex);                               // @6
    28        writeIndex++;
    29    }
    30}
    

    代码@1:检查状态,已经在上面详细介绍。

    代码@2:如果请求类型不为 APPEND,则退出,结束本轮 doWork 方法执行。

    代码@3:writeIndex 表示当前追加到从该节点的序号,通常情况下主节点向从节点发送 append 请求时,会附带主节点的已提交指针,但如何 append 请求发不那么频繁,writeIndex 大于 leaderEndIndex 时(由于pending请求超过其 pending 请求的队列长度(默认为1w),时,会阻止数据的追加,此时有可能出现 writeIndex 大于 leaderEndIndex 的情况,此时单独发送 COMMIT 请求。

    代码@4:检测 pendingMap(挂起的请求数量)是否发送泄漏,即挂起队列中容量是否超过允许的最大挂起阀值。获取当前节点关于本轮次的当前水位线(已成功 append 请求的日志序号),如果发现正在挂起请求的日志序号小于水位线,则丢弃。

    代码@5:如果挂起的请求(等待从节点追加结果)大于 maxPendingSize 时,检查并追加一次 append 请求。

    代码@6:具体的追加请求。

    2.3.2.1 doCommit 发送提交请求

    EntryDispatcher#doCommit

    
    1private void doCommit() throws Exception {
    2    if (DLedgerUtils.elapsed(lastPushCommitTimeMs)  1000) {   // @1
    3        PushEntryRequest request = buildPushRequest(null, PushEntryRequest.Type.COMMIT);   // @2
    4        //Ignore the results
    5        dLedgerRpcService.push(request);                                                                                        // @3
    6        lastPushCommitTimeMs = System.currentTimeMillis();
    7    }
    8}
    

    代码@1:如果上一次单独发送 commit 的请求时间与当前时间相隔低于 1s,放弃本次提交请求。

    代码@2:构建提交请求。

    代码@3:通过网络向从节点发送 commit 请求。

    接下来先了解一下如何构建 commit 请求包。

    EntryDispatcher#buildPushRequest

    
     1private PushEntryRequest buildPushRequest(DLedgerEntry entry, PushEntryRequest.Type target) {
     2    PushEntryRequest request = new PushEntryRequest();
     3    request.setGroup(memberState.getGroup());  
     4    request.setRemoteId(peerId);                          
     5    request.setLeaderId(leaderId);
     6    request.setTerm(term);
     7    request.setEntry(entry);
     8    request.setType(target);
     9    request.setCommitIndex(dLedgerStore.getCommittedIndex());
    10    return request;
    11}
    

    提交包请求字段主要包含如下字段:DLedger 节点所属组、从节点 id、主节点 id,当前投票轮次、日志内容、请求类型与 committedIndex(主节点已提交日志序号)。

    2.3.2.2 doCheckAppendResponse 检查并追加请求

    EntryDispatcher#doCheckAppendResponse

    
    1private void doCheckAppendResponse() throws Exception {
    2    long peerWaterMark = getPeerWaterMark(term, peerId);   // @1
    3    Long sendTimeMs = pendingMap.get(peerWaterMark + 1); 
    4    if (sendTimeMs != null && System.currentTimeMillis() - sendTimeMs  dLedgerConfig.getMaxPushTimeOutMs()) { // @2
    5        logger.warn("[Push-{}]Retry to push entry at {}", peerId, peerWaterMark + 1);
    6        doAppendInner(peerWaterMark + 1);
    7    }
    8}
    

    该方法的作用是检查 append 请求是否超时,其关键实现如下:

  • 获取已成功 append 的序号。
  • 从挂起的请求队列中获取下一条的发送时间,如果不为空并去超过了 append 的超时时间,则再重新发送 append 请求,最大超时时间默认为 1s,可以通过 maxPushTimeOutMs 来改变默认值。
  • 从挂起的请求队列中获取下一条的发送时间,如果不为空并去超过了 append 的超时时间,则再重新发送 append 请求,最大超时时间默认为 1s,可以通过 maxPushTimeOutMs 来改变默认值。

    2.3.2.3 doAppendInner 追加请求

    向从节点发送 append 请求。

    EntryDispatcher#doAppendInner

    
     1private void doAppendInner(long index) throws Exception {
     2    DLedgerEntry entry = dLedgerStore.get(index);   // @1
     3    PreConditions.check(entry != null, DLedgerResponseCode.UNKNOWN, "writeIndex=%d", index);
     4    checkQuotaAndWait(entry);                                   // @2
     5    PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.APPEND);   // @3
     6    CompletableFuturePushEntryResponse responseFuture = dLedgerRpcService.push(request);   // @4
     7    pendingMap.put(index, System.currentTimeMillis());                                                                          // @5
     8    responseFuture.whenComplete((x, ex) - {
     9        try {
    10            PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
    11            DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
    12            switch (responseCode) {
    13                case SUCCESS:                                                                                                                // @6
    14                    pendingMap.remove(x.getIndex());
    15                    updatePeerWaterMark(x.getTerm(), peerId, x.getIndex());
    16                    quorumAckChecker.wakeup();
    17                    break;
    18                case INCONSISTENT_STATE:                                                                                         // @7
    19                    logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, x.getIndex(), x.getTerm());
    20                    changeState(-1, PushEntryRequest.Type.COMPARE);
    21                    break;
    22                default:
    23                    logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());
    24                    break;
    25            }
    26        } catch (Throwable t) {
    27            logger.error("", t);
    28        }
    29    });
    30    lastPushCommitTimeMs = System.currentTimeMillis();
    31}
    

    代码@1:首先根据序号查询出日志。

    代码@2:检测配额,如果超过配额,会进行一定的限流,其关键实现点:

  • 首先触发条件:append 挂起请求数已超过最大允许挂起数;基于文件存储并主从差异超过300m,可通过 peerPushThrottlePoint 配置。
  • 每秒追加的日志超过 20m(可通过 peerPushQuota 配置),则会 sleep 1s中后再追加。
  • 每秒追加的日志超过 20m(可通过 peerPushQuota 配置),则会 sleep 1s中后再追加。

    代码@3:构建 PUSH  请求日志。

    代码@4:通过 Netty 发送网络请求到从节点,从节点收到请求会进行处理(本文并不会探讨与网络相关的实现细节)。

    代码@5:用 pendingMap 记录待追加的日志的发送时间,用于发送端判断是否超时的一个依据。

    代码@6:请求成功的处理逻辑,其关键实现点如下:

  • 移除 pendingMap 中的关于该日志的发送超时时间。
  • 更新已成功追加的日志序号(按投票轮次组织,并且每个从服务器一个键值对)。
  • 唤醒 quorumAckChecker 线程(主要用于仲裁 append 结果),后续会详细介绍。
  • 更新已成功追加的日志序号(按投票轮次组织,并且每个从服务器一个键值对)。

    代码@7:Push 请求出现状态不一致情况,将发送 COMPARE 请求,来对比主从节点的数据是否一致。

    日志转发 append 追加请求类型就介绍到这里了,接下来我们继续探讨另一个请求类型 compare。

    2.3.3  compare 请求详解

    COMPARE 类型的请求有 doCompare 方法发送,首先该方法运行在 while (true) 中,故在查阅下面代码时,要注意其退出循环的条件。
    EntryDispatcher#doCompare

    
     1if (!checkAndFreshState()) {
     2    break;
     3}
     4if (type.get() != PushEntryRequest.Type.COMPARE
     5    && type.get() != PushEntryRequest.Type.TRUNCATE) {
     6    break;
     7}
     8if (compareIndex == -1 && dLedgerStore.getLedgerEndIndex() == -1) {
     9    break;
    10}
    

    Step1:验证是否执行,有几个关键点如下:

  • 判断是否是主节点,如果不是主节点,则直接跳出。
  • 如果是请求类型不是 COMPARE 或 TRUNCATE 请求,则直接跳出。
  • 如果已比较索引 和 ledgerEndIndex 都为 -1 ,表示一个新的 DLedger 集群,则直接跳出。
  • 如果是请求类型不是 COMPARE 或 TRUNCATE 请求,则直接跳出。

    EntryDispatcher#doCompare

    
    1if (compareIndex == -1) {
    2    compareIndex = dLedgerStore.getLedgerEndIndex();
    3    logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", peerId);
    4} else if (compareIndex  dLedgerStore.getLedgerEndIndex() || compareIndex  dLedgerStore.getLedgerBeginIndex()) {
    5    logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", peerId, compareIndex, dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex());
    6    compareIndex = dLedgerStore.getLedgerEndIndex();
    7}
    

    Step2:如果 compareIndex 为 -1 或compareIndex 不在有效范围内,则重置待比较序列号为当前已已存储的最大日志序号:ledgerEndIndex。

    
    1DLedgerEntry entry = dLedgerStore.get(compareIndex);
    2PreConditions.check(entry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);
    3PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.COMPARE);
    4CompletableFuturePushEntryResponse responseFuture = dLedgerRpcService.push(request);
    5PushEntryResponse response = responseFuture.get(3, TimeUnit.SECONDS);
    

    Step3:根据序号查询到日志,并向从节点发起 COMPARE 请求,其超时时间为 3s。

    EntryDispatcher#doCompare

    
     1long truncateIndex = -1;
     2if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {   // @1
     3    if (compareIndex == response.getEndIndex()) {
     4        changeState(compareIndex, PushEntryRequest.Type.APPEND);
     5        break;
     6    } else {
     7        truncateIndex = compareIndex;
     8    }
     9
    10} else if (response.getEndIndex()  dLedgerStore.getLedgerBeginIndex() 
    11        || response.getBeginIndex()  dLedgerStore.getLedgerEndIndex()) {    // @2
    12    truncateIndex = dLedgerStore.getLedgerBeginIndex();
    13} else if (compareIndex  response.getBeginIndex()) {                                    // @3
    14    truncateIndex = dLedgerStore.getLedgerBeginIndex();
    15} else if (compareIndex  response.getEndIndex()) {                                      // @4
    16    compareIndex = response.getEndIndex();
    17} else {                                                                                                              // @5
    18    compareIndex--;
    19}
    20
    21if (compareIndex  dLedgerStore.getLedgerBeginIndex()) {                          // @6
    22    truncateIndex = dLedgerStore.getLedgerBeginIndex();
    23}
    

    Step4:根据响应结果计算需要截断的日志序号,其主要实现关键点如下:

  • 代码@1:如果两者的日志序号相同,则无需截断,下次将直接先从节点发送 append 请求;否则将 truncateIndex  设置为响应结果中的 endIndex。
  • 代码@2:如果从节点存储的最大日志序号小于主节点的最小序号,或者从节点的最小日志序号大于主节点的最大日志序号,即两者不相交,这通常发生在从节点崩溃很长一段时间,而主节点删除了过期的条目时。truncateIndex 设置为主节点的 ledgerBeginIndex,即主节点目前最小的偏移量。
  • 代码@3:如果已比较的日志序号小于从节点的开始日志序号,很可能是从节点磁盘发送损耗,从主节点最小日志序号开始同步。
  • 代码@4:如果已比较的日志序号大于从节点的最大日志序号,则已比较索引设置为从节点最大的日志序号,触发数据的继续同步。
  • 代码@5:如果已比较的日志序号大于从节点的开始日志序号,但小于从节点的最大日志序号,则待比较索引减一。
  • 代码@6:如果比较出来的日志序号小于主节点的最小日志需要,则设置为主节点的最小序号。
  • 代码@2:如果从节点存储的最大日志序号小于主节点的最小序号,或者从节点的最小日志序号大于主节点的最大日志序号,即两者不相交,这通常发生在从节点崩溃很长一段时间,而主节点删除了过期的条目时。truncateIndex 设置为主节点的 ledgerBeginIndex,即主节点目前最小的偏移量。

    代码@4:如果已比较的日志序号大于从节点的最大日志序号,则已比较索引设置为从节点最大的日志序号,触发数据的继续同步。

    代码@6:如果比较出来的日志序号小于主节点的最小日志需要,则设置为主节点的最小序号。

    
    1if (truncateIndex != -1) {
    2    changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);
    3    doTruncate(truncateIndex);
    4    break;
    5}
    

    Step5:如果比较出来的日志序号不等于 -1 ,则向从节点发送 TRUNCATE 请求。

    2.3.3.1 doTruncate 详解
    
     1private void doTruncate(long truncateIndex) throws Exception {
     2    PreConditions.check(type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);
     3    DLedgerEntry truncateEntry = dLedgerStore.get(truncateIndex);
     4    PreConditions.check(truncateEntry != null, DLedgerResponseCode.UNKNOWN);
     5    logger.info("[Push-{}]Will push data to truncate truncateIndex={} pos={}", peerId, truncateIndex, truncateEntry.getPos());
     6    PushEntryRequest truncateRequest = buildPushRequest(truncateEntry, PushEntryRequest.Type.TRUNCATE);
     7    PushEntryResponse truncateResponse = dLedgerRpcService.push(truncateRequest).get(3, TimeUnit.SECONDS);
     8    PreConditions.check(truncateResponse != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", truncateIndex);
     9    PreConditions.check(truncateResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(truncateResponse.getCode()), "truncateIndex=%d", truncateIndex);
    10    lastPushCommitTimeMs = System.currentTimeMillis();
    11    changeState(truncateIndex, PushEntryRequest.Type.APPEND);
    12}
    

    该方法主要就是构建 truncate 请求到从节点。

    关于服务端的消息复制转发就介绍到这里了,主节点负责向从服务器PUSH请求,从节点自然而然的要处理这些请求,接下来我们就按照主节点发送的请求,来具体分析一下从节点是如何响应的。

    由于微信单篇文章字数的限制,从服务器接收到主节点的 PUSH 请求后如何处理、以及主服务根据所有从服务器的响应后进行仲裁(需要集群内半数以上节点追加成功后才认为是有效数据)等实现细节,则在下一篇文章中给出。

    笔者推荐阅读 RocketMQ DLedger 多副本系列文章:

    1、

    2、

    3、

    更多文章请关注中间件兴趣圈公众号:

    源码分析 RocketMQ DLedger(多副本) 之日志复制-上篇

    原文始发于微信公众号(中间件兴趣圈):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 源码分析 RocketMQ DLedger(多副本) 之日志复制-上篇


     上一篇
    源码分析 RocketMQ DLedger(多副本) 之日志复制-下篇 源码分析 RocketMQ DLedger(多副本) 之日志复制-下篇
    温馨提示:由于微信单篇文章的字数限制,RocketMQ DLedger 日志复制分为两篇文章介绍。本篇紧接着上文。 3、EntryHandler 详解EntryHandler 同样是一个线程,当节点状态为从节点时激活。 3.1 核
    下一篇 
    源码分析 RocketMQ DLedger(多副本) 之日志追加流程 源码分析 RocketMQ DLedger(多副本) 之日志追加流程
    上一篇我们详细分析了 ,本文将详细分析日志复制的实现。 有了前篇 ,本文将直接从 Leader 处理客户端请求入口开始,其入口为:DLedgerServer 的 handleAppend 方法开始讲起。 1、日志复制基本流程在正式分析 Ro