KV-Server记录

KV-Server记录

RPC模块

项目使用到的RPC代码高度依赖于protobuf。RPC 是一种使得分布式系统中的不同模块之间能够透明地进行远程调用的技术,使得开发者可以更方便地构建分布式系统,而不用过多关注底层通信细节,调用另一台机器的方法会表现的像调用本地的方法一样。

那么无论对外表现如何,只要设计多个主机之间的通信,必不可少的就是网络通讯这一步

我们可以看看一次RPC请求到底干了什么?

img

由于底层网络通信框架使用的是运输层协议,只能发送字节流,因此会涉及到对象的序列化/反序列化问题,即上图中所示的黄色部分,而常见的网络数据传输格式包括如下三种:

  • XML:一种通用和轻量级的数据交换格式语言,是指可扩展标记语言以文本结构进行存储。
  • JSON:一种通用和轻量级的数据交换格式,也是以文本的结构进行存储,是一种简单的消息格式。JSON 作为数据包格式传输时具有更高的效率,这是因为 JSON 不像 XML 那样需要有严格的闭合标签,这就让有效数据量与总数据包比有着显著的提升,从而减少同等数据流量的情况下网络的传输压力。
  • Protobuf:是 Google 开发的一种独立和轻量级的数据交换格式,以二进制结构进行存储,用于不同服务之间序列化数据。它是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者序列化,可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。

而该项目便是使用 Protobuf 来进行消息的序列化和反序列化,同时使用其来实现RPC框架,其底层的通信流程如下图所示:

img

打包流程

  1. 序列化函数参数得到 argsStr,其长度为 argsSize;
  2. 打包 service_name、method_name 和 argsSize 得到 rpcHeader;
  3. 序列化 rpcHeader 得到 rpcHeaderStr,其长度为 headerSize;
  4. 将 headerSize 存储到数据包的前 4 个字节,后面的依次是 rpcHeaderStr 和 argsStr;
  5. 通过网络发送数据包;

解包流程

  1. 通过网络接收数据包;
  2. 首先取出数据包的前 4 个字节,读取出 headerSize 的大小;
  3. 从第 5 个字节开始,读取 headerSize 字节大小的数据,即为 rpcHeaderStr 的内容;
  4. 反序列化 rpcHeaderStr,得到 service_name、method_name 和 argsSize;
  5. 从 4+headerSize 字节开始,读取 argsSize 字节大小的数据,即为 argsStr 的内容;
  6. 反序列化 argsStr 得到函数参数 args;

消息内容

raft节点之间rpc通信函数:

1
2
3
4
5
6
7
// 只有raft节点之间才会涉及rpc通信
service raftRpc
{
rpc AppendEntries(AppendEntriesArgs) returns(AppendEntriesReply);
rpc InstallSnapshot (InstallSnapshotRequest) returns (InstallSnapshotResponse);
rpc RequestVote (RequestVoteArgs) returns (RequestVoteReply);
}

日志实体

1
2
3
4
5
6
// 日志实体
message LogEntry{
bytes Command =1; // 命令,猜测是增删改查的命令?
int32 LogTerm =2; // 日志
int32 LogIndex = 3;
}

RpcArgs内容:

1
2
3
4
5
6
7
8
9
10
// AppendEntriesArgs 由leader复制log条目,也可以当做是心跳连接,注释中的rf为leader节点
message AppendEntriesArgs {
// 下面几个参数和论文中相同
int32 Term =1; // 自己当前的任期号
int32 LeaderId =2; // 领导人的id
int32 PrevLogIndex =3; // 前一个日志的日志号
int32 PrevLogTerm =4; // 前一个日志的任期号
repeated LogEntry Entries = 5; // 当前日志体
int32 LeaderCommit = 6; // 当前日志体
}

RpcReply内容:

1
2
3
4
5
6
7
// AppendEntriesReply 论文中没有提及返回要设置哪些状态
message AppendEntriesReply {
int32 Term =1; // leader的term可能是与Follower不同的,
bool Success =2;
int32 UpdateNextIndex = 3; // 快速调整leader对应的nextIndex
int32 AppState =4; // 用来标识节点(网络)状态
}

请求投票参数

1
2
3
4
5
6
message RequestVoteArgs  {
int32 Term =1;
int32 CandidateId =2;
int32 LastLogIndex =3;
int32 LastLogTerm =4;
}

请求投票回复

1
2
3
4
5
6
7
8
9
// RequestVoteReply
// example RequestVote RPC reply structure.
// field names must start with capital letters!
message RequestVoteReply {
// Your data here (2A).
int32 Term =1;
bool VoteGranted =2;
int32 VoteState =3;
}

发起快照请求:

1
2
3
4
5
6
7
message InstallSnapshotRequest  {
int32 LeaderId =1; // 领导人的 ID,以便于跟随者重定向请求
int32 Term =2; // 领导人的任期号
int32 LastSnapShotIncludeIndex =3; // 快照中包含的最后日志条目的索引值
int32 LastSnapShotIncludeTerm =4; // 快照中包含的最后日志条目的任期号
bytes Data =5;//快照信息,当然是用bytes来传递
}

发起快照回复

1
2
3
4
// InstallSnapshotResponse 只用返回Term,因为对于快照只要Term是符合的就是无条件接受的
message InstallSnapshotResponse {
int32 Term = 1;
}

