最近希望学习一下Raft协议,所以看了看Etcd的Raft模块源码实现,在此做些记录,做个索引。
直接来到Etcd项目的raft的目录下面,里面的README文档写了如何使用raft库使用的一些用法,我们先从暴露的 Node
API看起
// Node represents a node in a raft cluster.
type Node interface {
Tick()
Campaign(ctx context.Context) error
Propose(ctx context.Context, data []byte) error
ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
Step(ctx context.Context, msg pb.Message) error
Ready() <-chan Ready
Advance()
ApplyConfChange(cc pb.ConfChange) *pb.ConfState
TransferLeadership(ctx context.Context, lead, transferee uint64)
ReadIndex(ctx context.Context, rctx []byte) error
Status() Status
ReportUnreachable(id uint64)
ReportSnapshot(id uint64, status SnapshotStatus)
Stop()
}
初看起来有点一头雾水,不过API如此设计是有原因的。我们知道Raft算法可以将节点描述为一个状态机。
我们知道状态机接收输入,然后根据自身状态和输入数据转变改变自身的状态,然后再输出数据。所以我们要寻找输入输出和状态,带着这些概念我们来看StartNode
函数
// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
func StartNode(c *Config, peers []Peer) Node {
r := newRaft(c)
// become the follower at term 1 and apply initial configuration
// entries of term 1
r.becomeFollower(1, None)
for _, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
d, err := cc.Marshal()
if err != nil {
panic("unexpected marshal error")
}
e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
r.raftLog.append(e)
}
// Mark these initial entries as committed.
r.raftLog.committed = r.raftLog.lastIndex()
for _, peer := range peers {
r.addNode(peer.ID)
}
n := newNode()
n.logger = c.Logger
go n.run(r)
return &n
}
// node is the canonical implementation of the Node interface
type node struct {
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChange
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
logger Logger
}
...
func (n *node) run(r *raft) {
...
for {
....
select {
case pm := <-propc:
m := pm.m
m.From = r.id
err := r.Step(m)
if pm.result != nil {
pm.result <- err
close(pm.result)
}
case m := <-n.recvc:
// filter out response message from unknown From.
if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
r.Step(m)
}
case cc := <-n.confc:
....
case <-n.tickc:
r.tick()
case readyc <- rd:
....
case <-advancec:
....
case c := <-n.status:
c <- getStatus(r)
case <-n.stop:
close(n.done)
return
}
}
}
StartNode
在后台跑了一个node.run()
, node
包含了一堆的chan
和外交打交道,node
所实现的接口函数如Campaign
,Propose
,Tick
都是通过chan
发送请求到 run()
方法的for循环中处理的,这样保证了操作的原子性。
纵观run()
循环的代码,发现各种消息最终都落到的调用raft
对象的Step
方法上面了。其实这个raft
对象就是我们要找的状态机,Step
方法就是状态机的状态转换函数,输入数据的结构就是在raft/raftpb
下面由protobuf定义的 Message
结构。
func (r *raft) Step(m pb.Message) error {
switch {
case m.Term == 0:
// local message
case m.Term > r.Term:
.....
case m.Term < r.Term:
....
}
switch m.Type {
case pb.MsgHup:
if r.state != StateLeader {
....
}
case pb.MsgVote, pb.MsgPreVote:
....
return nil
}
不出意料,raft
的Step
根据输入的Message
和内部状态如自身角色/Term/集群状态来做下一步的状态转移。比如调用 becomeFollower
, becomeLeader
,becomeCandidate
方法来切换角色。
raft
状态机的输出是通过调用send
方法,将需要发送出去的消息装入邮箱r.msgs
列表里面暂存,等待被人拿走。
// send persists state to stable storage and then sends to its mailbox.
func (r *raft) send(m pb.Message) {
m.From = r.id
...
r.msgs = append(r.msgs, m)
}
最终来取走邮箱里面消息的还是 node
。
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
rd := Ready{
Entries: r.raftLog.unstableEntries(),
CommittedEntries: r.raftLog.nextEnts(),
Messages: r.msgs,
}
if softSt := r.softState(); !softSt.equal(prevSoftSt) {
rd.SoftState = softSt
}
if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
rd.HardState = hardSt
}
if r.raftLog.unstable.snapshot != nil {
rd.Snapshot = *r.raftLog.unstable.snapshot
}
if len(r.readStates) != 0 {
rd.ReadStates = r.readStates
}
rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
return rd
}
func (n *node) run(r *raft) {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready
lead := None
prevSoftSt := r.softState()
prevHardSt := emptyState
for {
if advancec != nil {
// dvancec != nil 表示Ready已被发送,等待确认。
readyc = nil
} else {
rd = newReady(r, prevSoftSt, prevHardSt) // 取出msgs放入Ready中
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
}
....
case readyc <- rd: // 将Ready通过chanel发送出去
if rd.SoftState != nil {
prevSoftSt = rd.SoftState
}
if len(rd.Entries) > 0 {
prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
havePrevLastUnstablei = true
}
if !IsEmptyHardState(rd.HardState) {
prevHardSt = rd.HardState
}
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
}
if index := rd.appliedCursor(); index != 0 {
applyingToI = index
}
r.msgs = nil // 已将消息发出,清空邮箱
r.readStates = nil
r.reduceUncommittedSize(rd.CommittedEntries)
advancec = n.advancec // 设置 advancec chanel,等待消息确认发送。
case <-advancec:
if applyingToI != 0 {
r.raftLog.appliedTo(applyingToI)
applyingToI = 0
}
if havePrevLastUnstablei {
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
havePrevLastUnstablei = false
}
r.raftLog.stableSnapTo(prevSnapi)
advancec = nil
...
应用从node.Ready()
取走消息后,需要自己负责通过网络,将消息发送给集群其他节点,从其他节点接受到的消息,通过Step
方法驱动node
。
其实raft模块的API使用模式可以参照rafe/rafttest
目录下的测试,写的很清晰。在该目录下直接运行go test
可以测试整个过程。另外这个模块使用map
和chanel
实现了个测试用的网络,很巧妙,值得学习借鉴。
func (n *node) start() {
n.stopc = make(chan struct{})
ticker := time.Tick(5 * time.Millisecond)
go func() {
for {
select {
case <-ticker:
n.Tick() // 定期tick
case rd := <-n.Ready(): // 接受输出
if !raft.IsEmptyHardState(rd.HardState) {
n.mu.Lock()
n.state = rd.HardState
n.mu.Unlock()
n.storage.SetHardState(n.state)
}
n.storage.Append(rd.Entries) // 将Entries写入存储
time.Sleep(time.Millisecond)
// TODO: make send async, more like real world...
for _, m := range rd.Messages {
n.iface.send(m) // 将消息发送给其他节点
}
n.Advance() // 调用 Advance() 确认消息已发出
case m := <-n.iface.recv():
go n.Step(context.TODO(), m) // 处理收到的消息
case <-n.stopc:
n.Stop()
log.Printf("raft.%d: stop", n.id)
n.Node = nil
close(n.stopc)
return
case p := <-n.pausec:
recvms := make([]raftpb.Message, 0)
for p {
select {
case m := <-n.iface.recv():
recvms = append(recvms, m)
case p = <-n.pausec:
}
}
// step all pending messages
for _, m := range recvms {
n.Step(context.TODO(), m)
}
}
}
}()
}
最后再来说一下Storage
接口。
// Storage is an interface that may be implemented by the application
// to retrieve log entries from storage.
//
// If any Storage method returns an error, the raft instance will
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
// InitialState returns the saved HardState and ConfState information.
InitialState() (pb.HardState, pb.ConfState, error)
// Entries returns a slice of log entries in the range [lo,hi).
// MaxSize limits the total size of the log entries returned, but
// Entries returns at least one entry if any.
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
// Term returns the term of entry i, which must be in the range
// [FirstIndex()-1, LastIndex()]. The term of the entry before
// FirstIndex is retained for matching purposes even though the
// rest of that entry may not be available.
Term(i uint64) (uint64, error)
// LastIndex returns the index of the last entry in the log.
LastIndex() (uint64, error)
// FirstIndex returns the index of the first log entry that is
// possibly available via Entries (older entries have been incorporated
// into the latest Snapshot; if storage only contains the dummy entry the
// first log entry is not available).
FirstIndex() (uint64, error)
// Snapshot returns the most recent snapshot.
// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
// so raft state machine could know that Storage needs some time to prepare
// snapshot and call Snapshot later.
Snapshot() (pb.Snapshot, error)
}
raft维护一个entry的Log,保证集群Log的安全性,而Storage
就是用来保存Log的地方,不同的应用可以用不同的方法来存储日志,但要实现Storage
接口给raft
状态机查询当前的Term和Index等的一些信息,具体的使用的地方可以阅读raftLog
的相关代码。
今天带着大家游览了一下raft
模块大体的结构,就像有了个地图,后面想研究哪一方面可以快速找到相应的代码。 其实关于Etcd/Raft其实能讲的还有很多,比如raft论文没有的learner和prevote,线性一致性如何实现等,后面有机会再来说说。