Raft算法之日志复制

Raft算法之日志复制

一、日志复制大致流程

在Leader选举过程中,集群最终会选举出一个Leader节点,而集群中剩余的其他节点将会成为Follower节点。Leader节点除了向Follower节点发送心跳消息,还会处理客户端的请求,并将客户端的更新操作以消息(Append Entries消息)的形式发送到集群中所有的Follower节点。当Follower节点记录收到的这些消息之后,会向Leader节点返回相应的响应消息。当Leader节点在收到半数以上的Follower节点的响应消息之后,会对客户端的请求进行应答。最后,Leader会提交客户端的更新操作,该过程会发送Append Entries消息到Follower节点,通知Follower节点该操作已经提交,同时Leader节点和Follower节点也就可以将该操作应用到自己的状态机中。

参考资料:https://blog.csdn.net/qq_43949280/article/details/122669244

二、ETCD中raft模块的日志复制

2.1 消息的发送

前文中提到Leader节点会处理客户端的更新操作,这就是阅读代码的入口。

ETCD代码中除了有raft模块,还有一个raftexample模块,是对raft模块的使用示例,该模块位置如下:
在这里插入图片描述

看完这个模块的文件,觉得处理数据存储的入口应该在kvstore.go文件中。在这个文件中有一个newKVStore(...)方法,如果要使用使用kvstore结构体的话,肯定会调用newKVStore(...)方法。

我们来看看这个方法的调用点:
在这里插入图片描述

可以确定调用点只在main.go文件中,如下所示:

// contrib/raftexample/main.go文件
func main() {
	cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
	id := flag.Int("id", 1, "node ID")
	kvport := flag.Int("port", 9121, "key-value server port")
	join := flag.Bool("join", false, "join an existing cluster")
	flag.Parse()

	proposeC := make(chan string)
	defer close(proposeC)
	confChangeC := make(chan raftpb.ConfChange)
	defer close(confChangeC)

	// raft provides a commit stream for the proposals from the http api
	var kvs *kvstore // 定义kvstore
	getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
	commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

	kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC) // ref-1 创建kvstore

	// the key-value http handler will propose updates to raft
	serveHttpKVAPI(kvs, *kvport, confChangeC, errorC) // ref-2 使用kvstore
}

我们接着看ref-2处使用kvstore的函数serveHttpKVAPI(...)的细节,如下所示:

// serveHttpKVAPI starts a key-value server with a GET/PUT API and listens.
func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
	srv := http.Server{ // 创建http server
		Addr: ":" + strconv.Itoa(port),
		Handler: &httpKVAPI{
			store:       kv, // 把前面提到的kvstore 赋值给httpKVAPI的成员字段store
			confChangeC: confChangeC,
		},
	}
	go func() { // 开启http server
		if err := srv.ListenAndServe(); err != nil {
			log.Fatal(err)
		}
	}()

	// exit when raft goes down
	if err, ok := <-errorC; ok {
		log.Fatal(err)
	}
}

现在的关键是httpKVAPI类型,它基于由raft支撑的key-value存储来处理http请求,下面是该类型细节:

// contrib/raftexample/httpapi.go文件
// Handler for a http based key-value store backed by raft
type httpKVAPI struct {
	store       *kvstore
	confChangeC chan<- raftpb.ConfChange
}

