etcd raft 的实现

Etcd Raft使用入门及原理解析,随着 etcd 的升级很多网上的文章还是停留在 github.com/coreos/etcd/ 代码库阶段

现在,已经是 go.etcd.io/etcd/ 但是,案例代码也没有修改。因此产生一个念头是修改当前单例为符合对应版本的

介绍

Etcd的Raft库已经在生产环境得到了非常广泛的应用,有力的支撑了etcd、K8S、Docker Swarm、TiDB/TiKV等分布式系统的构建,当你能够熟练的使用一个成熟的Raft库、甚至如果能够自己实现一个,那会有种’有了锤子,干什么都是钉子’的感觉。

特性

Etcd raft基本上已经实现了Raft协议的完整特性,包括:

  • Leader选举
  • 日志复制
  • 日志压缩
  • 成员变更
  • Leader和Follower都支持高效的线性只读查询请求
  • 通过batch、pipeline等手段优化日志复制、网络IO的延迟

概览

etcd的raft实现都在etcd/raft目录下,但是大部分的实现都在下面几个比较核心的文件:

  • raft.go: 从名字也可以看出来,这个是最核心的部分,比如leader选择的逻辑、raft消息的处理逻辑等
  • node.go: 可以理解为raft集群的一个节点,客户端也主要是这个类打交道,比如心跳的逻辑、propose、状态机、成员变更等都是这个类负责处理。
  • log.go: raft日志相关的代码,比如保存日志记录
  • raft.proto: 定义了raft一些核心的RPC数据结构,由于protobuf是跨语言的,因此如果想用其他语言重写etcd raft,那么至少这部分内容都是可以复用的

用法

客户端主要使用Node和raft集群交互,首先需要启动一个raft集群,有两种方式:

  • 启动一个全新的raft集群
  • 加入一个已经存在的raft集群(节点重启、扩容、缩容)

启动一个三节点的集群:

	storage := raft.NewMemoryStorage()
	c := &raft.Config{
		//代表一个节点的ID,必须唯一,并且不能为0,不能重复利用,和zookeeper的id类似
		ID:              0x01,
		ElectionTick:    10,
		HeartbeatTick:   1,
		Storage:         storage,
		MaxSizePerMsg:   4096,
		MaxInflightMsgs: 256,
	}

 //设置节点列表
  n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})

这里需要强调一个点,etcd的raft实现并不包括网络部分,网络通讯部分需要使用者自己实现,因此这里节点列表传入的是ID,而ip:port到id的映射需要库使用者自己实现。 如果让一个新的节点加入集群,那么就不需要传入节点列表,首先通过ProposeConfChangeRPC发起一个成员变更请求,在任意一个raft集群节点都可以,然后启动这个节点:

  //配置参考上文中的代码段
  n := raft.StartNode(c, nil)

如果是重启一个节点,那么这里需要注意,我们需要恢复这个节点之前的状态,比如当前term、根据快照和日志恢复状态机等:

 storage := raft.NewMemoryStorage()

  // Recover the in-memory storage from persistent snapshot, state and entries.
  // 根据快照、entry日志等恢复当前raft节点到之前的状态
  storage.ApplySnapshot(snapshot)
  storage.SetHardState(state)
  storage.Append(entries)

  c := &raft.Config{
    ID:              0x01,
    ElectionTick:    10,
    HeartbeatTick:   1,
    Storage:         storage,
    MaxSizePerMsg:   4096,
    MaxInflightMsgs: 256,
  }

  // Restart raft without peer information.
  // Peer information is already included in the storage.
  // 重启该raft节点,此时不用传入任何节点相关信息,因为已经在刚刚的恢复过程中填充好了
  n := raft.RestartNode(c)

当raft集群启动完成后,对于一个raft节点,用户需要做几件事情,伪码如下:

for {
    select {
    case <-s.Ticker:
      n.Tick()
    case rd := <-s.Node.Ready():
      saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
      send(rd.Messages)
      if !raft.IsEmptySnap(rd.Snapshot) {
        processSnapshot(rd.Snapshot)
      }
      for _, entry := range rd.CommittedEntries {
        process(entry)
        if entry.Type == raftpb.EntryConfChange {
          var cc raftpb.ConfChange
          cc.Unmarshal(entry.Data)
          s.Node.ApplyConfChange(cc)
        }
      }
      s.Node.Advance()
    case <-s.done:
      return
    }
  }
case <-s.Ticker

库使用者需要定时调用tick()方法,根据节点当前的角色调用对应的逻辑:

  • 心跳, leader需要定时发送心跳包给follower
  • 选举,如果一定时间没有收到leader的心跳,则转换为候选者,竞选leader
case rd := <-s.Node.Ready(): 处理Ready

Ready封装了可以准备开始读取的entries、messages,需要保存到持久化介质、同步给其他节点:

