论文回顾
先简单回顾下raft论文的主要内容以及raft的流程。
raft是一个一致性的协议,主要用来实现一个多副本的状态机,如下图:

状态机的初始状态一样,然后输入同样的命令序列,那么状态机的最终状态也是一样的。raft就是用来协商生成同样的日志序列。
raft主要流程
raft主要是为了可理解性设计的,方便研发人员形成直觉。raft解决问题的思路是任务的分解,把一致性问题分解成了下面的子问题:
- leader选举
- 日志复制
- 安全性
leader选举
每个节点有三种状态,Follower, Candidate, Leader。正常情况下,Leader会不停的给Follower发心跳,来保持自己的权威。当Leader异常,Follower收不到新的心跳的时候,follower就变成Candidate,term加1,参加选举,当收到多数派的同意后,这个Candidate就变成了leader。每个leader对应一个term,每个term最多只有一个leader。为了防止split vote,引入了随机化的election timeout,election timeout小的那个节点最先参与选举,有效的避免了split vote的问题。

日志复制
raft客户端的读写请求都是由leader来处理的,如果是follower收到了读写的请求,会把请求转发给leader来处理。raft的日志只会从leader向follower流动。Log Matching Property:每条日志都记录有日志所在的term,以及日志的index。在AppendEntry的时候,会对比上一条日志的term和index看和leader的是否一致,如果不一致的话,会向前滚动,直到一致为止,然后从这个一致的地方开始同步日志,这样所有的日志都以leader的为准。

安全性
- 在选leader的时候,保证leader要有最长最新的日志;
- leader不会主动提交上个term的日志;
- Log Matching Property保证所有的日志都是一样的。
raft集群成员变更
raft集群成员变更主要是要避免同时有两个主的场景,raft使用两阶段的方式来解决这个问题。第一阶段先提交一个joint的配置,然后第二阶段再提交new的配置。
etcd/raft代码流程分析
这里简单分析下 https://github.com/coreos/etcd/tree/master/contrib/raftexample 核心的代码流程。

如上图,raftexample由上图中的几个核心的go routine组成。
http handler:http handler对外提供restful的接口,供外界调用,比如通过下面的命令PUT一个hello: world键值对:
1
curl -L http://127.0.0.1:12380/hello -XPUT -d world
核心代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := r.RequestURI
switch {
case r.Method == "PUT":
v, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Printf("Failed to read on PUT (%v)\n", err)
http.Error(w, "Failed on PUT", http.StatusBadRequest)
return
}
h.store.Propose(key, string(v))
// Optimistic-- no waiting for ack from raft. Value is not yet
// committed so a subsequent GET on the key may return old value
w.WriteHeader(http.StatusNoContent)
其他Method类似,这里省咯。。。
}http handler协程通过proposeC向其他协程提交任务。
1
2
3
4
5
6
7func (s *kvstore) Propose(k string, v string) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
log.Fatal(err)
}
s.proposeC <- buf.String()
}send proposals协程
send proposals协议比较简单,只是从proposeC中读取propose请求,然后通过调用node.Propose接口再发给n.propc,交给node节点来处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25// send proposals over raft
go func() {
var confChangeCount uint64 = 0
for rc.proposeC != nil && rc.confChangeC != nil {
select {
case prop, ok := <-rc.proposeC:
if !ok {
rc.proposeC = nil
} else {
// blocks until accepted by raft state machine
rc.node.Propose(context.TODO(), []byte(prop))
}
case cc, ok := <-rc.confChangeC:
if !ok {
rc.confChangeC = nil
} else {
confChangeCount += 1
cc.ID = confChangeCount
rc.node.ProposeConfChange(context.TODO(), cc)
}
}
}
// client closed channel; shutdown raft if not already
close(rc.stopc)
}()node run协程:
node run是一个非常核心的协程,它内部维护集群的状态信息,接收外部的请求,以及整个raft集群内部的通信,来更新内部的状态,并通过readyc向外界传递已经ready的消息。
sm udpates状态机更新协程
sm updates协程从node的readyc中读取已经ready的信息,并把他们提交到commitC里。readCommits协程
readCommits协程从commitC中读取已提交的commit,并应用到本地的kvstore中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
for data := range commitC {
var dataKv kv
dec := gob.NewDecoder(bytes.NewBufferString(*data))
if err := dec.Decode(&dataKv); err != nil {
log.Fatalf("raftexample: could not decode message (%v)", err)
}
s.mu.Lock()
s.kvStore[dataKv.Key] = dataKv.Val
s.mu.Unlock()
}
if err, ok := <-errorC; ok {
log.Fatal(err)
}
}
etcd/raft使用方法及注意事项
https://github.com/coreos/etcd/blob/master/raft/doc.go
这里有比较详细的使用方法以及把etcd/raft嵌入自己程序的文档及流程,
https://github.com/coreos/etcd/tree/master/contrib/raftexample
这里有个用etcd/raft实现key-value存储的例子。
ref
raft paper: https://raft.github.io/raft.pdf