Raft类主要内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class Raft :
{
private:
std::mutex m_mtx;
std::vector<std::shared_ptr< RaftRpc >> m_peers; //需要与其他raft节点通信,这里保存与其他结点通信的rpc入口
std::shared_ptr<Persister> m_persister; //持久化层,负责raft数据的持久化
int m_me; //raft是以集群启动,这个用来标识自己的的编号
int m_currentTerm; //记录当前的term
int m_votedFor; //记录当前term给谁投票过
std::vector<mprrpc:: LogEntry> m_logs; //// 日志条目数组,包含了状态机要执行的指令集,以及收到领导时的任期号
// 这两个状态所有结点都在维护,易失
int m_commitIndex;
int m_lastApplied; // 已经汇报给状态机(上层应用)的log 的index

// 这两个状态是由leader来维护,易失 ,这两个部分在内容补充的部分也会再讲解
std::vector<int> m_nextIndex; // 这两个状态的下标1开始,因为通常commitIndex和lastApplied从0开始,应该是一个无效的index,因此下标从1开始
std::vector<int> m_matchIndex;
enum Status
{
Follower,
Candidate,
Leader
};
// 保存当前身份
Status m_status;

std::shared_ptr<LockQueue<ApplyMsg>> applyChan; // client从这里取日志,client与raft通信的接口
// ApplyMsgQueue chan ApplyMsg // raft内部使用的chan,applyChan是用于和服务层交互,最后好像没用上

// 选举超时
std::chrono::_V2::system_clock::time_point m_lastResetElectionTime;
// 心跳超时,用于leader
std::chrono::_V2::system_clock::time_point m_lastResetHearBeatTime;

// 用于传入快照点
// 储存了快照中的最后一个日志的Index和Term
int m_lastSnapshotIncludeIndex;
int m_lastSnapshotIncludeTerm;

public:

void AppendEntries1(const mprrpc::AppendEntriesArgs *args, mprrpc::AppendEntriesReply *reply); //日志同步 + 心跳 rpc ,重点关注
void applierTicker(); //定期向状态机写入日志,非重点函数

bool CondInstallSnapshot(int lastIncludedTerm, int lastIncludedIndex, std::string snapshot); //快照相关,非重点
void doElection(); //发起选举
void doHeartBeat(); //leader定时发起心跳
// 每隔一段时间检查睡眠时间内有没有重置定时器,没有则说明超时了
// 如果有则设置合适睡眠时间:睡眠到重置时间+超时时间
void electionTimeOutTicker(); //监控是否该发起选举了
std::vector<ApplyMsg> getApplyLogs();
int getNewCommandIndex();
void getPrevLogInfo(int server, int *preIndex, int *preTerm);
void GetState(int *term, bool *isLeader); //看当前节点是否是leader
void InstallSnapshot( const mprrpc::InstallSnapshotRequest *args, mprrpc::InstallSnapshotResponse *reply);
void leaderHearBeatTicker(); //检查是否需要发起心跳(leader)
void leaderSendSnapShot(int server);
void leaderUpdateCommitIndex(); //leader更新commitIndex
bool matchLog(int logIndex, int logTerm); //对应Index的日志是否匹配,只需要Index和Term就可以知道是否匹配
void persist(); //持久化
void RequestVote(const mprrpc::RequestVoteArgs *args, mprrpc::RequestVoteReply *reply); //变成candidate之后需要让其他结点给自己投票
bool UpToDate(int index, int term); //判断当前节点是否含有最新的日志
int getLastLogIndex();
void getLastLogIndexAndTerm(int *lastLogIndex, int *lastLogTerm);
int getLogTermFromLogIndex(int logIndex);
int GetRaftStateSize();
int getSlicesIndexFromLogIndex(int logIndex); //设计快照之后logIndex不能与在日志中的数组下标相等了,根据logIndex找到其在日志数组中的位置


bool sendRequestVote(int server , std::shared_ptr<mprrpc::RequestVoteArgs> args , std::shared_ptr<mprrpc::RequestVoteReply> reply, std::shared_ptr<int> votedNum) ; // 请求其他结点的投票
bool sendAppendEntries(int server ,std::shared_ptr<mprrpc::AppendEntriesArgs> args , std::shared_ptr<mprrpc::AppendEntriesReply> reply , std::shared_ptr<int> appendNums ) ; //Leader发送心跳后,对心跳的回复进行对应的处理


//rf.applyChan <- msg //不拿锁执行 可以单独创建一个线程执行,但是为了同意使用std:thread ,避免使用pthread_create,因此专门写一个函数来执行
void pushMsgToKvServer(ApplyMsg msg); //给上层的kvserver层发送消息
void readPersist(std::string data);
std::string persistData();
void Start(Op command,int* newLogIndex,int* newLogTerm,bool* isLeader ) ; // 发布发来一个新日志
// 即kv-server主动发起,请求raft(持久层)保存snapshot里面的数据,index是用来表示snapshot快照执行到了哪条命令
void Snapshot(int index , std::string snapshot );

public:
void init(std::vector<std::shared_ptr< RaftRpc >> peers,int me,std::shared_ptr<Persister> persister,std::shared_ptr<LockQueue<ApplyMsg>> applyCh); //初始化

重点需要关注的:

  1. Raft的主要流程:领导选举(sendRequestVote RequestVote ) 日志同步、心跳(sendAppendEntries AppendEntries
  2. 定时器的维护:主要包括raft向状态机定时写入(applierTicker )、心跳维护定时器(leaderHearBeatTicker )、选举超时定时器(electionTimeOutTicker )。
  3. 持久化相关:包括哪些内容需要持久化,什么时候需要持久化(persist)
  4. m_nextIndex 保存leader下一次应该从哪一个日志开始发送给follower;
  5. m_matchIndex表示follower在哪一个日志是已经匹配了的(由于日志安全性,某一个日志匹配,那么这个日志及其之前的日志都是匹配的)

init() 启动初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void Raft::init(std::vector<std::shared_ptr<RaftRpc>> peers, int me, std::shared_ptr<Persister> persister, std::shared_ptr<LockQueue<ApplyMsg>> applyCh) {
m_peers = peers; //与其他结点沟通的rpc类
m_persister = persister; //持久化类
m_me = me; //标记自己,毕竟不能给自己发送rpc吧

m_mtx.lock();

//applier
this->applyChan = applyCh; //与kv-server沟通
// rf.ApplyMsgQueue = make(chan ApplyMsg)
m_currentTerm = 0; //初始化term为0
m_status = Follower; //初始化身份为follower
m_commitIndex = 0;
m_lastApplied = 0;
m_logs.clear();
for (int i =0;i<m_peers.size();i++){
m_matchIndex.push_back(0);
m_nextIndex.push_back(0);
}
m_votedFor = -1; //当前term没有给其他人投过票就用-1表示

m_lastSnapshotIncludeIndex = 0;
m_lastSnapshotIncludeTerm = 0;
m_lastResetElectionTime = now();
m_lastResetHearBeatTime = now();

// initialize from state persisted before a crash
readPersist(m_persister->ReadRaftState());
if(m_lastSnapshotIncludeIndex > 0){
m_lastApplied = m_lastSnapshotIncludeIndex;
//rf.commitIndex = rf.lastSnapshotIncludeIndex 崩溃恢复不能读取commitIndex
}

m_mtx.unlock();
// start ticker 开始三个定时器
std::thread t(&Raft::leaderHearBeatTicker, this);
t.detach();

std::thread t2(&Raft::electionTimeOutTicker, this);
t2.detach();

std::thread t3(&Raft::applierTicker, this);
t3.detach();
}

从上面可以看到一共产生了三个定时器,分别维护:选举、日志同步和心跳、raft节点与kv-server的联系。相互之间是比较隔离的。

Leader选举

主要涉及函数及其流程:

img

electionTimeOutTicker:负责查看是否该发起选举,如果该发起选举就执行doElection发起选举。

doElection:实际发起选举,构造需要发送的rpc,并多线程调用sendRequestVote处理rpc及其相应。

sendRequestVote:负责发送选举中的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。

RequestVote:接收别人发来的选举请求,主要检验是否要给对方投票。

electionTimeOutTicker

选举超时由于electionTimeOutTicker 维护。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void Raft::electionTimeOutTicker() {
// Check if a Leader election should be started.
while (true) {
m_mtx.lock();
auto nowTime = now(); //睡眠前记录时间
auto suitableSleepTime = getRandomizedElectionTimeout() + m_lastResetElectionTime - nowTime;
m_mtx.unlock();
if (suitableSleepTime.count() > 1) {
std::this_thread::sleep_for(suitableSleepTime);
}

if ((m_lastResetElectionTime - nowTime).count() > 0) { //说明睡眠的这段时间有重置定时器,那么就没有超时,再次睡眠
continue;
}
doElection();
}
}

在死循环中,首先计算距离下一次超时应该睡眠的时间suitableSleepTime,然后睡眠这段时间,醒来后查看睡眠的这段时间选举超时定时器是否被触发,如果没有触发就发起选举。

“举超时定时器是否被触发”:选举定时器的触发条件:收到leader发来的appendEntryRPC 、给其他的节点选举投票

在死循环中,首先计算距离上次重置选举计时器的时间加上随机化的选举超时时间,然后线程根据这个时间决定是否睡眠。若超时时间未到,线程进入睡眠状态,若在此期间选举计时器被重置,则继续循环。若超时时间已到,调用doElection() 函数启动领导者选举过程。

doElection()函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void Raft::doElection() {
lock_guard<mutex> g(m_mtx); //c11新特性,使用raii避免死锁


if (m_status != Leader) {
DPrintf("[ ticker-func-rf(%d) ] 选举定时器到期且不是leader,开始选举 \n", m_me);
//当选举的时候定时器超时就必须重新选举,不然没有选票就会一直卡住
m_status = Candidate;
///开始新一轮的选举
m_currentTerm += 1; //无论是刚开始竞选,还是超时重新竞选,term都要增加
m_votedFor = m_me; //即是自己给自己投票,也避免candidate给同辈的candidate投
persist();
std::shared_ptr<int> votedNum = std::make_shared<int>(1); // 使用 make_shared 函数初始化 !! 亮点
// 重新设置定时器
m_lastResetElectionTime = now();
// 发布RequestVote RPC
for (int i = 0; i < m_peers.size(); i++) {
if (i == m_me) {
continue;
}
int lastLogIndex = -1, lastLogTerm = -1;
getLastLogIndexAndTerm(&lastLogIndex, &lastLogTerm);//获取最后一个log的term和下标,以添加到RPC的发送

//初始化发送参数
std::shared_ptr<mprrpc::RequestVoteArgs> requestVoteArgs = std::make_shared<mprrpc::RequestVoteArgs>();
requestVoteArgs->set_term(m_currentTerm);
requestVoteArgs->set_candidateid(m_me);
requestVoteArgs->set_lastlogindex(lastLogIndex);
requestVoteArgs->set_lastlogterm(lastLogTerm);
std::shared_ptr<mprrpc::RequestVoteReply> requestVoteReply = std::make_shared<mprrpc::RequestVoteReply>();

//使用匿名函数执行避免其拿到锁

std::thread t(&Raft::sendRequestVote, this, i, requestVoteArgs, requestVoteReply, votedNum); // 创建新线程并执行函数,并传递参数
t.detach();
}
}
}

Raft::AppendEntries1()函数

如果对方的任期号比自己当前任期号小,说明对方已经是过期领导人了,那么更新reply状态并直接return。

1
2
3
4
5
6
7
8
if (args->term() < m_currentTerm) { 	// 对方的任期号比自己当前任期号小
reply->set_success(false);
reply->set_term(m_currentTerm);
reply->set_updatenextindex(-100); // 论文中:让领导人可以及时更新自己
DPrintf("[func-AppendEntries-rf{%d}] 拒绝了 因为Leader{%d}的term{%v}< rf{%d}.term{%d}\n", m_me, args->leaderid(),
args->term(), m_me, m_currentTerm);
return; // 注意从过期的领导人收到消息不要重设定时器
}

如果对方任期号比自己当前任期号大,那么就说明leader更新了,不管自己之前是不是leader,现在都要成为follwer。

1
2
3
4
5
6
7
8
9
10
if (args->term() > m_currentTerm) { // 对方任期号比自己当前任期号大
// 三变 ,防止遗漏,无论什么时候都是三变
// args.LeaderId, args.Term, rf.me, rf.currentTerm)
m_status = Follower;
m_currentTerm = args->term();
m_votedFor = -1; // 这里设置成-1有意义,如果突然宕机然后上线理论上是可以投票的
// 这里可不返回,应该改成让改节点尝试接收日志
// 如果是领导人和candidate突然转到Follower好像也不用其他操作
// 如果本来就是Follower,那么其term变化,相当于“不言自明”的换了追随的对象,因为原来的leader的term更小,是不会再接收其消息了
}

如果上面两种情况都不满足,那么说明自己收到的就是当前leader发的包。因为只有leader会向follwer更新日志或者发心跳包。这里的args其实就是leader发送过来的参数。

如果leader的prevlogindex(前一个日志的日志号)大于当前节点lastLogIndex(最新的日志号),那么说明leader发送的日志过于新了,当前节点要从lastLogIndex + 1一个一个更新。

1
2
3
4
5
6
7
//	那么就比较日志,日志有3种情况
if (args->prevlogindex() > getLastLogIndex()) {
reply->set_success(false);
reply->set_term(m_currentTerm);
reply->set_updatenextindex(getLastLogIndex() + 1);
return;
}

如果leader的prevlogindex比当前节点快照的最后一条日志号小的话,那么就说明leader共发送的日志比当前节点已经快照的最后一个日志号都要小。

1
2
3
4
5
6
7
8
else if (args->prevlogindex() < m_lastSnapshotIncludeIndex) {
// 如果prevlogIndex还没有更上快照
reply->set_success(false);
reply->set_term(m_currentTerm);
reply->set_updatenextindex(
m_lastSnapshotIncludeIndex +
1); // todo 如果想直接弄到最新好像不对,因为是从后慢慢往前匹配的,这里不匹配说明后面的都不匹配
}

这里为什么?

1
2
3
4
5
6
7
// 错误写法like:  rf.shrinkLogsToIndex(args.PrevLogIndex)
// rf.logs = append(rf.logs, args.Entries...)
// 因为可能会收到过期的log!!! 因此这里是大于等于
myAssert(
getLastLogIndex() >= args->prevlogindex() + args->entries_size(),
format("[func-AppendEntries1-rf{%d}]rf.getLastLogIndex(){%d} != args.PrevLogIndex{%d}+len(args.Entries){%d}",
m_me, getLastLogIndex(), args->prevlogindex(), args->entries_size()));

Raft::sendAppendEntries()函数

函数原型

1
2
3
bool Raft::sendAppendEntries(int server, std::shared_ptr<raftRpcProctoc::AppendEntriesArgs> args,
std::shared_ptr<raftRpcProctoc::AppendEntriesReply> reply,
std::shared_ptr<int> appendNums)

args是要发送的AE rpc,reply是返回,m_peers是一个代理类的数组,直接调用m_peers[server]的AppendEntries函数,并得到返回值,如果ok为false直接报错并返回,否则对reply进行处理。

1
bool ok = m_peers[server]->AppendEntries(args.get(), reply.get());

加锁,同时收到多个reply的并发处理。

1
std::lock_guard<std::mutex> lg1(m_mtx);

如果reply->term() > m_currentTerm,说明自己的任期号已经不是最新的了,将自己转为follower:

1
2
3
4
5
6
if (reply->term() > m_currentTerm) {
m_status = Follower;
m_currentTerm = reply->term();
m_votedFor = -1;
return ok;
}

如果reply->term() < m_currentTerm,说明收到的是过期回复,直接返回即可。

1
2
3
4
5
else if (reply->term() < m_currentTerm) {
DPrintf("[func -sendAppendEntries rf{%d}] 节点:{%d}的term{%d}<rf{%d}的term{%d}\n", m_me, server, reply->term(),
m_me, m_currentTerm);
return ok;
}

如果reply->success()不为真:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if (!reply->success()) {
// 日志不匹配,正常来说就是index要往前-1,既然能到这里,第一个日志(idnex =
// 1)发送后肯定是匹配的,因此不用考虑变成负数 因为真正的环境不会知道是服务器宕机还是发生网络分区了
if (reply->updatenextindex() != -100) { // -100说明对方的任期号比自己大,在前面已经处理过了(转为follower)
// todo:待总结,就算term匹配,失败的时候nextIndex也不是照单全收的,因为如果发生rpc延迟,leader的term可能从不符合term要求
// 变得符合term要求
// 但是不能直接赋值reply.UpdateNextIndex
DPrintf("[func -sendAppendEntries rf{%d}] 返回的日志term相等,但是不匹配,回缩nextIndex[%d]:{%d}\n", m_me,
server, reply->updatenextindex());

// 更新server下次要更新的index
m_nextIndex[server] = reply->updatenextindex(); // 失败是不更新matchIndex的
}
// 怎么越写越感觉rf.nextIndex数组是冗余的呢,看下论文fig2,其实不是冗余的
}

否则说明更新成功,同时更新server的m_matchIndex和m_nextIndex,并将appendNums所对应的值加一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
else {
*appendNums = *appendNums + 1;
DPrintf("---------------------------tmp------------------------- 節點{%d}返回true,當前*appendNums{%d}", server,
*appendNums);
// rf.matchIndex[server] = len(args.Entries) //只要返回一个响应就对其matchIndex应该对其做出反应,
// 但是这么修改是有问题的,如果对某个消息发送了多遍(心跳时就会再发送),那么一条消息会导致n次上涨
m_matchIndex[server] = std::max(m_matchIndex[server], args->prevlogindex() + args->entries_size());
m_nextIndex[server] = m_matchIndex[server] + 1;
int lastLogIndex = getLastLogIndex();

myAssert(m_nextIndex[server] <= lastLogIndex + 1,
format("error msg:rf.nextIndex[%d] > lastLogIndex+1, len(rf.logs) = %d lastLogIndex{%d} = %d", server,
m_logs.size(), server, lastLogIndex));
if (*appendNums >= 1 + m_peers.size() / 2) {
// 可以commit了
// 两种方法保证幂等性,1.赋值为0 2.上面≥改为==

*appendNums = 0;
// todo https://578223592-laughing-halibut-wxvpggvw69qh99q4.github.dev/ 不断遍历来统计rf.commitIndex
// 改了好久!!!!!
// leader只有在当前term有日志提交的时候才更新commitIndex,因为raft无法保证之前term的Index是否提交
// 只有当前term有日志提交,之前term的log才可以被提交,只有这样才能保证“领导人完备性{当选领导人的节点拥有之前被提交的所有log,当然也可能有一些没有被提交的}”
// rf.leaderUpdateCommitIndex()
if (args->entries_size() > 0 && args->entries(args->entries_size() - 1).logterm() == m_currentTerm) {
DPrintf(
"---------------------------tmp------------------------- 当前term有log成功提交,更新leader的m_commitIndex "
"from{%d} to{%d}",
m_commitIndex, args->prevlogindex() + args->entries_size());

m_commitIndex = std::max(m_commitIndex, args->prevlogindex() + args->entries_size());
}
myAssert(m_commitIndex <= lastLogIndex,
format("[func-sendAppendEntries,rf{%d}] lastLogIndex:%d rf.commitIndex:%d\n", m_me, lastLogIndex,
m_commitIndex));
// fmt.Printf("[func-sendAppendEntries,rf{%v}] len(rf.logs):%v rf.commitIndex:%v\n", rf.me, len(rf.logs),
// rf.commitIndex)
}

Raft::sendRequestVote()函数

函数总览:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
bool Raft::sendRequestVote(int server, std::shared_ptr<mprrpc::RequestVoteArgs> args, std::shared_ptr<mprrpc::RequestVoteReply> reply,
std::shared_ptr<int> votedNum) {


bool ok = m_peers[server]->RequestVote(args.get(),reply.get());

if (!ok) {
return ok;//rpc通信失败就立即返回,避免资源消耗
}

lock_guard<mutex> lg(m_mtx);
if(reply->term() > m_currentTerm){
//回复的term比自己大,说明自己落后了,那么就更新自己的状态并且退出
m_status = Follower; //三变:身份,term,和投票
m_currentTerm = reply->term();
m_votedFor = -1; //term更新了,那么这个term自己肯定没投过票,为-1
persist(); //持久化
return true;
} else if ( reply->term() < m_currentTerm ) {
//回复的term比自己的term小,不应该出现这种情况
return true;
}

if(!reply->votegranted()){ //这个节点因为某些原因没给自己投票,没啥好说的,结束本函数
return true;
}
//给自己投票了
*votedNum = *votedNum + 1; //voteNum多一个
if (*votedNum >= m_peers.size()/2+1) {
//变成leader
*votedNum = 0; //重置voteDNum,如果不重置,那么就会变成leader很多次,是没有必要的,甚至是错误的!!!

// 第一次变成leader,初始化状态和nextIndex、matchIndex
m_status = Leader;
int lastLogIndex = getLastLogIndex();
for (int i = 0; i <m_nextIndex.size() ; i++) {
m_nextIndex[i] = lastLogIndex + 1 ;//有效下标从1开始,因此要+1
m_matchIndex[i] = 0; //每换一个领导都是从0开始,见论文的fig2
}
std::thread t(&Raft::doHeartBeat, this); //马上向其他节点宣告自己就是leader
t.detach();

persist();
}
return true;
}

首先发送投票请求,如果失败直接返回:

1
2
3
4
5
6
7

bool ok = m_peers[server]->RequestVote(args.get(), reply.get());


if (!ok) {
return ok;
}

如果成功,加锁后对reply进行处理:

1
2
3
4
5
6
7
8
9
10
std::lock_guard<std::mutex> lg(m_mtx);
if (reply->term() > m_currentTerm) { // 对方任期号比自己大,说明自己应该转变为follower
m_status = Follower; // 三变:身份,term,和投票
m_currentTerm = reply->term();
m_votedFor = -1;
persist();
return true;
} else if (reply->term() < m_currentTerm) { // 对方任期号比自己小,说明是过期的回复
return true;
}

如果以上两种情况都不满足,那么就是合法的投票回复:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 if (!reply->votegranted()) {	// 如果对方没有给自己投票,直接返回
return true;
}

*votedNum = *votedNum + 1; // 投票数 + 1
if (*votedNum >= m_peers.size() / 2 + 1) { // 若得票已经大于一半,转变为leader
// 变成leader
*votedNum = 0;
if (m_status == Leader) {
// 如果已经是leader了,那么是就是了,不会进行下一步处理了k
myAssert(false,
format("[func-sendRequestVote-rf{%d}] term:{%d} 同一个term当两次领导,error", m_me, m_currentTerm));
}
// 之前不是leader,现在需要变为leader
m_status = Leader;


int lastLogIndex = getLastLogIndex();
for (int i = 0; i < m_nextIndex.size(); i++) {
m_nextIndex[i] = lastLogIndex + 1; // 有效下标从1开始,因此要+1
m_matchIndex[i] = 0; // 每换一个领导都是从0开始,见fig2
}
std::thread t(&Raft::doHeartBeat, this); // 马上向其他节点宣告自己就是leader
t.detach();

persist();
}

Raft::RequestVote()函数

函数总览:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
void Raft::RequestVote( const mprrpc::RequestVoteArgs *args, mprrpc::RequestVoteReply *reply) {
lock_guard<mutex> lg(m_mtx);

Defer ec1([this]() -> void { //应该先持久化,再撤销lock,因此这个写在lock后面
this->persist();
});
// 对args的term的三种情况分别进行处理,大于小于等于自己的term都是不同的处理
// reason: 出现网络分区,该竞选者已经OutOfDate(过时)
if (args->term() < m_currentTerm) { // 为什么不是小于等于?
reply->set_term(m_currentTerm);
reply->set_votestate(Expire);
reply->set_votegranted(false);
return;
}
// 论文fig2:右下角,如果任何时候rpc请求或者响应的term大于自己的term,更新term,并变成follower
if (args->term() > m_currentTerm) {

m_status = Follower;
m_currentTerm = args->term();
m_votedFor = -1;

// 重置定时器:收到leader的ae,开始选举,透出票
// 这时候更新了term之后,votedFor也要置为-1
}

// 现在节点任期都是相同的(任期小的也已经更新到新的args的term了)
// 要检查log的term和index是不是匹配的了
int lastLogTerm = getLastLogIndex();
//只有没投票,且candidate的日志的新的程度 ≥ 接受者的日志新的程度 才会授票
if (!UpToDate(args->lastlogindex(), args->lastlogterm())) {

//日志太旧了
reply->set_term(m_currentTerm);
reply->set_votestate(Voted);
reply->set_votegranted(false);
return;
}

// 当因为网络质量不好导致的请求丢失重发就有可能!!!!
// 因此需要避免重复投票
if (m_votedFor != -1 && m_votedFor != args->candidateid()) {
reply->set_term(m_currentTerm);
reply->set_votestate(Voted);
reply->set_votegranted(false);
return;
} else {
//同意投票
m_votedFor = args->candidateid();
m_lastResetElectionTime = now(); //认为必须要在投出票的时候才重置定时器,
reply->set_term(m_currentTerm);
reply->set_votestate(Normal);
reply->set_votegranted(true);
return;
}
}

日志复制/心跳

img

可以从流程图看到,函数实现上我尽量将心跳日志复制的流程统一,方便理解和后期统一修改理解AppendEntry 相关内容,snapshot的逻辑是类似的。

leaderHearBeatTicker:负责查看是否该发送心跳了,如果该发起就执行doHeartBeat。

doHeartBeat:实际发送心跳,判断到底是构造需要发送的rpc,并多线程调用sendRequestVote处理rpc及其相应。

sendAppendEntries:负责发送日志的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。

leaderSendSnapShot:负责发送快照的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。

AppendEntries:接收leader发来的日志请求,主要检验用于检查当前日志是否匹配并同步leader的日志到本机。

InstallSnapshot:接收leader发来的快照请求,同步快照到本机。

leaderHearBeatTicker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 负责查看是否该发送心跳了,如果该发起就执行doHeartBeat
void Raft::leaderHearBeatTicker() {
while (true) {
auto nowTime = now();
m_mtx.lock();
auto suitableSleepTime = std::chrono::milliseconds(HeartBeatTimeout) + m_lastResetHearBeatTime - nowTime;
m_mtx.unlock();
if (suitableSleepTime.count() < 1) {
suitableSleepTime = std::chrono::milliseconds(1);
}
std::this_thread::sleep_for(suitableSleepTime);
if ((m_lastResetHearBeatTime - nowTime).count() > 0) { //说明睡眠的这段时间有重置定时器,那么就没有超时,再次睡眠
continue;
}
doHeartBeat();
}
}

其基本逻辑和选举定时器electionTimeOutTicker一模一样,不一样之处在于设置的休眠时间不同,这里是根据HeartBeatTimeout来设置,而electionTimeOutTicker中是根据getRandomizedElectionTimeout() 设置。

doHeartBeat

这里目前逻辑写的不统一,发送快照leaderSendSnapShot和发送日志sendAppendEntries的rpc值的构造没有统一,且写在一坨。

可以抽离出来。目前先将就,关注主要逻辑。

带注释版:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// 发送心跳,判断到底是构造需要发送的rpc,并多线程调用sendRequestVote处理rpc及其相应
void Raft::doHeartBeat() {
std::lock_guard<mutex> g(m_mtx);
if (m_status == Leader) {
auto appendNums = std::make_shared<int>(1); //正确返回的节点的数量
//对Follower(除了自己外的所有节点发送AE)
for (int i = 0; i < m_peers.size(); i++) {
if(i == m_me){ //不对自己发送AE
continue;
}
//日志压缩加入后要判断是发送快照还是发送AE
if (m_nextIndex[i] <= m_lastSnapshotIncludeIndex) {
// 要发送的日志已经被做成快照,必须发送快照了
std::thread t(&Raft::leaderSendSnapShot, this, i);
t.detach();
continue;
}
// 发送心跳,构造发送值
// 下面都是m_nextIndex[i] > m_lastSnapshotIncludeIndex了
int preLogIndex = -1;
int PrevLogTerm = -1;
getPrevLogInfo(i, &preLogIndex, &PrevLogTerm); //获取本次发送的一系列日志的上一条日志的信息,以判断是否匹配
std::shared_ptr<mprrpc::AppendEntriesArgs> appendEntriesArgs = std::make_shared<mprrpc::AppendEntriesArgs>();
appendEntriesArgs->set_term(m_currentTerm);
appendEntriesArgs->set_leaderid(m_me);
appendEntriesArgs->set_prevlogindex(preLogIndex);
appendEntriesArgs->set_prevlogterm(PrevLogTerm);
appendEntriesArgs->clear_entries();
appendEntriesArgs->set_leadercommit(m_commitIndex);
// 作用是携带上prelogIndex的下一条日志及其之后的所有日志
//leader对每个节点发送的日志长短不一,但是都保证从prevIndex发送直到最后
// 注意这个preLogIndex和PrevLogTerm是对应follower的index和term
if (preLogIndex != m_lastSnapshotIncludeIndex) { // follower的preLogIndex > leader快照的最后一条日志index,那么发送从preindex + 1一直到最后
for (int j = getSlicesIndexFromLogIndex(preLogIndex) + 1; j < m_logs.size(); ++j) {
mprrpc::LogEntry *sendEntryPtr = appendEntriesArgs->add_entries();
*sendEntryPtr = m_logs[j];
}
} else { // follower的preLogIndex == leader快照的最后一条日志index,等于要把m_logs的日志全发过去
for (const auto& item: m_logs) {
mprrpc::LogEntry *sendEntryPtr = appendEntriesArgs->add_entries();
*sendEntryPtr = item;
}
}
int lastLogIndex = getLastLogIndex();
//初始化返回值
const std::shared_ptr<mprrpc::AppendEntriesReply> appendEntriesReply = std::make_shared<mprrpc::AppendEntriesReply>();

std::thread t(&Raft::sendAppendEntries, this, i, appendEntriesArgs, appendEntriesReply,
appendNums); // 创建新线程并执行b函数,并传递参数
t.detach();
}
m_lastResetHearBeatTime = now(); //leader发送心跳,重置心跳时间,
}
}

加速日志匹配

​ 这部分在AppendEntries 函数里,涉及代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 不匹配,不匹配不是一个一个往前,而是有优化加速
// PrevLogIndex 长度合适,但是不匹配,因此往前寻找 矛盾的term的第一个元素
// 为什么该term的日志都是矛盾的呢?也不一定都是矛盾的,只是这么优化减少rpc而已
// ?什么时候term会矛盾呢?很多情况,比如leader接收了日志之后马上就崩溃等等
reply->set_updatenextindex(args->prevlogindex());

for (int index = args->prevlogindex(); index >= m_lastSnapshotIncludeIndex; --index) {
if (getLogTermFromLogIndex(index) != getLogTermFromLogIndex(args->prevlogindex())) {
reply->set_updatenextindex(index + 1);
break;
}
}

reply->set_success(false);
reply->set_term(m_currentTerm);

return;

如果日志不匹配的话可以一个一个往前的倒退。但是这样的话可能会设计很多个rpc之后才能找到匹配的日志,那么就一次多倒退几个数。

倒退几个呢?这里认为如果某一个日志不匹配,那么这一个日志所在的term的所有日志大概率都不匹配,那么就倒退到 最后一个日志所在的term的最后那个命令。

持久化

持久化就是把不能丢失的数据保存到磁盘。

持久化的内容为两部分:

  1. raft节点的部分信息;
    1. m_currentTerm :当前节点的Term,避免重复到一个Term,可能会遇到重复投票等问题。
    2. m_votedFor :当前Term给谁投过票,避免故障后重复投票。
    3. m_logs :raft节点保存的全部的日志信息。
  2. kvDb的快照
    1. m_lastSnapshotIncludeIndex :快照的信息,快照最新包含哪个日志Index
    2. m_lastSnapshotIncludeTerm :快照的信息,快照最新包含哪个日志Term,与m_lastSnapshotIncludeIndex 是对应的。

不妨想一想,其他的信息为什么不用持久化,比如说:身份、commitIndex、applyIndex等等。

applyIndex不持久化是经典raft的实现,在一些工业实现上可能会优化,从而持久化。

即applyIndex不持久化不会影响“共识”。

Snapshot是kvDb的快照,也可以看成是日志,因此:全部的日志 = m_logs + snapshot

因为Snapshot是kvDB生成的,kvDB肯定不知道raft的存在,而什么term、什么日志Index都是raft才有的概念,因此snapshot中肯定没有term和index信息。

所以需要raft自己来保存这些信息。

故,快照与m_logs联合起来理解即可。

为什么要持久化这些内容?

两部分原因:共识安全、优化。

除了snapshot相关的部分,其他部分都是为了共识安全。

而snapshot是因为日志一个一个的叠加,会导致最后的存储非常大,因此使用snapshot来压缩日志。

不严谨的一种理解方式:

为什么snashot可以压缩日志?

日志是追加写的,对于一个变量的重复修改可能会重复保存,理论上对一个变量的反复修改会导致日志不断增大。

而snapshot是原地写,即只保存一个变量最后的值,自然所需要的空间就小了。

什么时候持久化

需要持久化的内容发送改变的时候就要注意持久化。

比如term 增加,日志增加等等。

具体的可以查看代码仓库中的void Raft::persist() 相关内容。

谁来调用持久化

谁来调用都可以,只要能保证需要持久化的内容能正确持久化。

仓库代码中选择的是raft类自己来完成持久化。因为raft类最方便感知自己的term之类的信息有没有变化。

注意,虽然持久化很耗时,但是持久化这些内容的时候不要放开锁,以防其他线程改变了这些值,导致其它异常。

具体怎么实现持久化|使用哪个函数持久化

其实持久化是一个非常难的事情,因为持久化需要考虑:速度、大小、二进制安全。

因此仓库实现目前采用的是使用boost库中的持久化实现,将需要持久化的数据序列化转成std::string 类型再写入磁盘。

当然其他的序列化方式也少可行的。

可以看到这一块还是有优化空间的,因此可以尝试对这里优化优化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
std::string Raft::persistData() {
BoostPersistRaftNode boostPersistRaftNode;
boostPersistRaftNode.m_currentTerm = m_currentTerm;
boostPersistRaftNode.m_votedFor = m_votedFor;
boostPersistRaftNode.m_lastSnapshotIncludeIndex = m_lastSnapshotIncludeIndex;
boostPersistRaftNode.m_lastSnapshotIncludeTerm = m_lastSnapshotIncludeTerm;
for (auto &item: m_logs) {
boostPersistRaftNode.m_logs.push_back(item.SerializeAsString());
}

std::stringstream ss;
boost::archive::text_oarchive oa(ss);
oa<<boostPersistRaftNode;
return ss.str();
}

kvServer怎么处理外部请求

img

图中是raftServer,这里叫成kvServer,是一样的。

kvServer其实是个中间组件,负责沟通kvDB和raft节点。

那么外部请求怎么打进来呢?

哦吼,当然是Server来负责呀,加入后变成了:

img

kvServer怎么和上层kvDB沟通,怎么和下层raft节点沟通?

通过这两个成员变量实现:

1
2
std::shared_ptr<LockQueue<ApplyMsg> > applyChan; //kvServer和raft节点的通信管道
std::unordered_map<std::string, std::string> m_kvDB; //kvDB,用unordered_map来替代

kvDB:使用的是unordered_map来代替上层的kvDB,因此没啥好说的.

raft节点:其中LockQueue 是一个并发安全的队列,这种方式其实是模仿的go中的channel机制。

在raft类中这里可以看到,raft类中也拥有一个applyChan,kvSever和raft类都持有同一个applyChan,来完成相互的通信。

从上面的结构图中可以看到kvServer负责与外部clerk通信。

那么一个外部请求的处理可以简单的看成两步:

  1. 接收外部请求。
  2. 本机内部与raft和kvDB协商如何处理该请求。
  3. 返回外部响应。

接收与响应外部请求

对于1和3,请求和返回的操作我们可以通过http、自定义协议等等方式实现,但是既然我们已经写出了rpc通信的一个简单的实现(源代码可见:这里),那就使用rpc来实现吧。

而且rpc可以直接完成请求和响应这一步,后面就不用考虑外部通信的问题了,好好处理好本机的流程即可。

相关函数是:

1
2
3
4
5
6
7
8
9
10
void PutAppend(google::protobuf::RpcController *controller,
const ::raftKVRpcProctoc::PutAppendArgs *request,
::raftKVRpcProctoc::PutAppendReply *response,
::google::protobuf::Closure *done) override;

void Get(google::protobuf::RpcController *controller,
const ::raftKVRpcProctoc::GetArgs *request,
::raftKVRpcProctoc::GetReply *response,
::google::protobuf::Closure *done) override;

见名知意,请求分成两种:get和put(也就是set)。

如果是putAppend,clerk中就调用PutAppend 的rpc。

如果是Get,clerk中就调用Get 的rpc。

与raft节点沟通

在正式开始之前我们必须要先了解 线性一致性 的相关概念。建议阅读线性一致性.

这里讲一讲raft如何做的。

每个 client 都需要一个唯一的标识符,它的每个不同命令需要有一个顺序递增的 commandId,clientId 和这个 commandId,clientId 可以唯一确定一个不同的命令,从而使得各个 raft 节点可以记录保存各命令是否已应用以及应用以后的结果。

即对于每个clinet,都有一个唯一标识,对于每个client,只执行递增的命令。

在保证线性一致性的情况下如何写kv

具体的思想在上面已经讲过,这里展示一下关键的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//  chForRaftIndex是一个有锁队列
if (!chForRaftIndex->timeOutPop(CONSENSUS_TIMEOUT, &raftCommitOp)) {//通过超时pop来限定命令执行时间,如果超过时间还没拿到消息说明命令执行超时了。

if (ifRequestDuplicate(op.ClientId, op.RequestId)) {
reply->set_err(OK);// 超时了,但因为是重复的请求,返回ok,实际上就算没有超时,在真正执行的时候也要判断是否重复
} else {
reply->set_err(ErrWrongLeader); ///这里返回这个的目的让clerk重新尝试
}
} else {
//没超时,命令可能真正的在raft集群执行成功了。
if (raftCommitOp.ClientId == op.ClientId &&
raftCommitOp.RequestId == op.RequestId) { //可能发生leader的变更导致日志被覆盖,因此必须检查
reply->set_err(OK);
} else {
reply->set_err(ErrWrongLeader);
}
}

需要注意的是,这里的命令执行成功是指:本条命令在整个raft集群达到同步的状态,而不是一台机器上的raft保存了该命令。

全部代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// 处理来自clerk的Get RPC
void KvServer::Get(const raftKVRpcProctoc::GetArgs *args, raftKVRpcProctoc::GetReply *reply) {
Op op;
op.Operation = "Get";
op.Key = args->key();
op.Value = "";
op.ClientId = args->clientid();
op.RequestId = args->requestid();


int raftIndex = -1;
int _ = -1;
bool isLeader = false;
m_raftNode->Start(op, &raftIndex, &_, &isLeader);//raftIndex:raft预计的logIndex ,虽然是预计,但是正确情况下是准确的,op的具体内容对raft来说 是隔离的

if (!isLeader) {
reply->set_err(ErrWrongLeader);
return;
}


// create waitForCh
m_mtx.lock();

if (waitApplyCh.find(raftIndex) == waitApplyCh.end()) {
waitApplyCh.insert(std::make_pair(raftIndex, new LockQueue<Op>()));
}
auto chForRaftIndex = waitApplyCh[raftIndex];

m_mtx.unlock(); //直接解锁,等待任务执行完成,不能一直拿锁等待


// timeout
Op raftCommitOp;

if (!chForRaftIndex->timeOutPop(CONSENSUS_TIMEOUT, &raftCommitOp)) {
int _ = -1;
bool isLeader = false;
m_raftNode->GetState(&_, &isLeader);

if (ifRequestDuplicate(op.ClientId, op.RequestId) && isLeader) {
//如果超时,代表raft集群不保证已经commitIndex该日志,但是如果是已经提交过的get请求,是可以再执行的。
// 不会违反线性一致性
std::string value;
bool exist = false;
ExecuteGetOpOnKVDB(op, &value, &exist);
if (exist) {
reply->set_err(OK);
reply->set_value(value);
} else {
reply->set_err(ErrNoKey);
reply->set_value("");
}
} else {
reply->set_err(ErrWrongLeader); //返回这个,其实就是让clerk换一个节点重试
}
} else {
//raft已经提交了该command(op),可以正式开始执行了
// DPrintf("[WaitChanGetRaftApplyMessage<--]Server %d , get Command <-- Index:%d , ClientId %d, RequestId %d, Opreation %v, Key :%v, Value :%v", kv.me, raftIndex, op.ClientId, op.RequestId, op.Operation, op.Key, op.Value)
//todo 这里还要再次检验的原因:感觉不用检验,因为leader只要正确的提交了,那么这些肯定是符合的
if (raftCommitOp.ClientId == op.ClientId && raftCommitOp.RequestId == op.RequestId) {
std::string value;
bool exist = false;
ExecuteGetOpOnKVDB(op, &value, &exist);
if (exist) {
reply->set_err(OK);
reply->set_value(value);
} else {
reply->set_err(ErrNoKey);
reply->set_value("");
}
} else {
reply->set_err(ErrWrongLeader);
}
}
m_mtx.lock(); //todo 這個可以先弄一個defer,因爲刪除優先級並不高,先把rpc發回去更加重要
auto tmp = waitApplyCh[raftIndex];
waitApplyCh.erase(raftIndex);
delete tmp;
m_mtx.unlock();
}

在保证线性一致性的情况下如何读kv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 同样使用有锁队列判断超时
if (!chForRaftIndex->timeOutPop(CONSENSUS_TIMEOUT, &raftCommitOp)) {
int _ = -1;
bool isLeader = false;
m_raftNode->GetState(&_, &isLeader);

if (ifRequestDuplicate(op.ClientId, op.RequestId) && isLeader) {
//如果超时,代表raft集群不保证已经commitIndex该日志,但是如果是已经提交过的get请求,是可以再执行的。
// 不会违反线性一致性
std::string value;
bool exist = false;
ExecuteGetOpOnKVDB(op, &value, &exist);
if (exist) {
reply->set_err(OK);
reply->set_value(value);
} else {
reply->set_err(ErrNoKey);
reply->set_value("");

}
} else {
reply->set_err(ErrWrongLeader); //返回这个,其实就是让clerk换一个节点重试
}
} else {
//raft已经提交了该command(op),可以正式开始执行了
//todo 这里感觉不用检验,因为leader只要正确的提交了,那么这些肯定是符合的
if (raftCommitOp.ClientId == op.ClientId && raftCommitOp.RequestId == op.RequestId) {
std::string value;
bool exist = false;
ExecuteGetOpOnKVDB(op, &value, &exist);
if (exist) {
reply->set_err(OK);
reply->set_value(value);
} else {
reply->set_err(ErrNoKey);
reply->set_value("");
}
} else {

}

个人感觉读与写不同的是,读就算操作过也可以重复执行,不会违反线性一致性。

因为毕竟不会改写数据库本身的内容。

以GET请求为例看一看流程

以一个读操作为例看一看流程:

首先外部RPC调用GET,

1
2
3
4
5
void KvServer::Get(google::protobuf::RpcController *controller, const ::raftKVRpcProctoc::GetArgs *request,
::raftKVRpcProctoc::GetReply *response, ::google::protobuf::Closure *done) {
KvServer::Get(request,response);
done->Run();
}

然后是根据请求参数生成Op,生成Op是因为raft和raftServer沟通用的是类似于go中的channel的机制,然后向下执行即可。

注意:在这个过程中需要判断当前节点是不是leader,如果不是leader的话就返回ErrWrongLeader ,让其他clerk换一个节点尝试。

RPC如何实现调用

这里以Raft类为例讲解下如何使用rpc远程调用的。

  1. 写protoc文件,并生成对应的文件,Raft类使用的protoc文件和生成的文件见:这里

  2. 继承生成的文件的类 class Raft : public raftRpcProctoc::raftRpc

  3. 重写rpc方法即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 重写基类方法,因为rpc远程调用真正调用的是这个方法
//序列化,反序列化等操作rpc框架都已经做完了,因此这里只需要获取值然后真正调用本地方法即可。
void AppendEntries(google::protobuf::RpcController *controller,
const ::raftRpcProctoc::AppendEntriesArgs *request,
::raftRpcProctoc::AppendEntriesReply *response,
::google::protobuf::Closure *done) override;
void InstallSnapshot(google::protobuf::RpcController *controller,
const ::raftRpcProctoc::InstallSnapshotRequest *request,
::raftRpcProctoc::InstallSnapshotResponse *response,
::google::protobuf::Closure *done) override;
void RequestVote(google::protobuf::RpcController *controller,
const ::raftRpcProctoc::RequestVoteArgs *request,
::raftRpcProctoc::RequestVoteReply *response,
::google::protobuf::Closure *done) override;

一些问题思考

m_nextIndex和m_matchIndex

m_nextIndex 保存leader下一次应该从哪一个日志开始发送给follower;m_matchIndex表示follower在哪一个日志是已经匹配了的(由于日志安全性,某一个日志匹配,那么这个日志及其之前的日志都是匹配的)。

m_nextIndexm_matchIndex 是否有冗余,即使用一个m_nextIndex 可以吗?

显然是不行的,m_nextIndex 的作用是用来寻找m_matchIndex ,不能直接取代。我们可以从这两个变量的变化看,在当选leader后,m_nextIndex 初始化为最新日志index,m_matchIndex 初始化为0,如果日志不匹配,那么m_nextIndex 就会不断的缩减,直到遇到匹配的日志,这时候m_nextIndex 应该一直为m_matchIndex+1

如果一直不发生故障,那么后期m_nextIndex就没有太大作用了,但是raft考虑需要考虑故障的情况,因此需要使用两个变量。

锁,能否在其中的某个地方提前放锁,或者使用多把锁来尝试提升性能?

多线程发送,能不能直接在doHeartBeat或者doElection函数里面直接一个一个发送消息呢?

可以有的优化空间

  1. 线程池,而不是每次rpc都不断地创建新线程
  2. 日志
  3. 从节点读取日志

面试问题

Raft算法的基本原理

回答要点:解释Raft算法的基本工作原理,包括领导者选举、日志复制和安全性保障。

示例回答

Raft算法是一种分布式算法,旨在解决分布式系统中的一致性问题,相对于Paxos算法而言更易于理解和实现。

Raft算法将系统中的所有节点分为三类角色:领导者(leader)、跟随者(follower)和候选人(candidate)。其选举机制确保系统中的一个节点被选为领导者(leader),领导者负责处理客户端的请求,并将更新复制到其他节点。

Raft算法的基本原理包括以下几个关键步骤:

  1. 领导者选举(Leader Election):在系统启动时或者当前领导者失效时,节点会发起选举过程。节点会在一个随机的超时时间内等待收到来自其他节点的心跳消息。如果在超时时间内没有收到心跳消息,节点就会成为候选人并发起选举。候选人向其他节点发送投票请求,并在得到大多数节点的投票后成为新的领导者。
  2. 日志复制(Log Replication):一旦领导者选举完成,新的领导者就会接收客户端的请求,并将更新的日志条目复制到其他节点。当大多数节点都成功复制了这些日志条目时,更新被认为是提交的,并且可以应用到节点的状态机中。
  3. 安全性(Safety):Raft算法通过确保在选举中只有一个领导者(单一领导者)、大多数节点的一致性以及只有领导者可以处理客户端请求等方式保证分布式系统的安全性。

通过以上机制,Raft算法确保了分布式系统中的一致性、可用性和分区容错性。

注意:如果这么回答如果面试官懂一些分布式算法的话,那么后续可能会提问Raft与其他分布式算法的关系。

领导者选举

回答要点:这里最好结合前面几个章节的流程图,结合自己理解回答。

示例回答

img

Raft中的领导者选举过程如下:

  1. 候选人状态(Candidate State)
  2. 节点在没有检测到领导者的情况下成为候选人,并将自己的任期编号(term)增加1。
  3. 候选人向其他节点发送投票请求,并请求其他节点投票支持自己成为新的领导者。
  4. 如果候选人在规定时间内收到了大多数节点的选票支持(即获得了大多数节点的投票),则成为新的领导者。
  5. 选举过程(Election Process)
  6. 在发起选举后,候选人会等待一定的随机时间(选举超时时间)来收集其他节点的投票。
  7. 如果在这个超时时间内没有收到大多数节点的选票,候选人将会重新开始一个新的选举周期,增加自己的任期编号,并再次发起选举
  8. 投票过程(Voting Process)
  9. 其他节点收到来自候选人的投票请求后,会检查自己的当前任期编号。如果候选人的任期编号比自己的大,则投票支持候选人,并更新自己的任期编号为候选人的任期编号。
  10. 如果其他节点已经投票给了另一个候选人,或者已经投票给了当前领导者,它将拒绝投票。
  11. 领导者选举完成(Leader Election Complete)
  12. 如果候选人收到了大多数节点的投票支持,它就会成为新的领导者。
  13. 新的领导者会开始发送心跳消息以维持其领导地位,并开始进行日志复制操作。

在什么情况下会触发领导者选举?

回答要点:一个节点只要长时间没有收到符合条件的leader发送的心跳就会认为leader掉线,就会发起选举。

示例回答

在Raft算法中,领导者选举会在以下情况下触发:

  1. 当系统启动时,所有节点都处于初始状态,没有领导者。
  2. 当领导者节点因网络分区、宕机或其他原因失效时,导致系统中没有活跃的领导者。
  3. 当节点故障恢复或者被网络分区时,它可能会检测到当前没有领导者,因此会成为候选人并发起选举。

日志复制

Raft是如何通过日志复制来保证数据一致性的?

回答要点:主要是两个机制(特点):

1.Leader Append Entries:领导者追加日志条目,即只有leader可以接受外部请求并将请求打包成日志,并向follower同步自己的日志,这样保证提交过的日志不会被覆盖掉。

2.commit机制,领导者发现大多数节点都已经成功复制了某个日志条目后,该日志条目被视为已经提交,从而保证了数据的一致性。

示例回答:

Raft算法通过日志复制来确保数据一致性。在Raft中,每个节点都维护一个日志(log)来记录状态机中的操作指令。领导者负责接收客户端的写请求,将操作指令追加到自己的日志中,并将这些操作指令发送给其他节点,要求它们复制这些日志条目。

以下是Raft通过日志复制来保证数据一致性的基本流程:

  1. Leader Append Entries(领导者追加日志条目)
  2. 领导者接收到客户端的写请求后,将这些操作指令追加到自己的日志中。
  3. 领导者将这些操作指令组织成一个日志条目(log entry),并向其他节点发送一个追加日志条目的请求(Append Entries RPC)。
  4. Follower Log Replication(跟随者日志复制)
  5. 跟随者节点接收到领导者发送的追加日志条目的请求后,会按照领导者的日志条目顺序将这些日志条目追加到自己的日志中。
  6. 如果跟随者节点成功复制了这些日志条目,则向领导者发送成功响应(Response);如果由于某种原因(例如网络故障)导致复制失败,则向领导者发送失败响应。
  7. Commit(提交)
  8. 当领导者发现大多数节点都已经成功复制了某个日志条目后,该日志条目被视为已经提交。
  9. 领导者将提交的日志条目应用到自己的状态机中,以执行相应的操作指令。

安全性保障

Raft是如何确保安全性的?讨论一致性、可用性和分区容错性之间的权衡。

回答要点 :这里主要是想考察分布式CAP理论的一个关键:CAP中如果发生故障,只能CP和AP二选一,无法满足CAP的三角,而Raft选择的是CP,即满足一致性。

示例回答:

在权衡一致性、可用性和分区容错性时,Raft算法倾向于优先保证一致性和分区容错性。它通过保证大多数节点的确认和限制领导者选举条件来确保一致性,通过选举机制和日志复制来保证分区容错性。同时,Raft也兼顾了系统的可用性,确保在领导者失效后能够快速进行新的领导者选举,并继续提供服务。

选举超时:

什么是选举超时?它的作用是什么?

回答要点: follower和candidate都会有选举超时的机制。

在follower时:选举超时的意义是发起选举,变成candidate;

在candidate时:candidate会选举超时,如果选举成功就会变成leader;如果选举失败就会变成candidate(选举超时)或者follower(发现合适的leader)。那么选举超时的作用就很明显了,防止无止境的等待导致所有人都成不了leader。

拓展:想一想为什么选举超时时间要每次随机设置而不设置成一个固定的值???

示例回答:

选举超时的作用包括:

  1. 触发领导者选举:选举超时用于在当前没有活跃领导者或者领导者失效时触发新的领导者选举。当节点在选举超时时间内没有收到来自当前领导者的心跳消息时,会成为候选人并发起选举过程。
  2. 防止脑裂(Split-Brain):选举超时帮助避免了系统中出现多个领导者的情况,从而避免了脑裂问题的发生。如果系统中的节点在选举超时时间内没有收到来自当前领导者的心跳消息,它们会同时成为候选人并发起选举,但只有一个候选人最终会获得大多数节点的选票,成为新的领导者。
  3. 确保领导者切换的及时性:选举超时可以确保在领导者失效后,系统能够及时地启动新的领导者选举过程,从而减少服务中断的时间,提高系统的可用性。

选举超时的时间是如何设置的?

回答要点:回答要点在上个问题的拓展里面,大家可以先想想。答案是:一个一定范围内的随机值,其要根据心跳时间,rpc延迟,数据操作延迟综合考虑。

范围:一般来说选举超时时间要大于一次完整心跳的日志同步处理时间。

为何随机:选举超时的目的是防止无止境的等待导致所有人都成不了leader,如果超时时间又一样,那么大家又一起选举,又会不断循环,那么一个随机值可以让某些节点早点重新发起选举,防止大家一起选举导致死循环。

示例回答:

选举超时时间的设置通常包括以下考虑因素:

  1. 网络延迟和稳定性:选举超时时间需要足够长以允许节点在正常情况下能够收到来自领导者的心跳消息。考虑到网络的延迟和不稳定性,超时时间应该设置得足够长,以避免因网络延迟而误判领导者失效。
  2. 系统负载和响应速度:选举超时时间也应考虑系统的负载情况和响应速度。如果系统负载较重或者节点的处理速度较慢,可能需要将选举超时时间设置得稍长一些,以允许节点有足够的时间处理收到的消息。
  3. 避免脑裂问题:为了避免系统中出现多个领导者导致的脑裂问题,选举超时时间应该设置得足够随机化,以确保不同节点不会在同一时间内触发选举。

日志条目的提交

Raft中的日志条目是如何提交的?

回答要点: 要半数以上的节点(包括leader)接收了这个日志,那么才能提交(commit),后续才能apply到状态机。

示例回答:

  1. Leader接收客户端请求
    • 当客户端向Raft系统提交请求时,请求会首先发送到Raft集群中的Leader节点。
  2. Leader将请求转换为日志条目
    • Leader将接收到的客户端请求转换为一条日志条目,并附加到其本地日志中。
  3. Leader广播日志条目
    • Leader向其它节点发送包含新日志条目的心跳RPC请求(AppendEntries RPC)。
  4. Follower节点接收并附加日志条目
    • Follower节点接收到Leader的附加日志请求后,将新的日志条目附加到其本地日志中。
  5. Follower节点响应Leader
    • Follower节点在成功附加日志后,向Leader发送成功的响应。
  6. Leader确认提交
    • 当Leader收到大多数节点的附加成功响应时,将日志条目视为已提交。
  7. Leader提交到状态机
    • Leader将已提交的日志条目应用到其状态机中,以执行相应的操作。
  8. Leader通知Followers提交:Leader会通知其它节点已提交的日志索引,以便它们也可以将相应的日志条目提交到其状态机中。
  9. Follower提交到状态机:Follower节点收到Leader的提交通知后,将对应的已提交日志条目应用到其状态机中。

什么条件下才能够提交一个日志条目?

回答要点: 一个很容易疏忽的是必须要本term有新的日志提交才能继续提交日志,这个在前面文章中也提醒过。

示例回答: 略。

Raft如何处理集群拓扑的变更?

回答要点:

示例回答:

  1. 生成配置变更请求: 当需要改变集群的拓扑结构时,例如添加或移除节点,Leader节点会收到来自客户端的配置变更请求。
  2. 将配置变更请求转化为配置变更日志条目: Leader节点将配置变更请求转换为一条特殊的日志条目,即配置变更日志条目。该日志条目包含新的集群配置信息。
  3. 向集群中的节点分发配置变更日志条目: Leader节点通过Raft的日志复制机制将配置变更日志条目发送给所有的Follower节点。
  4. Follower节点接收配置变更日志条目: Follower节点收到配置变更日志条目后,将其附加到本地日志中。
  5. 提交配置变更日志条目: 当配置变更日志条目被大多数节点确认并附加到本地日志后,Leader节点将其提交到状态机,实际应用这个配置变更。
  6. 应用配置变更: Leader节点和所有的Follower节点根据新的配置信息更新其内部状态,以反映集群拓扑结构的变更。这可能涉及到更新节点的角色(Leader、Follower、Candidate)、维护新的节点列表等。

实际应用

Raft算法在实际场景中的应用有哪些?

回答要点: 列举一些常见的使用Raft算法作为底层的即可。

示例回答:

  1. 一些常见的配置中心,为了保证可用会采用Raft,比如zookeeper的底层实现了基于Raft修改的算法,ETCD等。
  2. 一些分布式数据库,比如TIKV

Raft与其他分布式的比较

与Paxos算法相比,Raft有哪些优势和不同之处?

回答要点: Raft相对于Paxos算法来说,更易理解、实现和维护,具有更直观的Leader机制和选举过程。(这个需要大家多了解一下其他分布式算法的设计思想了。)

示例回答:

  1. Leader机制Raft引入了Leader节点,而Paxos中没有Leader节点的概念。Leader节点负责协调和领导整个一致性过程,而Follower节点只需按照Leader的指示执行操作。
  2. 日志复制:在Raft中,所有的日志条目都通过Leader节点进行复制和提交,而Paxos中的日志复制是通过多个角色相互协作完成的。
  3. 角色切换:Raft中Leader节点失效后,集群可以快速选举新的Leader节点,而Paxos中角色的切换较为复杂,需要进行更多的投票和协调。
  4. 更强的可读性:Raft算法更加直观和易于理解,它的设计目标之一就是提供更好的可读性和可理解性,相比之下,Paxos算法相对更加抽象和复杂。

常见问题与挑战:

回答要点: leader的瓶颈(使用多读来解决),节点故障等等

示例回答:

  1. Leader瓶颈:Raft算法中的Leader节点负责所有的客户端请求的处理和日志复制,这可能会成为系统的瓶颈。如果Leader节点负载过重或者发生故障,会导致整个系统的性能下降。
  2. 网络分区:Raft算法需要保证在网络分区情况下的一致性,这可能会导致在网络分区恢复后需要进行数据的合并和冲突解决,增加了一致性维护的复杂性。
  3. 节点故障处理:当节点发生故障时,Raft需要进行Leader的重新选举,这可能导致一段时间内系统的不可用性和性能下降,尤其是在节点频繁发生故障时。
  4. 日志复制延迟:Raft算法要求日志复制必须在大多数节点上完成后才能提交,这可能导致日志复制的延迟,影响系统的实时性能。
  5. 节点动态变更:Raft算法在节点动态加入或退出时需要进行配置变更,这可能会导致系统的不稳定和数据的不一致,需要谨慎处理。
  6. 一致性保证:虽然Raft算法保证了强一致性,但在一些特殊情况下(如网络分区、节点故障等),可能会导致一致性级别的降低或者一致性协议的不满足,需要额外的机制来解决。
  7. 性能调优:在实际应用中,Raft算法需要根据具体的场景进行性能调优,包括调整心跳超时时间、选举超时时间、日志复制的策略等参数,以满足系统的性能需求。

如何处理网络分区的情况?

回答要点: 这个要结合多种情况分析,比如leader宕机、非leader宕机;少数节点分区、多数节点分区。然后这几种情况还可以相互组合,这个的话就要分类讨论了,面试估计是说不完的。

示例回答:

分情况讨论,略。

容错性

Raft算法如何处理节点故障?

回答要点:

和网络分区是一样的,可以看成是网络分区的一种特殊情况,即一个节点自己是一个分区。

此外再加上故障恢复后有哪些数据(日志,投票,term等)是需要持久化的,哪些是不需要的(commitIndex,applyIndex等)。

示例回答:

略。

在集群中的多个节点同时故障时,系统会有什么表现?

回答要点:考虑故障节点的数量,抓住“半数”这个概念。 其他同上面的“分区问题”和“故障问题”。

示例回答: 略。

RPC

你的RPC如何设计的?

回答要点: 这里最好的回答是回答出现有的rpc框架的一些优秀的设计,因为我的rpc实现只是raft-kv的一个配件,所以还是 同步阻塞 的,这里推荐大家看看异步rpc如何实现,也欢迎来改进项目中的RPC实现参与开源:项目地址

列出可以考虑的优化点:异步;rpc长连接短连接的思考;负载均衡;服务治理、服务发现。

示例回答:

负载均衡有没有做?用的什么算法如何考虑的?

回答要点: 1.常见的负载均衡算法及其对比;2.第四层和第七层不同层的负载均衡;3.瘦客户端和胖客户端的不同方式的负载均衡。

示例回答:

最开始实现rpc模块的时候实现过负载均衡算法,当然后面用于raft通信,因为raft通信是所有leader与所有节点都要建立连接,所以后面就没有再用负载均衡了,将这个功能关闭了。

当时实现的负载均衡的算法有:

  1. 轮询算法(Round Robin)
  2. 轮询算法是最简单的负载均衡算法之一,它按照请求的顺序依次将每个请求分配到不同的服务器上。当有新的请求到来时,负载均衡器会依次将请求发送到不同的服务器,直到所有的服务器都被轮询过一遍,然后再从头开始。
  3. 最小连接数算法(Least Connections)
  4. 最小连接数算法会将新的请求分配到当前连接数最少的服务器上,以确保各服务器的负载尽可能均衡。这种算法考虑了服务器的负载情况,优先将请求发送到负载较低的服务器上。
  5. 最少响应时间算法(Least Response Time)
  6. 最少响应时间算法会将请求发送到响应时间最短的服务器上,以保证响应时间的最小化。这种算法通常需要负载均衡器记录每个服务器的响应时间,并动态调整请求的分配策略。
  7. 哈希算法(Hashing)
  8. 哈希算法根据请求的某些属性(如客户端IP地址、URL等)计算哈希值,并将请求发送到对应哈希值的服务器上。这种算法能够确保相同请求始终被发送到同一台服务器上,适用于需要保持会话一致性的场景。
  9. 加权轮询算法(Weighted Round Robin)
  10. 加权轮询算法在轮询算法的基础上引入了权重的概念,不同的服务器具有不同的权重值。根据权重值的不同,负载均衡器会调整请求的分配比例,以实现负载均衡。
  11. 拓展:hash环也是一种重要的负载均衡算法,也可以提及。

服务治理和发现有没有做?怎么做的?

回答要点: 一般是用第三方组件(比如zookeeper)来做,当然raft-kv本身就可以用来做服务治理和服务发现,所以rpc就没有单独做。

示例回答:

你这个RPC框架的序列化和反序列化中protobuf细节有没有了解

回答要点: 头部变长编码+自定义的压缩算法。

这里就可以牵扯到rpc中的编码方法,目前是头部定长4字节,可以优化成一个标志位+变长编码的方式:参与issue链接

示例回答:

大概了解了一下,主要是其头部的变长编码+Google自己实现的一个高效的压缩算法。

测试

在集群数量变多的时候,Raft性能可能会下降,这方面有没有思考过?

回答要点: 允许从follower读;rpc合并等raft落地的框架的优化。

示例回答:

有没有对性能进行过测试?用的什么工具?怎么测试的?

回答要点: perf火焰图。

示例回答:

这里回答一下火焰图的基本使用就差不多了,如果大家没有使用过的话推荐大家去看篇博文入门了解基本操作和原理,我这里给出一个初步的结果,如果只有一个客户端:并发几十,大部分的损耗在rpc这边。多个客户端的结果没有测试。


KV-Server记录
https://gstarmin.github.io/2024/04/28/KV-Server记录/
作者
Starmin
发布于
2024年4月28日
许可协议