Home

Etcd/Raft源码导读

最近希望学习一下Raft协议,所以看了看Etcd的Raft模块源码实现,在此做些记录,做个索引。

直接来到Etcd项目的raft的目录下面,里面的README文档写了如何使用raft库使用的一些用法,我们先从暴露的 Node API看起

// Node represents a node in a raft cluster.
type Node interface {
	Tick()
	Campaign(ctx context.Context) error
	Propose(ctx context.Context, data []byte) error
	ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
	Step(ctx context.Context, msg pb.Message) error
	Ready() <-chan Ready
	Advance()
	ApplyConfChange(cc pb.ConfChange) *pb.ConfState
	TransferLeadership(ctx context.Context, lead, transferee uint64)
	ReadIndex(ctx context.Context, rctx []byte) error
	Status() Status
	ReportUnreachable(id uint64)
	ReportSnapshot(id uint64, status SnapshotStatus)
	Stop()
}

初看起来有点一头雾水,不过API如此设计是有原因的。我们知道Raft算法可以将节点描述为一个状态机。

我们知道状态机接收输入,然后根据自身状态和输入数据转变改变自身的状态,然后再输出数据。所以我们要寻找输入输出和状态,带着这些概念我们来看StartNode函数

// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
func StartNode(c *Config, peers []Peer) Node {
	r := newRaft(c)
	// become the follower at term 1 and apply initial configuration
	// entries of term 1
	r.becomeFollower(1, None)
	for _, peer := range peers {
		cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
		d, err := cc.Marshal()
		if err != nil {
			panic("unexpected marshal error")
		}
		e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
		r.raftLog.append(e)
	}
	// Mark these initial entries as committed.
	r.raftLog.committed = r.raftLog.lastIndex()

	for _, peer := range peers {
		r.addNode(peer.ID)
	}

	n := newNode()
	n.logger = c.Logger
	go n.run(r)
	return &n
}
// node is the canonical implementation of the Node interface
type node struct {
	propc      chan msgWithResult
	recvc      chan pb.Message
	confc      chan pb.ConfChange
	confstatec chan pb.ConfState
	readyc     chan Ready
	advancec   chan struct{}
	tickc      chan struct{}
	done       chan struct{}
	stop       chan struct{}
	status     chan chan Status

	logger Logger
}

...
func (n *node) run(r *raft) {
	...

	for {
		....

		select {
		case pm := <-propc:
			m := pm.m
			m.From = r.id
			err := r.Step(m)
			if pm.result != nil {
				pm.result <- err
				close(pm.result)
			}
		case m := <-n.recvc:
			// filter out response message from unknown From.
			if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
				r.Step(m)
			}
		case cc := <-n.confc:
			....
		case <-n.tickc:
			r.tick()
		case readyc <- rd:
			....
		case <-advancec:
			....
		case c := <-n.status:
			c <- getStatus(r)
		case <-n.stop:
			close(n.done)
			return
		}
	}
}

StartNode在后台跑了一个node.run(), node包含了一堆的chan和外交打交道,node所实现的接口函数如Campaign,Propose,Tick都是通过chan发送请求到 run()方法的for循环中处理的,这样保证了操作的原子性。

纵观run()循环的代码,发现各种消息最终都落到的调用raft对象的Step方法上面了。其实这个raft对象就是我们要找的状态机,Step方法就是状态机的状态转换函数,输入数据的结构就是在raft/raftpb下面由protobuf定义的 Message结构。

func (r *raft) Step(m pb.Message) error {
		switch {
		case m.Term == 0:
		    // local message
		case m.Term > r.Term:
			.....
		case m.Term < r.Term:
			....
		}
	   
		switch m.Type {
		case pb.MsgHup:
			if r.state != StateLeader {
				....
			}
		case pb.MsgVote, pb.MsgPreVote:
	   
			....
		return nil
}

不出意料,raftStep根据输入的Message和内部状态如自身角色/Term/集群状态来做下一步的状态转移。比如调用 becomeFollower, becomeLeader,becomeCandidate方法来切换角色。

raft状态机的输出是通过调用send方法,将需要发送出去的消息装入邮箱r.msgs列表里面暂存,等待被人拿走。

// send persists state to stable storage and then sends to its mailbox.
func (r *raft) send(m pb.Message) {
	m.From = r.id
	...
	r.msgs = append(r.msgs, m)
}

最终来取走邮箱里面消息的还是 node

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
	rd := Ready{
		Entries:          r.raftLog.unstableEntries(),
		CommittedEntries: r.raftLog.nextEnts(),
		Messages:         r.msgs,
	}
	if softSt := r.softState(); !softSt.equal(prevSoftSt) {
		rd.SoftState = softSt
	}
	if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
		rd.HardState = hardSt
	}
	if r.raftLog.unstable.snapshot != nil {
		rd.Snapshot = *r.raftLog.unstable.snapshot
	}
	if len(r.readStates) != 0 {
		rd.ReadStates = r.readStates
	}
	rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
	return rd
}


