etcd raft 的实现
Etcd Raft使用入门及原理解析,随着 etcd 的升级很多网上的文章还是停留在 github.com/coreos/etcd/ 代码库阶段
现在,已经是 go.etcd.io/etcd/ 但是,案例代码也没有修改。因此产生一个念头是修改当前单例为符合对应版本的
介绍
Etcd的Raft库已经在生产环境得到了非常广泛的应用,有力的支撑了etcd、K8S、Docker Swarm、TiDB/TiKV等分布式系统的构建,当你能够熟练的使用一个成熟的Raft库、甚至如果能够自己实现一个,那会有种’有了锤子,干什么都是钉子’的感觉。
特性
Etcd raft基本上已经实现了Raft协议的完整特性,包括:
- Leader选举
- 日志复制
- 日志压缩
- 成员变更
- Leader和Follower都支持高效的线性只读查询请求
- 通过batch、pipeline等手段优化日志复制、网络IO的延迟
概览
etcd的raft实现都在etcd/raft
目录下,但是大部分的实现都在下面几个比较核心的文件:
raft.go
: 从名字也可以看出来,这个是最核心的部分,比如leader选择的逻辑、raft消息的处理逻辑等node.go
: 可以理解为raft集群的一个节点,客户端也主要是这个类打交道,比如心跳的逻辑、propose、状态机、成员变更等都是这个类负责处理。log.go
: raft日志相关的代码,比如保存日志记录raft.proto
: 定义了raft一些核心的RPC数据结构,由于protobuf是跨语言的,因此如果想用其他语言重写etcd raft
,那么至少这部分内容都是可以复用的
用法
客户端主要使用Node
和raft集群交互,首先需要启动一个raft集群,有两种方式:
- 启动一个全新的raft集群
- 加入一个已经存在的raft集群(节点重启、扩容、缩容)
启动一个三节点的集群:
storage := raft.NewMemoryStorage()
c := &raft.Config{
//代表一个节点的ID,必须唯一,并且不能为0,不能重复利用,和zookeeper的id类似
ID: 0x01,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: storage,
MaxSizePerMsg: 4096,
MaxInflightMsgs: 256,
}
//设置节点列表
n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})
这里需要强调一个点,etcd的raft实现并不包括网络部分,网络通讯部分需要使用者自己实现,因此这里节点列表传入的是ID,而ip:port到id的映射需要库使用者自己实现。
如果让一个新的节点加入集群,那么就不需要传入节点列表,首先通过ProposeConfChange
RPC发起一个成员变更请求,在任意一个raft集群节点都可以,然后启动这个节点:
//配置参考上文中的代码段
n := raft.StartNode(c, nil)
如果是重启一个节点,那么这里需要注意,我们需要恢复这个节点之前的状态,比如当前term、根据快照和日志恢复状态机等:
storage := raft.NewMemoryStorage()
// Recover the in-memory storage from persistent snapshot, state and entries.
// 根据快照、entry日志等恢复当前raft节点到之前的状态
storage.ApplySnapshot(snapshot)
storage.SetHardState(state)
storage.Append(entries)
c := &raft.Config{
ID: 0x01,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: storage,
MaxSizePerMsg: 4096,
MaxInflightMsgs: 256,
}
// Restart raft without peer information.
// Peer information is already included in the storage.
// 重启该raft节点,此时不用传入任何节点相关信息,因为已经在刚刚的恢复过程中填充好了
n := raft.RestartNode(c)
当raft集群启动完成后,对于一个raft节点,用户需要做几件事情,伪码如下:
for {
select {
case <-s.Ticker:
n.Tick()
case rd := <-s.Node.Ready():
saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
send(rd.Messages)
if !raft.IsEmptySnap(rd.Snapshot) {
processSnapshot(rd.Snapshot)
}
for _, entry := range rd.CommittedEntries {
process(entry)
if entry.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
cc.Unmarshal(entry.Data)
s.Node.ApplyConfChange(cc)
}
}
s.Node.Advance()
case <-s.done:
return
}
}
case <-s.Ticker
库使用者需要定时调用tick()
方法,根据节点当前的角色调用对应的逻辑:
- 心跳, leader需要定时发送心跳包给follower
- 选举,如果一定时间没有收到leader的心跳,则转换为候选者,竞选leader
case rd := <-s.Node.Ready():
处理Ready
Ready封装了可以准备开始读取的entries、messages,需要保存到持久化介质、同步给其他节点:
type Ready struct {
// The current volatile state of a Node.
// SoftState will be nil if there is no update.
// It is not required to consume or store SoftState.
*SoftState
// The current state of a Node to be saved to stable storage BEFORE
// Messages are sent.
// HardState will be equal to empty state if there is no update.
pb.HardState
// ReadStates can be used for node to serve linearizable read requests locally
// when its applied index is greater than the index in ReadState.
// Note that the readState will be returned when raft receives msgReadIndex.
// The returned is only valid for the request that requested to read.
ReadStates []ReadState
// Entries specifies entries to be saved to stable storage BEFORE
// Messages are sent.
Entries []pb.Entry
// Snapshot specifies the snapshot to be saved to stable storage.
Snapshot pb.Snapshot
// CommittedEntries specifies entries to be committed to a
// store/state-machine. These have previously been committed to stable
// store.
CommittedEntries []pb.Entry
// Messages specifies outbound messages to be sent AFTER Entries are
// committed to stable storage.
// If it contains a MsgSnap message, the application MUST report back to raft
// when the snapshot has been received or has failed by calling ReportSnapshot.
Messages []pb.Message
// MustSync indicates whether the HardState and Entries must be synchronously
// written to disk or if an asynchronous write is permissible.
MustSync bool
}
调用
Node.Ready()
,处理当前raft节点的状态,其中有些步骤可以并行执行- 将entries、HardState、快照按照顺序写到持久化介质中,底层存储介质支持原子写入,那么也可以一次性将他们写入
- 将所有的消息发送给远程节点,但一定要先将最近的HardState、上一轮Ready中的entries都持久化之后(可以和同一轮的entries持久化并行执行)。如果有类型为
MsgSnap
的消息,在这个消息发送成功之后,需要调用Node.ReportSnapshot()
。 - 如果有快照的话需要和已提交的entries一起应用到状态机(库使用者提供),如果已经提交的entries中包含
EntryConfChange
,那么需要调用Node.ApplyConfChange()
将节点的变更信息同步到本节点
调用
Node.Advance()
通知节点,表明本轮Ready已经处理完毕,可以开始处理下一轮。
另外还需要注意,由于网络部分需要库使用者自己实现,因此当收到一条消息的时候,需要将该消息转发给raft节点:
func recvRaftRPC(ctx context.Context, m raftpb.Message) {
n.Step(ctx, m)
}
发起提议
如果需要向raft集群发起一个提议,那么需要用下面这种方式:
// 协议的数据持久化成字节数组
n.Propose(ctx, data)
如果找个提议处理完成(已经持久化到持久化介质并同步到其他节点),那么就可以通过Ready
的comitedEntries获取到,类型是raftpb.EntryNormal
, 然后用户就可以根据自己的业务逻辑,将其应用到状态机中。
raft集群不保证该协议一定能够处理成功,若一定超时时间内,还未收到响应,那么需要根据业务场景考虑是否需要重试。
节点变更
如果需要对raft集群扩容或缩容,那么需要构造ConfChange
,并调用:
n.ProposeConfChange(ctx, cc)
如果该变更请求处理成功,那么在commitedEntries中会有一条类型为raftpb.EntryConfChange
的记录,
var cc raftpb.ConfChange
cc.Unmarshal(data)
n.ApplyConfChange(cc)
需要自己实现的部分
etcd的raft已经实现了大部分的功能,但是还是有几个组件需要使用者自己根据业务场景实现:
- 网络通讯部分
- Write ahead log
- 快照
网络通讯部分
网络部分说白了就是消息的收发,你可以理解为raft只依赖了接口,这个接口实现了两个方法: send
、receive
,但是具体的实现需要库使用者自己写,这部分相对比较简单,使用RPC、HTTP、自定义协议都可以,具体的实现逻辑可以参考etcd自己的代码
Write-Ahead-Log(WAL)
如上文中提到的,用户需要保存Ready中的一些状态,比如entries、hardstate等,WAL有很多分布式系统都实现了,基本上参考他们的实现,结合自己的业务实现一个难度不会很大,如果是直接使用etcd raft库,那么可以直接基于etcd中wal的实现,另外也可以基于RocksDB等嵌入式KV实现,但是对于key-value的结构设计要考虑好,wal的原理后面有时间再叙述。
快照
快照应该都知道,比如说Redis的持久化,有一种模式是保存用户发过来的命令,但时间长了之后,这个日志会变的越来越大,这个时候当你扩容、重启节点的时候,加载这个文件会耗费很长时间,导致服务不可用,因此需要将内存中的状态持久化到磁盘中。 比如:
incr index
incr index
这个时候index的值为2,当然这个例子只有两条命令,但假如说有一千万条记录,那么重放日志需要耗费很长时间,因此我们可以直接将index:2
这个kv对写到磁盘中,那么这个时候之前对这个key的一千万条操作日志就变成了这一条记录。
那么raft的快照其实也类似,应用需要将自己状态机的当前快照,持久化成一个快照文件,并写入磁盘中,我们知道这个过程会非常慢,因此可以考虑和其他过程并行执行,以及其他的一些性能优化,这个后面的博客再写。
简单来实现的话,我们直接将状态机用json序列化成一个字节数组,并写入到本地文件中,后续读取的时候。
如何基于raft实现一个简单的分布式KV存储
这里简单描述一下流程,只是为了更容易理解etcd raft的使用方法,后面会再写篇博客详细记录:
- 应用实现自己的状态机,处理快照、已提交日志、WAL等
- 当用户发起一个put请求时,将该请求序列化成字节数组,propose到raft集群
- 处理成功后,会出现在commitedEntries中,解析该entry,回放到状态机中,这个时候该请求的结果已经可以在所有的raft节点上查询到了
- 用户发起查询请求,直接在用户封装的状态机中查询,并返回给用户
总结
本文只是简单描述了下etcd raft的使用方法,总的来说etcd raft的实现已经非常完善,但还是需要用户自己处理非常多的细节,比如网络、write aheadlog等,如果对raft不熟悉,相信会很难上手,我的想法是能够在其之上再封装一层,提供一个状态机接口,用户只需要关心自己的业务逻辑,其他的全部都交给库来处理。
``