func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	key := r.RequestURI
	defer r.Body.Close()
	switch {
	case r.Method == "PUT": // ref-3 设置键值对时,是用的put方法,在该模块的reademe文件有提到。
		v, err := ioutil.ReadAll(r.Body) // 读取客户端传递过来的body
		if err != nil {
			log.Printf("Failed to read on PUT (%v)\n", err)
			http.Error(w, "Failed on PUT", http.StatusBadRequest)
			return
		}

		h.store.Propose(key, string(v)) // ref-4 kvstore处理存储键值对

		// Optimistic-- no waiting for ack from raft. Value is not yet
		// committed so a subsequent GET on the key may return old value
		w.WriteHeader(http.StatusNoContent)
	case r.Method == "GET":
		if v, ok := h.store.Lookup(key); ok {
			w.Write([]byte(v))
		} else {
			http.Error(w, "Failed to GET", http.StatusNotFound)
		}
	case r.Method == "POST":
		url, err := ioutil.ReadAll(r.Body)
		if err != nil {
			log.Printf("Failed to read on POST (%v)\n", err)
			http.Error(w, "Failed on POST", http.StatusBadRequest)
			return
		}

		nodeId, err := strconv.ParseUint(key[1:], 0, 64)
		if err != nil {
			log.Printf("Failed to convert ID for conf change (%v)\n", err)
			http.Error(w, "Failed on POST", http.StatusBadRequest)
			return
		}

		cc := raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  nodeId,
			Context: url,
		}
		h.confChangeC <- cc

		// As above, optimistic that raft will apply the conf change
		w.WriteHeader(http.StatusNoContent)
	case r.Method == "DELETE":
		nodeId, err := strconv.ParseUint(key[1:], 0, 64)
		if err != nil {
			log.Printf("Failed to convert ID for conf change (%v)\n", err)
			http.Error(w, "Failed on DELETE", http.StatusBadRequest)
			return
		}

		cc := raftpb.ConfChange{
			Type:   raftpb.ConfChangeRemoveNode,
			NodeID: nodeId,
		}
		h.confChangeC <- cc

		// As above, optimistic that raft will apply the conf change
		w.WriteHeader(http.StatusNoContent)
	default:
		w.Header().Set("Allow", "PUT")
		w.Header().Add("Allow", "GET")
		w.Header().Add("Allow", "POST")
		w.Header().Add("Allow", "DELETE")
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
	}
}

终于在ref-4处看到了kvstore处理存储键值对的入口,就是Propose(...)方法。下面是该方法的细节:

// contrib/raftexample/kvstore.go文件
func (s *kvstore) Propose(k string, v string) {
	var buf bytes.Buffer
    // 对key-value数据进行编码,存储到buf中
	if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
		log.Fatal(err)
	}
	s.proposeC <- buf.String() // 将buf中的数据传递过channel
}

在代码中可以看到把数据传递给了proposeC这个channel,现在的关键就是找出来哪儿在从这个channel读取数据。

首先找到proposeC字段所在的类型定义,然后查看proposeC字段的使用点,可以看到它是在创建kvstore类型变量的时候传递进来的一个channel。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JEILdB8q-1689314736004)(images/image-20230707171843285.png)]

接着跟踪,可以发现这个channel是newKVStore(...)函数的一个入参,这个函数我们在一开始的时候分析过。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5GV6iAJv-1689314736005)(images/image-20230707172103285.png)]

我们重新回到ref-1处的代码,看看newKVStore调用是怎么传递这个关键channel的:

// contrib/raftexample/main.go文件
func main() {
	cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
	id := flag.Int("id", 1, "node ID")
	kvport := flag.Int("port", 9121, "key-value server port")
	join := flag.Bool("join", false, "join an existing cluster")
	flag.Parse()

	proposeC := make(chan string) // 创建proposeC
	defer close(proposeC)
	confChangeC := make(chan raftpb.ConfChange)
	defer close(confChangeC)

	// raft provides a commit stream for the proposals from the http api
	var kvs *kvstore // 定义kvstore
	getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
	commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC) // ref-5 传递proposeC给raft的node

	kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC) // ref-1 创建kvstore

	// the key-value http handler will propose updates to raft
	serveHttpKVAPI(kvs, *kvport, confChangeC, errorC) // ref-2 使用kvstore
}

现在可以断定proposeC这个channel的数据读取就在ref-5处代码调用的newRaftNode(...)里面,代码如下所示:

// contrib/raftexample/raft.go 文件
// newRaftNode initiates a raft instance and returns a committed log entry
// channel and error channel. Proposals for log updates are sent over the
// provided the proposal channel. All log entries are replayed over the
// commit channel, followed by a nil message (to indicate the channel is
// current), then new log entries. To shutdown, close proposeC and read errorC.
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
	confChangeC <-chan raftpb.ConfChange) (<-chan *commit, <-chan error, <-chan *snap.Snapshotter) {

	commitC := make(chan *commit)
	errorC := make(chan error)

	rc := &raftNode{
		proposeC:    proposeC, // ref-6  proposeC赋值给字段proposeC
		confChangeC: confChangeC,
		commitC:     commitC,
		errorC:      errorC,
		id:          id,
		peers:       peers,
		join:        join,
		waldir:      fmt.Sprintf("raftexample-%d", id),
		snapdir:     fmt.Sprintf("raftexample-%d-snap", id),
		getSnapshot: getSnapshot,
		snapCount:   defaultSnapshotCount,
		stopc:       make(chan struct{}),
		httpstopc:   make(chan struct{}),
		httpdonec:   make(chan struct{}),

		logger: zap.NewExample(),

		snapshotterReady: make(chan *snap.Snapshotter, 1),
		// rest of structure populated after WAL replay
	}
	go rc.startRaft()
	return commitC, errorC, rc.snapshotterReady
}