func (n *node) run(r *raft) {
	var propc chan msgWithResult
	var readyc chan Ready
	var advancec chan struct{}
	var rd Ready

	lead := None
	prevSoftSt := r.softState()
	prevHardSt := emptyState

	for {
		if advancec != nil {
			// dvancec != nil 表示Ready已被发送,等待确认。
			readyc = nil 
		} else {
			rd = newReady(r, prevSoftSt, prevHardSt) // 取出msgs放入Ready中
			if rd.containsUpdates() {
				readyc = n.readyc
			} else {
				readyc = nil
			}
		}
		....
	case readyc <- rd: // 将Ready通过chanel发送出去
			if rd.SoftState != nil {
				prevSoftSt = rd.SoftState
			}
			if len(rd.Entries) > 0 {
				prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
				prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
				havePrevLastUnstablei = true
			}
			if !IsEmptyHardState(rd.HardState) {
				prevHardSt = rd.HardState
			}
			if !IsEmptySnap(rd.Snapshot) {
				prevSnapi = rd.Snapshot.Metadata.Index
			}
			if index := rd.appliedCursor(); index != 0 {
				applyingToI = index
			}

			r.msgs = nil // 已将消息发出,清空邮箱
			r.readStates = nil
			r.reduceUncommittedSize(rd.CommittedEntries)
			advancec = n.advancec // 设置 advancec chanel,等待消息确认发送。
		case <-advancec:
			if applyingToI != 0 {
				r.raftLog.appliedTo(applyingToI)
				applyingToI = 0
			}
			if havePrevLastUnstablei {
				r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
				havePrevLastUnstablei = false
			}
			r.raftLog.stableSnapTo(prevSnapi)
			advancec = nil
			
		...


应用从node.Ready()取走消息后,需要自己负责通过网络,将消息发送给集群其他节点,从其他节点接受到的消息,通过Step方法驱动node

其实raft模块的API使用模式可以参照rafe/rafttest目录下的测试,写的很清晰。在该目录下直接运行go test可以测试整个过程。另外这个模块使用mapchanel实现了个测试用的网络,很巧妙,值得学习借鉴。

func (n *node) start() {
	n.stopc = make(chan struct{})
	ticker := time.Tick(5 * time.Millisecond)

	go func() {
		for {
			select {
			case <-ticker:
				n.Tick() // 定期tick
			case rd := <-n.Ready(): // 接受输出
				if !raft.IsEmptyHardState(rd.HardState) {
					n.mu.Lock()
					n.state = rd.HardState
					n.mu.Unlock()
					n.storage.SetHardState(n.state)
				}
			
				n.storage.Append(rd.Entries) // 将Entries写入存储
				time.Sleep(time.Millisecond)
				// TODO: make send async, more like real world...
				for _, m := range rd.Messages {
					n.iface.send(m) // 将消息发送给其他节点
				}
				n.Advance()  // 调用 Advance() 确认消息已发出
			case m := <-n.iface.recv():
				go n.Step(context.TODO(), m) // 处理收到的消息
			case <-n.stopc:
				n.Stop()
				log.Printf("raft.%d: stop", n.id)
				n.Node = nil
				close(n.stopc)
				return
			case p := <-n.pausec:
				recvms := make([]raftpb.Message, 0)
				for p {
					select {
					case m := <-n.iface.recv():
						recvms = append(recvms, m)
					case p = <-n.pausec:
					}
				}
				// step all pending messages
				for _, m := range recvms {
					n.Step(context.TODO(), m)
				}
			}
		}
	}()
}

最后再来说一下Storage接口。

// Storage is an interface that may be implemented by the application
// to retrieve log entries from storage.
//
// If any Storage method returns an error, the raft instance will
// become inoperable and refuse to participate in elections; the
// application is responsible for cleanup and recovery in this case.
type Storage interface {
	// InitialState returns the saved HardState and ConfState information.
	InitialState() (pb.HardState, pb.ConfState, error)
	// Entries returns a slice of log entries in the range [lo,hi).
	// MaxSize limits the total size of the log entries returned, but
	// Entries returns at least one entry if any.
	Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
	// Term returns the term of entry i, which must be in the range
	// [FirstIndex()-1, LastIndex()]. The term of the entry before
	// FirstIndex is retained for matching purposes even though the
	// rest of that entry may not be available.
	Term(i uint64) (uint64, error)
	// LastIndex returns the index of the last entry in the log.
	LastIndex() (uint64, error)
	// FirstIndex returns the index of the first log entry that is
	// possibly available via Entries (older entries have been incorporated
	// into the latest Snapshot; if storage only contains the dummy entry the
	// first log entry is not available).
	FirstIndex() (uint64, error)
	// Snapshot returns the most recent snapshot.
	// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
	// so raft state machine could know that Storage needs some time to prepare
	// snapshot and call Snapshot later.
	Snapshot() (pb.Snapshot, error)
}

raft维护一个entry的Log,保证集群Log的安全性,而Storage就是用来保存Log的地方,不同的应用可以用不同的方法来存储日志,但要实现Storage接口给raft状态机查询当前的Term和Index等的一些信息,具体的使用的地方可以阅读raftLog的相关代码。

今天带着大家游览了一下raft模块大体的结构,就像有了个地图,后面想研究哪一方面可以快速找到相应的代码。 其实关于Etcd/Raft其实能讲的还有很多,比如raft论文没有的learner和prevote,线性一致性如何实现等,后面有机会再来说说。