type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState

	// ReadStates can be used for node to serve linearizable read requests locally
	// when its applied index is greater than the index in ReadState.
	// Note that the readState will be returned when raft receives msgReadIndex.
	// The returned is only valid for the request that requested to read.
	ReadStates []ReadState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
}
  • 调用Node.Ready(),处理当前raft节点的状态,其中有些步骤可以并行执行

    • 将entries、HardState、快照按照顺序写到持久化介质中,底层存储介质支持原子写入,那么也可以一次性将他们写入
    • 将所有的消息发送给远程节点,但一定要先将最近的HardState、上一轮Ready中的entries都持久化之后(可以和同一轮的entries持久化并行执行)。如果有类型为MsgSnap的消息,在这个消息发送成功之后,需要调用Node.ReportSnapshot()
    • 如果有快照的话需要和已提交的entries一起应用到状态机(库使用者提供),如果已经提交的entries中包含EntryConfChange,那么需要调用Node.ApplyConfChange()将节点的变更信息同步到本节点
  • 调用Node.Advance()通知节点,表明本轮Ready已经处理完毕,可以开始处理下一轮。

另外还需要注意,由于网络部分需要库使用者自己实现,因此当收到一条消息的时候,需要将该消息转发给raft节点:

	func recvRaftRPC(ctx context.Context, m raftpb.Message) {
		n.Step(ctx, m)
	}

发起提议

如果需要向raft集群发起一个提议,那么需要用下面这种方式:

        // 协议的数据持久化成字节数组
	n.Propose(ctx, data)

如果找个提议处理完成(已经持久化到持久化介质并同步到其他节点),那么就可以通过Ready的comitedEntries获取到,类型是raftpb.EntryNormal, 然后用户就可以根据自己的业务逻辑,将其应用到状态机中。

raft集群不保证该协议一定能够处理成功,若一定超时时间内,还未收到响应,那么需要根据业务场景考虑是否需要重试。

节点变更

如果需要对raft集群扩容或缩容,那么需要构造ConfChange,并调用:

	n.ProposeConfChange(ctx, cc)

如果该变更请求处理成功,那么在commitedEntries中会有一条类型为raftpb.EntryConfChange的记录,

    var cc raftpb.ConfChange
	cc.Unmarshal(data)
	n.ApplyConfChange(cc)

需要自己实现的部分

etcd的raft已经实现了大部分的功能,但是还是有几个组件需要使用者自己根据业务场景实现:

  • 网络通讯部分
  • Write ahead log
  • 快照

网络通讯部分

网络部分说白了就是消息的收发,你可以理解为raft只依赖了接口,这个接口实现了两个方法: sendreceive,但是具体的实现需要库使用者自己写,这部分相对比较简单,使用RPC、HTTP、自定义协议都可以,具体的实现逻辑可以参考etcd自己的代码

Write-Ahead-Log(WAL)

如上文中提到的,用户需要保存Ready中的一些状态,比如entries、hardstate等,WAL有很多分布式系统都实现了,基本上参考他们的实现,结合自己的业务实现一个难度不会很大,如果是直接使用etcd raft库,那么可以直接基于etcd中wal的实现,另外也可以基于RocksDB等嵌入式KV实现,但是对于key-value的结构设计要考虑好,wal的原理后面有时间再叙述。

快照

快照应该都知道,比如说Redis的持久化,有一种模式是保存用户发过来的命令,但时间长了之后,这个日志会变的越来越大,这个时候当你扩容、重启节点的时候,加载这个文件会耗费很长时间,导致服务不可用,因此需要将内存中的状态持久化到磁盘中。 比如:

incr index 
incr index

这个时候index的值为2,当然这个例子只有两条命令,但假如说有一千万条记录,那么重放日志需要耗费很长时间,因此我们可以直接将index:2这个kv对写到磁盘中,那么这个时候之前对这个key的一千万条操作日志就变成了这一条记录。 那么raft的快照其实也类似,应用需要将自己状态机的当前快照,持久化成一个快照文件,并写入磁盘中,我们知道这个过程会非常慢,因此可以考虑和其他过程并行执行,以及其他的一些性能优化,这个后面的博客再写。 简单来实现的话,我们直接将状态机用json序列化成一个字节数组,并写入到本地文件中,后续读取的时候。

如何基于raft实现一个简单的分布式KV存储

这里简单描述一下流程,只是为了更容易理解etcd raft的使用方法,后面会再写篇博客详细记录:

  • 应用实现自己的状态机,处理快照、已提交日志、WAL等
  • 当用户发起一个put请求时,将该请求序列化成字节数组,propose到raft集群
  • 处理成功后,会出现在commitedEntries中,解析该entry,回放到状态机中,这个时候该请求的结果已经可以在所有的raft节点上查询到了
  • 用户发起查询请求,直接在用户封装的状态机中查询,并返回给用户

总结

本文只是简单描述了下etcd raft的使用方法,总的来说etcd raft的实现已经非常完善,但还是需要用户自己处理非常多的细节,比如网络、write aheadlog等,如果对raft不熟悉,相信会很难上手,我的想法是能够在其之上再封装一层,提供一个状态机接口,用户只需要关心自己的业务逻辑,其他的全部都交给库来处理。

``

参考资料