我们接着跟raftNodeproposeC调用点,从下图中可以看到读取proposeC数据点只有一个。
在这里插入图片描述

我们接着看读取数据的具体代码:

// contrib/raftexample/raft.go文件
func (rc *raftNode) serveChannels() {
	snap, err := rc.raftStorage.Snapshot()
	if err != nil {
		panic(err)
	}
	rc.confState = snap.Metadata.ConfState
	rc.snapshotIndex = snap.Metadata.Index
	rc.appliedIndex = snap.Metadata.Index

	defer rc.wal.Close()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	// send proposals over raft
	go func() {
		confChangeCount := uint64(0)

		for rc.proposeC != nil && rc.confChangeC != nil {
			select {
			case prop, ok := <-rc.proposeC: // 读取键值对数据
				if !ok {
					rc.proposeC = nil
				} else {
					// blocks until accepted by raft state machine
					rc.node.Propose(context.TODO(), []byte(prop)) // ref-7 处理客户端写入的键值对
				}

			case cc, ok := <-rc.confChangeC:
				if !ok {
					rc.confChangeC = nil
				} else {
					confChangeCount++
					cc.ID = confChangeCount
					rc.node.ProposeConfChange(context.TODO(), cc)
				}
			}
		}
		// client closed channel; shutdown raft if not already
		close(rc.stopc)
	}()

	...... // 省略
}

我们接着看ref-7raftNode是怎么处理键值对写入的,由于Node是一个接口,我们需要看看这个Propose(...)方法的实现:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TQpZJlzu-1689314736006)(images/image-20230707173531638.png)]

可以看到在raft模块中只有一个实现,在node.go文件中,如下所示:

// raft/node.go文件
func (n *node) Propose(ctx context.Context, data []byte) error {
	return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

可以看到,把数据放入到了pb.Entry中,并且将pb.Message的消息类型设置为了pb.MsgProp。我们接着看stepWait(...)方法:

// raft/node.go 文件
func (n *node) stepWait(ctx context.Context, m pb.Message) error {
	return n.stepWithWaitOption(ctx, m, true)
}

// 进入到使用消息的状态机中。
// Step advances the state machine using msgs. The ctx.Err() will be returned,
// if any.
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
	if m.Type != pb.MsgProp { // 如果消息类型不是pb.MsgProp
		select {
		case n.recvc <- m:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-n.done:
			return ErrStopped
		}
	}
	ch := n.propc // 赋值channel
	pm := msgWithResult{m: m} // 依据消息构建msgWithResult类型变量
	if wait { // 上游传递是true
		pm.result = make(chan error, 1) // 创建接收处理结果的channel
	}
	select {
	case ch <- pm: // ref-7  将构建的消息发送出去
		if !wait {
			return nil
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
	select {
	case err := <-pm.result: // ref-8  等待处理结果
		if err != nil {
			return err
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
	return nil
}

ref-7处是在将消息发送出去,那么现在的关键就是消息在哪儿读取的呢?

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HhXH0t68-1689314736006)(images/image-20230707174633031.png)]

依据调用点信息,我们找到如下使用propc的地方:

// raft/node.go文件
func (n *node) run() {
	var propc chan msgWithResult
	var readyc chan Ready
	var advancec chan struct{}
	var rd Ready

	r := n.rn.raft

	lead := None

	for {
		if advancec != nil {
			readyc = nil
		} else if n.rn.HasReady() {
			// Populate a Ready. Note that this Ready is not guaranteed to
			// actually be handled. We will arm readyc, but there's no guarantee
			// that we will actually send on it. It's possible that we will
			// service another channel instead, loop around, and then populate
			// the Ready again. We could instead force the previous Ready to be
			// handled first, but it's generally good to emit larger Readys plus
			// it simplifies testing (by emitting less frequently and more
			// predictably).
			rd = n.rn.readyWithoutAccept()
			readyc = n.readyc
		}

		if lead != r.lead {
			if r.hasLeader() {
				if lead == None {
					r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
				} else {
					r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
				}
				propc = n.propc // ref-9 将节点的propc赋值给变量propc
			} else {
				r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
				propc = nil
			}
			lead = r.lead
		}

		select {
		// TODO: maybe buffer the config propose if there exists one (the way
		// described in raft dissertation)
		// Currently it is dropped in Step silently.
		case pm := <-propc:  // 读取propc中的数据
			m := pm.m // 将pb.Message取出来
			m.From = r.id
			err := r.Step(m) // ref-9
			if pm.result != nil {
				pm.result <- err
				close(pm.result)
			}
		...... // 省略其他case
		case <-advancec:
			n.rn.Advance(rd)
			rd = Ready{}
			advancec = nil
		case c := <-n.status:
			c <- getStatus(r)
		case <-n.stop:
			close(n.done)
			return
		}
	}

该run()方法在前一篇博文中分析过,在此就不在赘述。我们接着看ref-9处是如何在step(...)方法中处理消息的:

// raft/raft.go文件
func (r *raft) Step(m pb.Message) error {
	// Handle the message term, which may result in our stepping down to a follower.
	switch { // 处理消息的任期数据
	case m.Term == 0: // 由于前面的数据都没有设置term,所以会走这个case
		// local message
	case m.Term > r.Term:
		...... // 省略

	case m.Term < r.Term:
		
		...... // 省略
	}

	switch m.Type {
	case pb.MsgHup:
		...... // 省略

	case pb.MsgVote, pb.MsgPreVote:
		...... // 省略

	default:
		err := r.step(r, m) // ref-10 处理消息
		if err != nil {
			return err
		}
	}
	return nil
}

我们继续看ref-10处是如何处理消息的,下面是该函数的访问点:
在这里插入图片描述

我们知道当前分析的是Leader节点,所以可以直接锁定唯一调用点就是将stepLeader赋值给r.step,代码如下所示:

// raft/raft.go文件
func (r *raft) becomeLeader() {
	// TODO(xiangli) remove the panic when the raft implementation is stable
	if r.state == StateFollower {
		panic("invalid transition [follower -> leader]")
	}
	r.step = stepLeader // ref-11 将stepLeader赋值给step字段
	r.reset(r.Term)
	r.tick = r.tickHeartbeat
	r.lead = r.id
	r.state = StateLeader
	...... // 省略
}

现在的关键就是stepLeader函数了。becomeLeader在上一篇博客中也提到过。下面我们接着看stepLeader函数细节:

// raft/raft.go文件
func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
	case pb.MsgBeat:
		...... // 省略
		return nil
	case pb.MsgCheckQuorum:
		...... // 省略
		return nil
	case pb.MsgProp: // 依据前文阅读代码,消息类型是MsgProp,所以会走这个分支
		if len(m.Entries) == 0 {
			r.logger.Panicf("%x stepped empty MsgProp", r.id)
		}
		if r.prs.Progress[r.id] == nil {
			// If we are not currently a member of the range (i.e. this node
			// was removed from the configuration while serving as leader),
			// drop any new proposals.
			return ErrProposalDropped
		}
		if r.leadTransferee != None {
			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
			return ErrProposalDropped
		}

		for i := range m.Entries {
			e := &m.Entries[i]
			var cc pb.ConfChangeI
			if e.Type == pb.EntryConfChange { // 如果是配置改变
				var ccc pb.ConfChange
				if err := ccc.Unmarshal(e.Data); err != nil {
					panic(err)
				}
				cc = ccc
			} else if e.Type == pb.EntryConfChangeV2 { // 如果是配置改变的V2版本
				var ccc pb.ConfChangeV2
				if err := ccc.Unmarshal(e.Data); err != nil {
					panic(err)
				}
				cc = ccc
			}
			if cc != nil {
				alreadyPending := r.pendingConfIndex > r.raftLog.applied
				alreadyJoint := len(r.prs.Config.Voters[1]) > 0
				wantsLeaveJoint := len(cc.AsV2().Changes) == 0

				var refused string
				if alreadyPending {
					refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
				} else if alreadyJoint && !wantsLeaveJoint {
					refused = "must transition out of joint config first"
				} else if !alreadyJoint && wantsLeaveJoint {
					refused = "not in joint state; refusing empty conf change"
				}

				if refused != "" {
					r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
					m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
				} else {
					r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
				}
			}
		}

		if !r.appendEntry(m.Entries...) { // ref-13 将entry数据追加到raftlog中
			return ErrProposalDropped
		}
		r.bcastAppend() // ref-12 将entry数据广播到其他节点上
		return nil
	case pb.MsgReadIndex:
		...... // 省略
		return nil
	}

	// All other message types require a progress for m.From (pr).
	pr := r.prs.Progress[m.From]
	if pr == nil {
		r.logger.Debugf("%x no progress available for %x", r.id, m.From)
		return nil
	}
	switch m.Type {
    ...... // 省略
    }

ref-12处的代码是我们的关注点,接着看看数据是怎么广播出去的:

// raft/raft.go文件
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
// according to the progress recorded in r.prs.
func (r *raft) bcastAppend() {
    // r.prs字段记录着其他节点的信息。这个visit方法就是遍历其他所有节点,然后发送信息
	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
		if id == r.id {
			return
		}
		r.sendAppend(id) // ref-14 发送数据给其他节点
	})
}

