# The Raft Algorithm and Ideas Borrowed from etcd/raft's Implementation

#### Key attributes each node must maintain

• currentTerm: the current term number, incremented from 0 (time is divided into terms, and each term begins with an election).
• commitIndex: the index of the highest log entry known to be committed.
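
As a minimal sketch, this per-node state (plus the paper's other fields) might look like the following in Go; the struct and its names are illustrative, following the Raft paper rather than etcd/raft's actual types:

```go
package main

import "fmt"

// RaftState holds the persistent and volatile state a Raft node keeps.
// Names follow the Raft paper; this is an illustrative sketch.
type RaftState struct {
	// Persistent on all servers (updated on stable storage before responding to RPCs).
	CurrentTerm uint64   // latest term this node has seen, starts at 0
	VotedFor    int      // candidate id voted for in CurrentTerm (-1 = none)
	Log         [][]byte // serialized commands; index 0 is an unused sentinel

	// Volatile on all servers.
	CommitIndex uint64 // highest log index known to be committed
	LastApplied uint64 // highest log index applied to the state machine
}

// NewRaftState returns the state a node starts with: term 0, no vote cast.
func NewRaftState() *RaftState {
	return &RaftState{VotedFor: -1, Log: make([][]byte, 1)}
}

func main() {
	s := NewRaftState()
	fmt.Println(s.CurrentTerm, s.CommitIndex) // 0 0
}
```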

#### The Raft election process

A Raft node maintains a finite state machine with three states, Follower, Candidate, and Leader. The transitions work as follows:

1. Follower => Candidate: all nodes start as Follower. In the normal case a Follower responds to RPCs from the Leader and to vote requests from candidates. When its heartbeat timer expires, it concludes that the Leader is lost; if no candidate has yet asked for its vote, it sleeps for a random interval (to avoid split votes) and then switches itself to Candidate.

2. Candidate => Leader: on becoming a Candidate, the node increments currentTerm, votes for itself, and sends vote requests to all known peers, then tallies the results. When it receives votes from more than half of the nodes, it switches from Candidate to Leader. As the algorithm's description puts it, each term begins with an election and ends when the next election starts.

3. Leader => Follower: if the Leader fails to send heartbeats and the Followers' heartbeat timers expire, the remaining nodes may hold a new election while the old Leader is away. After the old Leader recovers and resumes sending heartbeats, it will see a Response.Term larger than its own currentTerm and must switch itself back to Follower.

Every message exchanged among Follower/Candidate/Leader always carries the Term.
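
That term rule can be sketched as a tiny Go function (illustrative; etcd/raft performs this check inside its Step function before dispatching a message):

```go
package main

import "fmt"

// State is the node's role in the Raft finite state machine.
type State int

const (
	Follower State = iota
	Candidate
	Leader
)

// stepDownIfStale applies the universal term rule: any node that sees a
// higher term in an incoming message adopts that term and becomes Follower.
func stepDownIfStale(myState State, myTerm, msgTerm uint64) (State, uint64) {
	if msgTerm > myTerm {
		return Follower, msgTerm // newer term wins; fall back to Follower
	}
	return myState, myTerm // same or older term: keep current role
}

func main() {
	st, term := stepDownIfStale(Leader, 3, 5)
	fmt.Println(st == Follower, term) // true 5
}
```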

#### Log replication

A log entry in Raft is not a debugging log line but a serialized command. After these commands are replicated to the nodes, each node deserializes the content, executes the command, and returns the result, which is how the replicated state machine is realized.

#### RPC calls

In the Raft consensus algorithm, heartbeats and log replication are sent by the Leader to all Follower nodes, and vote requests by the Candidate, through two RPC methods: RequestVote and AppendEntries. Two ways to implement them:

1. A simple RPC implementation with netty + avro
2. Use gRPC directly
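
As a hedged sketch of what such a transport's surface might look like, here are the two RPCs as a Go interface plus a follower-side RequestVote handler. All names are stand-ins, not a real netty/avro or gRPC binding, and the log up-to-date check from the paper is omitted for brevity:

```go
package main

import "fmt"

// AppendEntriesArgs carries heartbeats and log replication from the Leader.
type AppendEntriesArgs struct {
	Term         uint64
	LeaderID     int
	PrevLogIndex uint64
	PrevLogTerm  uint64
	Entries      [][]byte // serialized commands; empty for a pure heartbeat
	LeaderCommit uint64
}

// RequestVoteArgs carries a Candidate's vote request.
type RequestVoteArgs struct {
	Term         uint64
	CandidateID  int
	LastLogIndex uint64
	LastLogTerm  uint64
}

// Reply is shared by both RPCs.
type Reply struct {
	Term    uint64
	Granted bool // vote granted / entries accepted
}

// Transport is the surface a Raft node needs from its RPC layer.
type Transport interface {
	AppendEntries(to int, args AppendEntriesArgs) (Reply, error)
	RequestVote(to int, args RequestVoteArgs) (Reply, error)
}

// handleRequestVote decides whether to grant a vote and returns the reply
// plus the possibly-updated votedFor (-1 means no vote cast this term).
func handleRequestVote(myTerm uint64, votedFor int, args RequestVoteArgs) (Reply, int) {
	if args.Term < myTerm {
		return Reply{Term: myTerm, Granted: false}, votedFor // stale candidate
	}
	if votedFor == -1 || votedFor == args.CandidateID {
		return Reply{Term: args.Term, Granted: true}, args.CandidateID
	}
	return Reply{Term: args.Term, Granted: false}, votedFor // already voted
}

func main() {
	reply, votedFor := handleRequestVote(2, -1, RequestVoteArgs{Term: 3, CandidateID: 1})
	fmt.Println(reply.Granted, votedFor) // true 1
}
```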

#### State transitions

From the etcd/raft documentation: Raft as a state machine. The state machine takes a Message as input. A message can either be a local timer update or a network message sent from a remote peer. The state machine's output is a 3-tuple {[]Messages, []LogEntries, NextState} consisting of an array of Messages, log entries, and Raft state changes. For state machines with the same state, the same state machine input should always generate the same state machine output.

etcd/raft implements such a state machine: it takes a Message as input, produces {[]Messages, []LogEntries, NextState} as output, and guarantees the output function is deterministic (the same state plus the same input always yields the same output). The step function corresponding to each state is:

1. Follower: stepFollower(r *raft, m pb.Message)
2. Candidate: stepCandidate(r *raft, m pb.Message)
3. Leader: stepLeader(r *raft, m pb.Message)
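
The dispatch can be sketched as follows; the types are simplified stand-ins for etcd/raft's, but the pattern (a step function pointer swapped on every state change) mirrors the real code:

```go
package main

import "fmt"

// StateType mirrors etcd/raft's node roles.
type StateType int

const (
	StateFollower StateType = iota
	StateCandidate
	StateLeader
)

// Message is a simplified stand-in for pb.Message.
type Message struct{ Type string }

// raft holds the current role and the step function that handles
// messages while in that role.
type raft struct {
	state StateType
	step  func(r *raft, m Message)
}

// become* swap in the role-specific step function, as etcd/raft does.
func (r *raft) becomeFollower() { r.state = StateFollower; r.step = stepFollower }
func (r *raft) becomeLeader()   { r.state = StateLeader; r.step = stepLeader }

func stepFollower(r *raft, m Message) { fmt.Println("follower handles", m.Type) }
func stepLeader(r *raft, m Message)   { fmt.Println("leader handles", m.Type) }

// Step is the single entrance for message handling: shared checks (e.g.
// the term rule) would happen here, then the message is routed to the
// state-specific step function.
func (r *raft) Step(m Message) { r.step(r, m) }

func main() {
	r := &raft{}
	r.becomeFollower()
	r.Step(Message{Type: "MsgHeartbeat"}) // follower handles MsgHeartbeat
}
```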

#### Elections, heartbeats, and timeouts

Tick can be thought of as a firing pin: each Tick is one strike. On every Tick the node increments its electionElapsed/heartbeatElapsed counters; when a counter reaches its configured timeout, the corresponding action fires (a Follower starts an election, a Leader broadcasts a heartbeat) and the counter resets.


etcd/raft does not dictate when a Tick happens; the raftexample application drives it with a time.Ticker (Go's time.NewTicker returns a timer whose channel receives a notification at fixed intervals).
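
A sketch of why this abstraction is so testable: Tick only bumps a counter, so a test can fire hundreds of "timeouts" without a real clock. The node type and field names below are illustrative, not etcd/raft's:

```go
package main

import "fmt"

// node counts ticks instead of watching a wall clock.
type node struct {
	electionElapsed int
	electionTimeout int // number of ticks before an election fires
	campaigns       int // how many elections have been started
}

// Tick is one strike of the firing pin: bump the counter, and act
// only when it crosses the timeout.
func (n *node) Tick() {
	n.electionElapsed++
	if n.electionElapsed >= n.electionTimeout {
		n.electionElapsed = 0
		n.campaigns++ // a real node would start campaigning here
	}
}

func main() {
	n := &node{electionTimeout: 10}
	for i := 0; i < 25; i++ {
		n.Tick() // a test drives time forward by simply calling Tick
	}
	fmt.Println(n.campaigns) // 2
}
```

In production the same Tick would be driven from a time.Ticker channel, as raftexample does; in tests it is simply called in a loop.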

#### Core implementation

After each Tick (and any other input), the node may emit a Ready that the application must process. The etcd/raft documentation prescribes these steps:

1. Write Entries, HardState and Snapshot to persistent storage in order, i.e. Entries first, then HardState and Snapshot if they are not empty. If persistent storage supports atomic writes then all of them can be written together. Note that when writing an Entry with Index i, any previously-persisted entries with Index >= i must be discarded.
2. Send all Messages to the nodes named in the To field. It is important that no messages be sent until the latest HardState has been persisted to disk, and all Entries written by any previous Ready batch (Messages may be sent while entries from the same batch are being persisted). To reduce the I/O latency, an optimization can be applied to make leader write to disk in parallel with its followers (as explained at section 10.2.1 in Raft thesis). If any Message has type MsgSnap, call Node.ReportSnapshot() after it has been sent (these messages may be large). Note: Marshalling messages is not thread-safe; it is important to make sure that no new entries are persisted while marshalling. The easiest way to achieve this is to serialise the messages directly inside the main raft loop.
3. Apply Snapshot (if any) and CommittedEntries to the state machine. If any committed Entry has Type EntryConfChange, call Node.ApplyConfChange() to apply it to the node. The configuration change may be cancelled at this point by setting the NodeID field to zero before calling ApplyConfChange (but ApplyConfChange must be called one way or the other, and the decision to cancel must be based solely on the state machine and not external information such as the observed health of the node).
4. Call Node.Advance() to signal readiness for the next batch of updates. This may be done at any time after step 1, although all updates must be processed in the order they were returned by Ready.

```go
go func() {
	// goroutine that forwards client proposals and configuration changes
	confChangeCount := uint64(0)

	for rc.proposeC != nil && rc.confChangeC != nil {
		select {
		case prop, ok := <-rc.proposeC:
			if !ok {
				rc.proposeC = nil
			} else {
				// blocks until accepted by raft state machine
				rc.node.Propose(context.TODO(), []byte(prop))
			}

		case cc, ok := <-rc.confChangeC:
			if !ok {
				rc.confChangeC = nil
			} else {
				confChangeCount++
				cc.ID = confChangeCount
				rc.node.ProposeConfChange(context.TODO(), cc)
			}
		}
	}
	// client closed channel; shutdown raft if not already
	close(rc.stopc)
}()
```

```go
// event loop on raft state machine updates
for {
	select {
	case <-ticker.C:
		rc.node.Tick()

	// store raft entries to wal, then publish over commit channel
	case rd := <-rc.node.Ready():
		// persist HardState and log entries (HardState holds Term, Vote and Commit)
		rc.wal.Save(rd.HardState, rd.Entries)
		// handle the snapshot, if any
		if !raft.IsEmptySnap(rd.Snapshot) {
			rc.saveSnap(rd.Snapshot)
			rc.raftStorage.ApplySnapshot(rd.Snapshot)
			rc.publishSnapshot(rd.Snapshot)
		}

		// append entries to storage and broadcast messages to all peers
		// (an entry is committed once a majority of nodes acknowledge it)
		rc.raftStorage.Append(rd.Entries)
		rc.transport.Send(rd.Messages)
		// apply (lastApplied, committedIndex] to the state machine:
		// the commitC channel notifies the other goroutines
		if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
			rc.stop()
			return
		}
		// possibly create a snapshot
		rc.maybeTriggerSnapshot()
		// tell raft this Ready (rd) has been fully processed
		rc.node.Advance()

	case err := <-rc.transport.ErrorC:
		rc.writeError(err)
		return

	case <-rc.stopc:
		rc.stop()
		return
	}
}
```


#### Log synchronization in etcd/raft

etcd/raft uses Progress to control log synchronization between the leader and each follower. A Progress represents, from the Leader's point of view, how far a Follower's log replication has advanced, and it has three states: probe, snapshot, and replicate. The etcd/raft design document describes them with this diagram:

```
                            +--------------------------------------------------------+
                            |                  send snapshot                         |
                            |                                                        |
                  +---------+----------+                                  +----------v---------+
              +--->       probe        |                                  |      snapshot      |
              |   |  max inflight = 1  <----------------------------------+  max inflight = 0  |
              |   +---------+----------+                                  +--------------------+
              |             |            1. snapshot success
              |             |               (next=snapshot.index + 1)
              |             |            2. snapshot failure
              |             |               (no change)
              |             |            3. receives msgAppResp(rej=false&&index>lastsnap.index)
              |             |               (match=m.index,next=match+1)
receives msgAppResp(rej=true)
(next=match+1)|             |
              |             |
              |             |
              |             |   receives msgAppResp(rej=false&&index>match)
              |             |   (match=m.index,next=match+1)
              |             |
              |             |
              |             |
              |   +---------v----------+
              |   |     replicate      |
              +---+  max inflight = n  |
                  +--------------------+
```


#### The probe state

max inflight = 1: the leader sends at most one replication message at a time, probing for the index at which the follower's log actually matches its own.

#### The replicate state

max inflight = n: the fast path; the leader optimistically streams up to n replication messages to the follower without waiting for each acknowledgement.

#### The snapshot state

max inflight = 0: the leader has sent the follower a snapshot and pauses log replication until the snapshot's outcome is reported.
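
The three inflight limits can be pictured with a tiny window type modeled loosely on etcd/raft's inflights tracking (this is an illustration, not its real API):

```go
package main

import "fmt"

// inflights is a bounded window of unacknowledged replication messages.
// size=1 gives probe behavior, a larger size gives replicate behavior.
type inflights struct {
	count, size int
}

// Full reports whether the leader must stop sending to this follower.
func (in *inflights) Full() bool { return in.count >= in.size }

// Add records one more message in flight.
func (in *inflights) Add() { in.count++ }

// FreeOne releases window space when an acknowledgement arrives.
func (in *inflights) FreeOne() {
	if in.count > 0 {
		in.count--
	}
}

func main() {
	probe := &inflights{size: 1}
	probe.Add()
	fmt.Println(probe.Full()) // true: probe sends one message, then waits
	probe.FreeOne()
	fmt.Println(probe.Full()) // false: the ack opened the window again
}
```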

#### State switching

The state switches a Progress is allowed to make between probe, replicate, and snapshot:

1. init => probe

"A newly elected leader sets the progresses of all the followers to probe state with match = 0 and next = last index" (etcd/raft design.md).

2. probe => replicate

The leader maintains a nextIndex for each follower, which is the index of the next log entry the leader will send to that follower. When a leader first comes to power, it initializes all nextIndex values to the index just after the last one in its log (11 in Figure 7 of the paper). If a follower's log is inconsistent with the leader's, the AppendEntries consistency check will fail in the next AppendEntries RPC. After a rejection, the leader decrements nextIndex and retries the AppendEntries RPC. Eventually nextIndex will reach a point where the leader and follower logs match. When this happens, AppendEntries will succeed, which removes any conflicting entries in the follower's log and appends entries from the leader's log (if any). Once AppendEntries succeeds, the follower's log is consistent with the leader's, and it will remain that way for the rest of the term. (Raft paper, §5.3)

3. probe => snapshot: when the entries a lagging follower still needs have already been compacted away on the leader and can no longer be replayed, the leader switches that follower's Progress to snapshot and sends a snapshot instead.
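
The nextIndex backoff quoted above can be sketched as a single update function (illustrative; etcd/raft's probe state plays this role, and it can use the follower's rejection hint to back off faster than one entry at a time):

```go
package main

import "fmt"

// onAppendResp updates the leader's nextIndex/matchIndex for one follower
// based on an AppendEntries response. respIndex is the index the follower
// acknowledged on success.
func onAppendResp(nextIndex, matchIndex, respIndex uint64, rejected bool) (uint64, uint64) {
	if rejected {
		if nextIndex > 1 {
			nextIndex-- // consistency check failed: probe one entry further back
		}
		return nextIndex, matchIndex
	}
	// success: the follower's log matches the leader's up to respIndex
	return respIndex + 1, respIndex
}

func main() {
	next, match := uint64(11), uint64(0)
	next, match = onAppendResp(next, match, 0, true)  // rejected: back off
	next, match = onAppendResp(next, match, 9, false) // accepted up to index 9
	fmt.Println(next, match) // 10 9
}
```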

#### Retrospective

There is plenty of detailed literature on Raft, and most of it is not hard to follow; it is very helpful in the early stage of understanding how a Raft cluster works. But understanding the theory is one thing and putting it into practice is another: along the way it is easy to produce a flawed implementation out of a misunderstanding (it shows up as growing confusion while coding, followed by yet another trip back to the Raft paper). Many details deserve careful study, such as the I/O implementation, persistent storage and log compaction, and the timeout mechanism. Studying etcd/raft's approach, and trying to understand how its authors think, answers some of these questions from a different angle and offers much worth borrowing. For instance, it would never have occurred to me that the timer could be abstracted as a Tick (far more testable and extensible than a hard-coded timer).