我们接着看看怎么发送数据给其他节点的:

// raft/raft.go 文件
// sendAppend sends an append RPC with new entries (if any) and the
// current commit index to the given peer.
func (r *raft) sendAppend(to uint64) {
	r.maybeSendAppend(to, true)
}
// maybeSendAppend sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
	pr := r.prs.Progress[to]
	if pr.IsPaused() {
		return false
	}
	m := pb.Message{}
	m.To = to
	// 从r.raftlog中获取任期和entry数据。这个地方就和前面往r.raftlog中存入日志呼应起来了。
	term, errt := r.raftLog.term(pr.Next - 1)
	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
	if len(ents) == 0 && !sendIfEmpty {
		return false
	}

	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
		...... // 省略对错误情况的处理
	} else {
        // 组装要发送的消息
		m.Type = pb.MsgApp  // 注意这个消息类型是pb.MsgApp
		m.Index = pr.Next - 1
		m.LogTerm = term
		m.Entries = ents
		m.Commit = r.raftLog.committed
		if n := len(m.Entries); n != 0 {
			switch pr.State {
			// optimistically increase the next when in StateReplicate
			case tracker.StateReplicate:
				last := m.Entries[n-1].Index
				pr.OptimisticUpdate(last)
				pr.Inflights.Add(last)
			case tracker.StateProbe:
				pr.ProbeSent = true
			default:
				r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
			}
		}
	}
	r.send(m) // 发送数据
	return true
}

现在的关键点,在于r.send(m)是如何将数据发送出去的:

// raft/raft.go文件
// send schedules persisting state to a stable storage and AFTER that
// sending the message (as part of next Ready message processing).
func (r *raft) send(m pb.Message) {
	if m.From == None {
		m.From = r.id
	}
	if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
		if m.Term == 0 {
			// All {pre-,}campaign messages need to have the term set when
			// sending.
			// - MsgVote: m.Term is the term the node is campaigning for,
			//   non-zero as we increment the term when campaigning.
			// - MsgVoteResp: m.Term is the new r.Term if the MsgVote was
			//   granted, non-zero for the same reason MsgVote is
			// - MsgPreVote: m.Term is the term the node will campaign,
			//   non-zero as we use m.Term to indicate the next term we'll be
			//   campaigning for
			// - MsgPreVoteResp: m.Term is the term received in the original
			//   MsgPreVote if the pre-vote was granted, non-zero for the
			//   same reasons MsgPreVote is
			panic(fmt.Sprintf("term should be set when sending %s", m.Type))
		}
	} else {
		if m.Term != 0 {
			panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
		}
		// do not attach term to MsgProp, MsgReadIndex
		// proposals are a way to forward to the leader and
		// should be treated as local message.
		// MsgReadIndex is also forwarded to leader.
		if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
			m.Term = r.Term
		}
	}
	r.msgs = append(r.msgs, m) // 将消息m追加到r.msgs上
}

消息被追加到r.msgs上,那么哪儿又在读取这个r.msgs呢?只有一个地方,该r.msgs被赋值给其他字段:

// raft/node.go文件
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
	rd := Ready{
		Entries:          r.raftLog.unstableEntries(),
		CommittedEntries: r.raftLog.nextEnts(),
		Messages:         r.msgs, // 将r.msgs赋值给Messages
	}
	...... // 省略其他处理
	return rd
}

传输Messages的地方如下所示:

func (rc *raftNode) serveChannels() {
    ...... // 省略

	// event loop on raft state machine updates
	for {
		select {
		case <-ticker.C:
			rc.node.Tick()

		// store raft entries to wal, then publish over commit channel
		case rd := <-rc.node.Ready():
			rc.wal.Save(rd.HardState, rd.Entries)
			if !raft.IsEmptySnap(rd.Snapshot) {
				rc.saveSnap(rd.Snapshot)
				rc.raftStorage.ApplySnapshot(rd.Snapshot)
				rc.publishSnapshot(rd.Snapshot)
			}
			rc.raftStorage.Append(rd.Entries)
			rc.transport.Send(rd.Messages) // 调用传输模块,发送消息。这个传输模块是ETCD的etcdserver模块提供的。
			applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
			if !ok {
				rc.stop()
				return
			}
			rc.maybeTriggerSnapshot(applyDoneC)
			rc.node.Advance()

		case err := <-rc.transport.ErrorC:
			rc.writeError(err)
			return

		case <-rc.stopc:
			rc.stop()
			return
		}
	}
}

2.2 消息的接收

在集群中,Follower会接收到leader的消息,我们直接看becomeFollower函数,如下所示:

// raft/raft.go 文件
func (r *raft) becomeFollower(term uint64, lead uint64) {
	r.step = stepFollower // ref-15 设置处理消息接收的函数
	r.reset(term)
	r.tick = r.tickElection
	r.lead = lead
	r.state = StateFollower
	r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}

我们接着看关键函数stepFollower,如下所示:

// raft/raft.go文件
func stepFollower(r *raft, m pb.Message) error {
	switch m.Type {
	case pb.MsgProp:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
			return ErrProposalDropped
		} else if r.disableProposalForwarding {
			r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
			return ErrProposalDropped
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgApp: // 上文中Leader最后发送的消息类型就是pb.MsgApp,因此会走这个分支
		r.electionElapsed = 0
		r.lead = m.From
		r.handleAppendEntries(m) // ref-16 处理消息中的entries数据
	case pb.MsgHeartbeat:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleHeartbeat(m)
	case pb.MsgSnap:
		r.electionElapsed = 0
		r.lead = m.From
		r.handleSnapshot(m)
	case pb.MsgTransferLeader:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
			return nil
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgTimeoutNow:
		r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
		// Leadership transfers never use pre-vote even if r.preVote is true; we
		// know we are not recovering from a partition so there is no need for the
		// extra round trip.
		r.hup(campaignTransfer)
	case pb.MsgReadIndex:
		if r.lead == None {
			r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
			return nil
		}
		m.To = r.lead
		r.send(m)
	case pb.MsgReadIndexResp:
		if len(m.Entries) != 1 {
			r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
			return nil
		}
		r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
	}
	return nil
}

我们接着看关键函数handleAppendEntries,如下所示:

// raft/raft.go文件
func (r *raft) handleAppendEntries(m pb.Message) {
	if m.Index < r.raftLog.committed { // 如果消息的index小于提交的记录,则什么也不做。
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
		return
	}

    // 开始追加entry数据
	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
	} else {
		...... // 省略
	}
}

现在关键步骤是r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...),我们接着看:

// raft/log.go 文件
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
	if l.matchTerm(index, logTerm) {
		lastnewi = index + uint64(len(ents))
		ci := l.findConflict(ents)
		switch {
		case ci == 0:
		case ci <= l.committed:
			l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
		default:
			offset := index + 1
			l.append(ents[ci-offset:]...) // ref-16 将数据追加到日志中
		}
		l.commitTo(min(committed, lastnewi)) // 提交数据
		return lastnewi, true
	}
	return 0, false
}

ref-16处代码在处理数据的追加,详细细节如下:

// raft/log.go文件
func (l *raftLog) append(ents ...pb.Entry) uint64 {
	if len(ents) == 0 {
		return l.lastIndex()
	}
	if after := ents[0].Index - 1; after < l.committed {
		l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
	}
	l.unstable.truncateAndAppend(ents)
	return l.lastIndex()
}

// raft/log_unstable.go文件
func (u *unstable) truncateAndAppend(ents []pb.Entry) {
	after := ents[0].Index
	switch {
	case after == u.offset+uint64(len(u.entries)):
		// after is the next index in the u.entries
		// directly append
		u.entries = append(u.entries, ents...)
	case after <= u.offset:
		u.logger.Infof("replace the unstable entries from index %d", after)
		// The log is being truncated to before our current offset
		// portion, so set the offset and replace the entries
		u.offset = after
		u.entries = ents
	default:
		// truncate to after and copy to u.entries
		// then append
		u.logger.Infof("truncate the unstable entries before index %d", after)
		u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...)
		u.entries = append(u.entries, ents...)
	}
}

func (u *unstable) truncateAndAppend(ents []pb.Entry) {
	after := ents[0].Index
	switch {
	case after == u.offset+uint64(len(u.entries)):
		// after is the next index in the u.entries
		// directly append
		u.entries = append(u.entries, ents...)
	case after <= u.offset:
		u.logger.Infof("replace the unstable entries from index %d", after)
		// The log is being truncated to before our current offset
		// portion, so set the offset and replace the entries
		u.offset = after
		u.entries = ents
	default:
		// truncate to after and copy to u.entries
		// then append
		u.logger.Infof("truncate the unstable entries before index %d", after)
		u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...)
		u.entries = append(u.entries, ents...)
	}
}

日志复制流程的分析到这儿就结束了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/40095.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

1.Docker概念

文章目录 Docker概念Docker容器与虚拟机的区别内核中的2个重要技术Linux Namespace的6大类型docker三个重要概念部署Dockeryum安装二进制安装 Docker 概念 docker是一个开源的应用容器引擎&#xff0c;基于go语言开发并遵循了apache2.0协议开源。docker可以让开发者打包他们的…

AtcoderABC243场

A - Shampoo A - Shampoo ] 题目大意 高桥家有三个人&#xff1a;高桥、他的父亲和他的母亲。每个人每晚都在浴室洗头发。他们按照顺序使用AA、BB和CC毫升的洗发水。 问&#xff0c;今天早上瓶子里有VV毫升的洗发水。在不重新装满的情况下&#xff0c;谁会第一个用完洗发水洗头…

K8s入门

K8s入门 目录 K8s入门namespacepoddeployment多版本扩缩容治愈能力滚动更新版本回退 serviceClusterIPNodePort ingress域名访问路径重写流量限制 存储抽象PV&PVCConfigMapSecret namespace kubectl get ns # 获取命名空间 kubectl create ns 名字 # 创建命名空间 ku…

学习babylon.js --- [3] 开启https

babylonjs提供WebVR功能&#xff0c;但是使用这个功能得用https&#xff0c;本文讲述如何使用自签名证书来开启https&#xff0c;基于第二篇文章中搭建的工程。 一 生成自签名证书 首先要安装openssl&#xff0c;这个去网上搜下就行了。安装完之后在终端下输入openssl回车可以…

【CNN记录】pytorch中BatchNorm2d

torch.nn.BatchNorm2d(num_features, eps1e-05, momentum0.1, affineTrue, track_running_statsTrue, deviceNone, dtypeNone) 功能&#xff1a;对输入的四维数组进行批量标准化处理&#xff08;归一化&#xff09; 计算公式如下&#xff1a; 对于所有的batch中样本的同一个ch…

【Spring core学习四】Bean作用域和生命周期

目录 一、Bean的作用域 &#x1f308;1、被修改的Bean值现象 &#x1f308;2、 Bean 的 6 种作⽤域 &#x1f308;3、设置作用域 二、Spring的执行流程 三、Bean的生命周期 &#x1f308;1、Bean生命周期的过程 &#x1f308;2、演示生命周期 一、Bean的作用域 &…

大华相机接入web页面实现人脸识别

先看下效果&#xff0c;中间主视频流就是大华相机&#xff08;视频编码H.264&#xff09;&#xff0c;海康相机&#xff08;视屏编码H.265&#xff09; 前端接入视屏流代码 <!--视频流--><div id"col2"><div class"cell" style"flex: …

[GXYCTF2019]simple CPP

前言 三个加密区域&#xff0c;第一次是基本运算&#xff0c;八位叠加&#xff0c;z3方程 分析 第一轮加密&#xff0c;和Dst中模27异或 &#xff08;出题人对动调有很大意见呢&#xff09; 将输入的字符串按八位存入寄存器中&#xff0c;然后将寄存器内容转存到内存 第一次…

数仓-零基础小白到入土-学习路线

数仓-零基础小白到入土-学习路线 铺垫一下下讲在前面涉及基础技术栈&#xff1a;中级&#xff1a;全部掌握之后&#x1f446;&#xff1a;去刷面试题&#xff1a; 初级中级高级博主独家面试题&#xff1a;数仓名词&#xff1a;催更我戳戳个人主页&#xff1a;[up自己的网站](ht…

【电路原理学习笔记】第4章:能量与功率:4.5 稳压电源与电池

第4章&#xff1a;能量与功率 4.5 稳压电源与电池 电网采用交流电形式将电能从发电站传输给用户&#xff0c;这是因为交流电易于转换成适宜传输的高压和终端用户使用的低压。在远距离传输时&#xff0c;采用高电压传输的效率和效益要高得多。对于给定的功率&#xff0c;较高的…

基于linux下的高并发服务器开发(第一章)- 目录操作函数

09 / 目录操作函数 &#xff08;1&#xff09;int mkdir(const char* pathname,mode_t mode); #include <sys/stat.h> #include <sys/types.h>int mkdir(const char *pathname, mode_t mode); 作用&#xff1a;创建一个目录 参数&#xff1a; pat…

【OC总结- Block】

文章目录 前言2. Block2.1 Block的使用规范2.2 __block修饰符2.3 Block的类型2.4 Block的循环引用及解决循环引用的场景引入解决循环引用Block循环引用场景 2.5 Block的实现及其本质2.5.1 初始化部分2.5.2 调用部分2.5.3 捕获变量 Block本质2.6 Block捕获变量 和 对象2.7 Block…

基于 ChatGPT 的 helm 入门

1. 写在最前面 公司最近在推业务上云&#xff08;底层为 k8s 管理&#xff09;&#xff0c;平台侧为了简化业务侧部署的复杂度&#xff0c;基于 helm 、chart 等提供了一个发布平台。 发布平台的使用使业务侧在不了解 helm 、chart 等工具的时候&#xff0c;「只要点点」就可…

LCD—STM32液晶显示(1.显示器简介及LCD显示原理)(6000字详细介绍)

目录 显示器简介 液晶显示器 液晶 像素 液晶屏缺点 LED显示器 OLED显示器 显示器的基本参数 STM32板载液晶控制原理&#xff08;不带微控制器&#xff09; 液晶控制原理 控制信号线(不带液晶控制器) 液晶数据传输时序 显存 总结 3.2寸液晶屏介绍&#xff08;搭载…

IIS Express本地开发测试如何映射到外网访问?

1.IIS Express是什么 IIS Express是为开发人员优化的轻量级、自包含版本的IIS。它具有IIS 7及以上的所有核心功能&#xff0c;以及为简化网站开发而设计的附加功能。 IIS Express&#xff08;跟ASP.NET开发服务器一样&#xff09;可以快速地从硬盘上的某个文件夹上启动网站…

SylixOS下SSH和SFTP连接

简要 基于网络的连接&#xff08;telnet&#xff0c;ftp&#xff09;方便高效&#xff0c;但其是基于明文的通信&#xff0c;容易被窃取、篡改和攻击&#xff0c;存在网络安全问题&#xff0c;尤其在进行远程访问时&#xff0c;穿过复杂未知的公网环境非常危险&#xff0c;为此…

中信银行西安分行举办金融助力外贸企业“走出去“高端论坛

7月14日&#xff0c;中信银行西安分行联合中国出口信用保险公司陕西分公司、西安市工商联举办"智汇西安、信融全球"——金融助力外贸企业"走出去"高端论坛。该论坛紧跟“加快建设贸易强国”的战略指引&#xff0c;以创新金融服务助力外贸企业融入高水平对外…

C++-----vector

本期我们来学习C中的vector&#xff0c;因为有string的基础&#xff0c;所以我们会讲解的快一点 目录 vector介绍 vector常用接口 构造函数 sort 迭代器 size&#xff0c;max_size&#xff0c;capacity&#xff0c;empty reserve和resize front和back data insert和…

解决appium-doctor报opencv4nodejs cannot be found

一、下载cmake 在CMake官网下载&#xff1a;cmake-3.6.1-win64-x64.msi 二、安装cmake cmake安装过程 在安装时要选择勾选为所有用户添加CMake环境变量 三、检查cmake安装 重新管理员打开dos系统cmd命令提示符&#xff0c;输入cmake -version cmake -version四、安装opencv4no…

pycharm里debug时torch数组显示不全

pycharm里查看torch数组全部值 一、在Pycharm运行torch数组时&#xff0c;通常只能看到数组的一部分二、解决办法1、debug后&#xff0c;鼠标右键想要查看完整的数组&#xff0c;选择Evaluate Expression2、输入np.array(x0.data)&#xff0c;x0为想要查看的数组名&#xff0c;…