KV-Server记录
RPC模块
项目使用到的RPC代码高度依赖于protobuf。RPC
是一种使得分布式系统中的不同模块之间能够透明地进行远程调用的技术,使得开发者可以更方便地构建分布式系统,而不用过多关注底层通信细节,调用另一台机器的方法会表现的像调用本地的方法一样。
那么无论对外表现如何,只要设计多个主机之间的通信,必不可少的就是网络通讯这一步
我们可以看看一次RPC请求到底干了什么?
img
由于底层网络通信框架使用的是运输层协议,只能发送字节流,因此会涉及到对象的序列化/反序列化问题,即上图中所示的黄色部分,而常见的网络数据传输格式包括如下三种:
XML:一种通用和轻量级的数据交换格式语言,是指可扩展标记语言以文本结构进行存储。
JSON:一种通用和轻量级的数据交换格式,也是以文本的结构进行存储,是一种简单的消息格式。JSON
作为数据包格式传输时具有更高的效率,这是因为 JSON 不像 XML
那样需要有严格的闭合标签,这就让有效数据量与总数据包比有着显著的提升,从而减少同等数据流量的情况下网络的传输压力。
Protobuf:是 Google
开发的一种独立和轻量级的数据交换格式,以二进制结构进行存储,用于不同服务之间序列化数据。它是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者序列化,可用于通讯协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。
而该项目便是使用 Protobuf
来进行消息的序列化和反序列化,同时使用其来实现RPC框架,其底层的通信流程如下图所示:
img
打包流程 :
序列化函数参数得到 argsStr,其长度为 argsSize;
打包 service_name、method_name 和 argsSize 得到 rpcHeader;
序列化 rpcHeader 得到 rpcHeaderStr,其长度为 headerSize;
将 headerSize 存储到数据包的前 4 个字节,后面的依次是 rpcHeaderStr
和 argsStr;
通过网络发送数据包;
解包流程 :
通过网络接收数据包;
首先取出数据包的前 4 个字节,读取出 headerSize 的大小;
从第 5 个字节开始,读取 headerSize 字节大小的数据,即为 rpcHeaderStr
的内容;
反序列化 rpcHeaderStr,得到 service_name、method_name 和
argsSize;
从 4+headerSize 字节开始,读取 argsSize 字节大小的数据,即为 argsStr
的内容;
反序列化 argsStr 得到函数参数 args;
消息内容
raft节点之间rpc通信函数:
1 2 3 4 5 6 7 service raftRpc { rpc AppendEntries(AppendEntriesArgs) returns (AppendEntriesReply) ; rpc InstallSnapshot (InstallSnapshotRequest) returns (InstallSnapshotResponse) ; rpc RequestVote (RequestVoteArgs) returns (RequestVoteReply) ; }
日志实体 :
1 2 3 4 5 6 message LogEntry { bytes Command =1 ; int32 LogTerm =2 ; int32 LogIndex = 3 ; }
RpcArgs内容 :
1 2 3 4 5 6 7 8 9 10 message AppendEntriesArgs { int32 Term =1 ; int32 LeaderId =2 ; int32 PrevLogIndex =3 ; int32 PrevLogTerm =4 ; repeated LogEntry Entries = 5 ; int32 LeaderCommit = 6 ; }
RpcReply内容 :
1 2 3 4 5 6 7 message AppendEntriesReply { int32 Term =1 ; bool Success =2 ; int32 UpdateNextIndex = 3 ; int32 AppState =4 ; }
请求投票参数 :
1 2 3 4 5 6 message RequestVoteArgs { int32 Term =1 ; int32 CandidateId =2 ; int32 LastLogIndex =3 ; int32 LastLogTerm =4 ; }
请求投票回复 :
1 2 3 4 5 6 7 8 9 message RequestVoteReply { int32 Term =1 ; bool VoteGranted =2 ; int32 VoteState =3 ; }
发起快照请求 :
1 2 3 4 5 6 7 message InstallSnapshotRequest { int32 LeaderId =1 ; int32 Term =2 ; int32 LastSnapShotIncludeIndex =3 ; int32 LastSnapShotIncludeTerm =4 ; bytes Data =5 ; }
发起快照回复 :
1 2 3 4 message InstallSnapshotResponse { int32 Term = 1 ; }
Raft类主要内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 class Raft : {private : std::mutex m_mtx; std::vector<std::shared_ptr< RaftRpc >> m_peers; std::shared_ptr<Persister> m_persister; int m_me; int m_currentTerm; int m_votedFor; std::vector<mprrpc:: LogEntry> m_logs; int m_commitIndex; int m_lastApplied; std::vector<int > m_nextIndex; std::vector<int > m_matchIndex; enum Status { Follower, Candidate, Leader }; Status m_status; std::shared_ptr<LockQueue<ApplyMsg>> applyChan; std::chrono::_V2::system_clock::time_point m_lastResetElectionTime; std::chrono::_V2::system_clock::time_point m_lastResetHearBeatTime; int m_lastSnapshotIncludeIndex; int m_lastSnapshotIncludeTerm;public : void AppendEntries1 (const mprrpc::AppendEntriesArgs *args, mprrpc::AppendEntriesReply *reply) ; void applierTicker () ; bool CondInstallSnapshot (int lastIncludedTerm, int lastIncludedIndex, std::string snapshot) ; void doElection () ; void doHeartBeat () ; void electionTimeOutTicker () ; std::vector<ApplyMsg> getApplyLogs () ; int getNewCommandIndex () ; void getPrevLogInfo (int server, int *preIndex, int *preTerm) ; void GetState (int *term, bool *isLeader) ; void InstallSnapshot ( const mprrpc::InstallSnapshotRequest *args, mprrpc::InstallSnapshotResponse *reply) ; void leaderHearBeatTicker () ; void leaderSendSnapShot (int server) ; void leaderUpdateCommitIndex () ; bool matchLog (int logIndex, int logTerm) ; void persist () ; void RequestVote (const mprrpc::RequestVoteArgs *args, mprrpc::RequestVoteReply *reply) ; bool UpToDate (int index, int term) ; int getLastLogIndex () ; void getLastLogIndexAndTerm (int *lastLogIndex, int *lastLogTerm) ; int getLogTermFromLogIndex (int logIndex) ; int GetRaftStateSize () ; int getSlicesIndexFromLogIndex (int logIndex) ; bool sendRequestVote (int server , std::shared_ptr<mprrpc::RequestVoteArgs> args , std::shared_ptr<mprrpc::RequestVoteReply> reply, std::shared_ptr<int > votedNum) ; bool sendAppendEntries (int server ,std::shared_ptr<mprrpc::AppendEntriesArgs> args , std::shared_ptr<mprrpc::AppendEntriesReply> reply , std::shared_ptr<int > appendNums ) ; void pushMsgToKvServer (ApplyMsg msg) ; void readPersist (std::string data) ; std::string persistData () ; void Start (Op command,int * newLogIndex,int * newLogTerm,bool * isLeader ) ; void Snapshot (int index , std::string snapshot ) ;public : void init (std::vector<std::shared_ptr< RaftRpc >> peers,int me,std::shared_ptr<Persister> persister,std::shared_ptr<LockQueue<ApplyMsg>> applyCh) ;
重点需要关注的:
Raft的主要流程:领导选举(sendRequestVote RequestVote
) 日志同步、心跳(sendAppendEntries
AppendEntries
)
定时器的维护:主要包括raft向状态机定时写入(applierTicker
)、心跳维护定时器(leaderHearBeatTicker
)、选举超时定时器(electionTimeOutTicker
)。
持久化相关:包括哪些内容需要持久化,什么时候需要持久化(persist)
m_nextIndex
保存leader下一次应该从哪一个日志开始发送给follower;
m_matchIndex
表示follower在哪一个日志是已经匹配了的(由于日志安全性,某一个日志匹配,那么这个日志及其之前的日志都是匹配的)
init() 启动初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 void Raft::init (std::vector<std::shared_ptr<RaftRpc>> peers, int me, std::shared_ptr<Persister> persister, std::shared_ptr<LockQueue<ApplyMsg>> applyCh) { m_peers = peers; m_persister = persister; m_me = me; m_mtx.lock (); this ->applyChan = applyCh; m_currentTerm = 0 ; m_status = Follower; m_commitIndex = 0 ; m_lastApplied = 0 ; m_logs.clear (); for (int i =0 ;i<m_peers.size ();i++){ m_matchIndex.push_back (0 ); m_nextIndex.push_back (0 ); } m_votedFor = -1 ; m_lastSnapshotIncludeIndex = 0 ; m_lastSnapshotIncludeTerm = 0 ; m_lastResetElectionTime = now (); m_lastResetHearBeatTime = now (); readPersist (m_persister->ReadRaftState ()); if (m_lastSnapshotIncludeIndex > 0 ){ m_lastApplied = m_lastSnapshotIncludeIndex; } m_mtx.unlock (); std::thread t (&Raft::leaderHearBeatTicker, this ) ; t.detach (); std::thread t2 (&Raft::electionTimeOutTicker, this ) ; t2.detach (); std::thread t3 (&Raft::applierTicker, this ) ; t3.detach (); }
从上面可以看到一共产生了三个定时器,分别维护:选举、日志同步和心跳、raft节点与kv-server的联系。相互之间是比较隔离的。
Leader选举
主要涉及函数及其流程:
img
electionTimeOutTicker :负责查看是否该发起选举,如果该发起选举就执行doElection发起选举。
doElection :实际发起选举,构造需要发送的rpc,并多线程调用sendRequestVote处理rpc及其相应。
sendRequestVote :负责发送选举中的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。
RequestVote :接收别人发来的选举请求,主要检验是否要给对方投票。
electionTimeOutTicker
选举超时由于electionTimeOutTicker
维护。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void Raft::electionTimeOutTicker () { while (true ) { m_mtx.lock (); auto nowTime = now (); auto suitableSleepTime = getRandomizedElectionTimeout () + m_lastResetElectionTime - nowTime; m_mtx.unlock (); if (suitableSleepTime.count () > 1 ) { std::this_thread::sleep_for (suitableSleepTime); } if ((m_lastResetElectionTime - nowTime).count () > 0 ) { continue ; } doElection (); } }
在死循环中,首先计算距离下一次超时应该睡眠的时间suitableSleepTime,然后睡眠这段时间,醒来后查看睡眠的这段时间选举超时定时器是否被触发,如果没有触发就发起选举。
“举超时定时器是否被触发”:选举定时器的触发条件:收到leader发来的appendEntryRPC
、给其他的节点选举投票
在死循环中,首先计算距离上次重置选举计时器的时间加上随机化的选举超时时间,然后线程根据这个时间决定是否睡眠。若超时时间未到,线程进入睡眠状态,若在此期间选举计时器被重置,则继续循环。若超时时间已到,调用doElection()
函数启动领导者选举过程。
doElection ()函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 void Raft::doElection () { lock_guard<mutex> g (m_mtx) ; if (m_status != Leader) { DPrintf ("[ ticker-func-rf(%d) ] 选举定时器到期且不是leader,开始选举 \n" , m_me); m_status = Candidate; m_currentTerm += 1 ; m_votedFor = m_me; persist (); std::shared_ptr<int > votedNum = std::make_shared <int >(1 ); m_lastResetElectionTime = now (); for (int i = 0 ; i < m_peers.size (); i++) { if (i == m_me) { continue ; } int lastLogIndex = -1 , lastLogTerm = -1 ; getLastLogIndexAndTerm (&lastLogIndex, &lastLogTerm); std::shared_ptr<mprrpc::RequestVoteArgs> requestVoteArgs = std::make_shared <mprrpc::RequestVoteArgs>(); requestVoteArgs->set_term (m_currentTerm); requestVoteArgs->set_candidateid (m_me); requestVoteArgs->set_lastlogindex (lastLogIndex); requestVoteArgs->set_lastlogterm (lastLogTerm); std::shared_ptr<mprrpc::RequestVoteReply> requestVoteReply = std::make_shared <mprrpc::RequestVoteReply>(); std::thread t (&Raft::sendRequestVote, this , i, requestVoteArgs, requestVoteReply, votedNum) ; t.detach (); } } }
Raft::AppendEntries1()函数
如果对方的任期号比自己当前任期号小,说明对方已经是过期领导人了,那么更新reply状态并直接return。
1 2 3 4 5 6 7 8 if (args->term () < m_currentTerm) { reply->set_success (false ); reply->set_term (m_currentTerm); reply->set_updatenextindex (-100 ); DPrintf ("[func-AppendEntries-rf{%d}] 拒绝了 因为Leader{%d}的term{%v}< rf{%d}.term{%d}\n" , m_me, args->leaderid (), args->term (), m_me, m_currentTerm); return ; }
如果对方任期号比自己当前任期号大,那么就说明leader更新了,不管自己之前是不是leader,现在都要成为follwer。
1 2 3 4 5 6 7 8 9 10 if (args->term () > m_currentTerm) { m_status = Follower; m_currentTerm = args->term (); m_votedFor = -1 ; }
如果上面两种情况都不满足,那么说明自己收到的就是当前leader发的包。因为只有leader会向follwer更新日志或者发心跳包。这里的args其实就是leader发送过来的参数。
如果leader的prevlogindex(前一个日志的日志号)大于当前节点lastLogIndex(最新的日志号),那么说明leader发送的日志过于新了,当前节点要从lastLogIndex
+ 1一个一个更新。
1 2 3 4 5 6 7 if (args->prevlogindex () > getLastLogIndex ()) { reply->set_success (false ); reply->set_term (m_currentTerm); reply->set_updatenextindex (getLastLogIndex () + 1 ); return ; }
如果leader的prevlogindex比当前节点快照的最后一条日志号小的话,那么就说明leader共发送的日志比当前节点已经快照的最后一个日志号都要小。
1 2 3 4 5 6 7 8 else if (args->prevlogindex () < m_lastSnapshotIncludeIndex) { reply->set_success (false ); reply->set_term (m_currentTerm); reply->set_updatenextindex ( m_lastSnapshotIncludeIndex + 1 ); }
这里为什么?
1 2 3 4 5 6 7 myAssert ( getLastLogIndex () >= args->prevlogindex () + args->entries_size (), format("[func-AppendEntries1-rf{%d}]rf.getLastLogIndex(){%d} != args.PrevLogIndex{%d}+len(args.Entries){%d}" , m_me, getLastLogIndex (), args->prevlogindex (), args->entries_size ()));
Raft::sendAppendEntries()函数
函数原型 :
1 2 3 bool Raft::sendAppendEntries (int server, std::shared_ptr<raftRpcProctoc::AppendEntriesArgs> args, std::shared_ptr<raftRpcProctoc::AppendEntriesReply> reply, std::shared_ptr<int > appendNums)
args是要发送的AE
rpc,reply是返回,m_peers是一个代理类的数组,直接调用m_peers[server]的AppendEntries函数,并得到返回值,如果ok为false直接报错并返回,否则对reply进行处理。
1 bool ok = m_peers[server]->AppendEntries (args.get (), reply.get ());
加锁,同时收到多个reply的并发处理。
1 std::lock_guard<std::mutex> lg1 (m_mtx) ;
如果reply->term() >
m_currentTerm,说明自己的任期号已经不是最新的了,将自己转为follower:
1 2 3 4 5 6 if (reply->term () > m_currentTerm) { m_status = Follower; m_currentTerm = reply->term (); m_votedFor = -1 ; return ok; }
如果reply->term() <
m_currentTerm,说明收到的是过期回复,直接返回即可。
1 2 3 4 5 else if (reply->term () < m_currentTerm) { DPrintf ("[func -sendAppendEntries rf{%d}] 节点:{%d}的term{%d}<rf{%d}的term{%d}\n" , m_me, server, reply->term (), m_me, m_currentTerm); return ok; }
如果reply->success()不为真:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 if (!reply->success ()) { if (reply->updatenextindex () != -100 ) { DPrintf ("[func -sendAppendEntries rf{%d}] 返回的日志term相等,但是不匹配,回缩nextIndex[%d]:{%d}\n" , m_me, server, reply->updatenextindex ()); m_nextIndex[server] = reply->updatenextindex (); } }
否则说明更新成功,同时更新server的m_matchIndex和m_nextIndex,并将appendNums所对应的值加一。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 else { *appendNums = *appendNums + 1 ; DPrintf ("---------------------------tmp------------------------- 節點{%d}返回true,當前*appendNums{%d}" , server, *appendNums); m_matchIndex[server] = std::max (m_matchIndex[server], args->prevlogindex () + args->entries_size ()); m_nextIndex[server] = m_matchIndex[server] + 1 ; int lastLogIndex = getLastLogIndex (); myAssert (m_nextIndex[server] <= lastLogIndex + 1 , format("error msg:rf.nextIndex[%d] > lastLogIndex+1, len(rf.logs) = %d lastLogIndex{%d} = %d" , server, m_logs.size (), server, lastLogIndex)); if (*appendNums >= 1 + m_peers.size () / 2 ) { *appendNums = 0 ; if (args->entries_size () > 0 && args->entries (args->entries_size () - 1 ).logterm () == m_currentTerm) { DPrintf ( "---------------------------tmp------------------------- 当前term有log成功提交,更新leader的m_commitIndex " "from{%d} to{%d}" , m_commitIndex, args->prevlogindex () + args->entries_size ()); m_commitIndex = std::max (m_commitIndex, args->prevlogindex () + args->entries_size ()); } myAssert (m_commitIndex <= lastLogIndex, format("[func-sendAppendEntries,rf{%d}] lastLogIndex:%d rf.commitIndex:%d\n" , m_me, lastLogIndex, m_commitIndex)); }
Raft::sendRequestVote()函数
函数总览:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 bool Raft::sendRequestVote (int server, std::shared_ptr<mprrpc::RequestVoteArgs> args, std::shared_ptr<mprrpc::RequestVoteReply> reply, std::shared_ptr<int > votedNum) { bool ok = m_peers[server]->RequestVote (args.get (),reply.get ()); if (!ok) { return ok; } lock_guard<mutex> lg (m_mtx) ; if (reply->term () > m_currentTerm){ m_status = Follower; m_currentTerm = reply->term (); m_votedFor = -1 ; persist (); return true ; } else if ( reply->term () < m_currentTerm ) { return true ; } if (!reply->votegranted ()){ return true ; } *votedNum = *votedNum + 1 ; if (*votedNum >= m_peers.size ()/2 +1 ) { *votedNum = 0 ; m_status = Leader; int lastLogIndex = getLastLogIndex (); for (int i = 0 ; i <m_nextIndex.size () ; i++) { m_nextIndex[i] = lastLogIndex + 1 ; m_matchIndex[i] = 0 ; } std::thread t (&Raft::doHeartBeat, this ) ; t.detach (); persist (); } return true ; }
首先发送投票请求,如果失败直接返回:
1 2 3 4 5 6 7 bool ok = m_peers[server]->RequestVote (args.get (), reply.get ());if (!ok) { return ok; }
如果成功,加锁后对reply进行处理:
1 2 3 4 5 6 7 8 9 10 std::lock_guard<std::mutex> lg (m_mtx) ; if (reply->term () > m_currentTerm) { m_status = Follower; m_currentTerm = reply->term (); m_votedFor = -1 ; persist (); return true ; } else if (reply->term () < m_currentTerm) { return true ; }
如果以上两种情况都不满足,那么就是合法的投票回复:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 if (!reply->votegranted ()) { return true ; } *votedNum = *votedNum + 1 ; if (*votedNum >= m_peers.size () / 2 + 1 ) { *votedNum = 0 ; if (m_status == Leader) { myAssert (false , format("[func-sendRequestVote-rf{%d}] term:{%d} 同一个term当两次领导,error" , m_me, m_currentTerm)); } m_status = Leader; int lastLogIndex = getLastLogIndex (); for (int i = 0 ; i < m_nextIndex.size (); i++) { m_nextIndex[i] = lastLogIndex + 1 ; m_matchIndex[i] = 0 ; } std::thread t (&Raft::doHeartBeat, this ) ; t.detach (); persist (); }
Raft::RequestVote()函数
函数总览:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 void Raft::RequestVote ( const mprrpc::RequestVoteArgs *args, mprrpc::RequestVoteReply *reply) { lock_guard<mutex> lg (m_mtx) ; Defer ec1 ([this ]() -> void { this ->persist(); }) ; if (args->term () < m_currentTerm) { reply->set_term (m_currentTerm); reply->set_votestate (Expire); reply->set_votegranted (false ); return ; } if (args->term () > m_currentTerm) { m_status = Follower; m_currentTerm = args->term (); m_votedFor = -1 ; } int lastLogTerm = getLastLogIndex (); if (!UpToDate (args->lastlogindex (), args->lastlogterm ())) { reply->set_term (m_currentTerm); reply->set_votestate (Voted); reply->set_votegranted (false ); return ; } if (m_votedFor != -1 && m_votedFor != args->candidateid ()) { reply->set_term (m_currentTerm); reply->set_votestate (Voted); reply->set_votegranted (false ); return ; } else { m_votedFor = args->candidateid (); m_lastResetElectionTime = now (); reply->set_term (m_currentTerm); reply->set_votestate (Normal); reply->set_votegranted (true ); return ; } }
日志复制/心跳
img
可以从流程图看到,函数实现上我尽量将心跳日志复制的流程统一,方便理解和后期统一修改理解AppendEntry
相关内容,snapshot的逻辑是类似的。
leaderHearBeatTicker :负责查看是否该发送心跳了,如果该发起就执行doHeartBeat。
doHeartBeat :实际发送心跳,判断到底是构造需要发送的rpc,并多线程调用sendRequestVote处理rpc及其相应。
sendAppendEntries :负责发送日志的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。
leaderSendSnapShot :负责发送快照的RPC,在发送完rpc后还需要负责接收并处理对端发送回来的响应。
AppendEntries :接收leader发来的日志请求,主要检验用于检查当前日志是否匹配并同步leader的日志到本机。
InstallSnapshot :接收leader发来的快照请求,同步快照到本机。
leaderHearBeatTicker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void Raft::leaderHearBeatTicker () { while (true ) { auto nowTime = now (); m_mtx.lock (); auto suitableSleepTime = std::chrono::milliseconds (HeartBeatTimeout) + m_lastResetHearBeatTime - nowTime; m_mtx.unlock (); if (suitableSleepTime.count () < 1 ) { suitableSleepTime = std::chrono::milliseconds (1 ); } std::this_thread::sleep_for (suitableSleepTime); if ((m_lastResetHearBeatTime - nowTime).count () > 0 ) { continue ; } doHeartBeat (); } }
其基本逻辑和选举定时器electionTimeOutTicker一模一样,不一样之处在于设置的休眠时间不同,这里是根据HeartBeatTimeout来设置,而electionTimeOutTicker中是根据getRandomizedElectionTimeout()
设置。
doHeartBeat
这里目前逻辑写的不统一,发送快照leaderSendSnapShot和发送日志sendAppendEntries的rpc值的构造没有统一,且写在一坨。
可以抽离出来。目前先将就,关注主要逻辑。
带注释版:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 void Raft::doHeartBeat () { std::lock_guard<mutex> g (m_mtx) ; if (m_status == Leader) { auto appendNums = std::make_shared <int >(1 ); for (int i = 0 ; i < m_peers.size (); i++) { if (i == m_me){ continue ; } if (m_nextIndex[i] <= m_lastSnapshotIncludeIndex) { std::thread t (&Raft::leaderSendSnapShot, this , i) ; t.detach (); continue ; } int preLogIndex = -1 ; int PrevLogTerm = -1 ; getPrevLogInfo (i, &preLogIndex, &PrevLogTerm); std::shared_ptr<mprrpc::AppendEntriesArgs> appendEntriesArgs = std::make_shared <mprrpc::AppendEntriesArgs>(); appendEntriesArgs->set_term (m_currentTerm); appendEntriesArgs->set_leaderid (m_me); appendEntriesArgs->set_prevlogindex (preLogIndex); appendEntriesArgs->set_prevlogterm (PrevLogTerm); appendEntriesArgs->clear_entries (); appendEntriesArgs->set_leadercommit (m_commitIndex); if (preLogIndex != m_lastSnapshotIncludeIndex) { for (int j = getSlicesIndexFromLogIndex (preLogIndex) + 1 ; j < m_logs.size (); ++j) { mprrpc::LogEntry *sendEntryPtr = appendEntriesArgs->add_entries (); *sendEntryPtr = m_logs[j]; } } else { for (const auto & item: m_logs) { mprrpc::LogEntry *sendEntryPtr = appendEntriesArgs->add_entries (); *sendEntryPtr = item; } } int lastLogIndex = getLastLogIndex (); const std::shared_ptr<mprrpc::AppendEntriesReply> appendEntriesReply = std::make_shared <mprrpc::AppendEntriesReply>(); std::thread t (&Raft::sendAppendEntries, this , i, appendEntriesArgs, appendEntriesReply, appendNums) ; t.detach (); } m_lastResetHearBeatTime = now (); } }
加速日志匹配
这部分在AppendEntries
函数里,涉及代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 reply->set_updatenextindex (args->prevlogindex ());for (int index = args->prevlogindex (); index >= m_lastSnapshotIncludeIndex; --index) { if (getLogTermFromLogIndex (index) != getLogTermFromLogIndex (args->prevlogindex ())) { reply->set_updatenextindex (index + 1 ); break ; } } reply->set_success (false ); reply->set_term (m_currentTerm);return ;
如果日志不匹配的话可以一个一个往前的倒退。但是这样的话可能会设计很多个rpc之后才能找到匹配的日志,那么就一次多倒退几个数。
倒退几个呢?这里认为如果某一个日志不匹配,那么这一个日志所在的term的所有日志大概率都不匹配 ,那么就倒退到
最后一个日志所在的term的最后那个命令。
持久化
持久化就是把不能丢失的数据保存到磁盘。
持久化的内容为两部分:
raft节点的部分信息;
m_currentTerm
:当前节点的Term,避免重复到一个Term,可能会遇到重复投票等问题。
m_votedFor
:当前Term给谁投过票,避免故障后重复投票。
m_logs
:raft节点保存的全部的日志信息。
kvDb的快照
m_lastSnapshotIncludeIndex
:快照的信息,快照最新包含哪个日志Index
m_lastSnapshotIncludeTerm
:快照的信息,快照最新包含哪个日志Term,与m_lastSnapshotIncludeIndex
是对应的。
不妨想一想,其他的信息为什么不用持久化,比如说:身份、commitIndex、applyIndex等等。
applyIndex不持久化是经典raft的实现,在一些工业实现上可能会优化,从而持久化。
即applyIndex不持久化不会影响“共识”。
Snapshot是kvDb的快照,也可以看成是日志,因此:全部的日志 = m_logs +
snapshot
因为Snapshot是kvDB生成的,kvDB肯定不知道raft的存在,而什么term、什么日志Index都是raft才有的概念,因此snapshot中肯定没有term和index信息。
所以需要raft自己来保存这些信息。
故,快照与m_logs联合起来理解即可。
为什么要持久化这些内容?
两部分原因:共识安全、优化。
除了snapshot相关的部分,其他部分都是为了共识安全。
而snapshot是因为日志一个一个的叠加,会导致最后的存储非常大,因此使用snapshot来压缩日志。
不严谨的一种理解方式:
为什么snashot可以压缩日志?
日志是追加写的,对于一个变量的重复修改可能会重复保存,理论上对一个变量的反复修改会导致日志不断增大。
而snapshot是原地写,即只保存一个变量最后的值,自然所需要的空间就小了。
什么时候持久化
需要持久化的内容发送改变的时候就要注意持久化。
比如term
增加,日志增加等等。
具体的可以查看代码仓库中的void Raft::persist()
相关内容。
谁来调用持久化
谁来调用都可以,只要能保证需要持久化的内容能正确持久化。
仓库代码中选择的是raft类自己来完成持久化。因为raft类最方便感知自己的term之类的信息有没有变化。
注意,虽然持久化很耗时,但是持久化这些内容的时候不要放开锁,以防其他线程改变了这些值,导致其它异常。
具体怎么实现持久化|使用哪个函数持久化
其实持久化是一个非常难的事情,因为持久化需要考虑:速度、大小、二进制安全。
因此仓库实现目前采用的是使用boost库中的持久化实现,将需要持久化的数据序列化转成std::string
类型再写入磁盘。
当然其他的序列化方式也少可行的。
可以看到这一块还是有优化空间的,因此可以尝试对这里优化优化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 std::string Raft::persistData () { BoostPersistRaftNode boostPersistRaftNode; boostPersistRaftNode.m_currentTerm = m_currentTerm; boostPersistRaftNode.m_votedFor = m_votedFor; boostPersistRaftNode.m_lastSnapshotIncludeIndex = m_lastSnapshotIncludeIndex; boostPersistRaftNode.m_lastSnapshotIncludeTerm = m_lastSnapshotIncludeTerm; for (auto &item: m_logs) { boostPersistRaftNode.m_logs.push_back (item.SerializeAsString ()); } std::stringstream ss; boost::archive::text_oarchive oa (ss) ; oa<<boostPersistRaftNode; return ss.str (); }
kvServer怎么处理外部请求
img
图中是raftServer,这里叫成kvServer,是一样的。
kvServer其实是个中间组件,负责沟通kvDB和raft节点。
那么外部请求怎么打进来呢?
哦吼,当然是Server来负责呀,加入后变成了:
img
kvServer怎么和上层kvDB沟通,怎么和下层raft节点沟通?
通过这两个成员变量实现:
1 2 std::shared_ptr<LockQueue<ApplyMsg> > applyChan; std::unordered_map<std::string, std::string> m_kvDB;
kvDB:使用的是unordered_map来代替上层的kvDB,因此没啥好说的.
raft节点:其中LockQueue
是一个并发安全的队列,这种方式其实是模仿的go中的channel机制。
在raft类中这里 可以看到,raft类中也拥有一个applyChan,kvSever和raft类都持有同一个applyChan,来完成相互的通信。
从上面的结构图中可以看到kvServer负责与外部clerk通信。
那么一个外部请求的处理可以简单的看成两步:
接收外部请求。
本机内部与raft和kvDB协商如何处理该请求。
返回外部响应。
接收与响应外部请求
对于1和3,请求和返回的操作我们可以通过http、自定义协议等等方式实现,但是既然我们已经写出了rpc通信的一个简单的实现(源代码可见:这里 ),那就使用rpc来实现吧。
而且rpc可以直接完成请求和响应这一步,后面就不用考虑外部通信的问题了,好好处理好本机的流程即可。
相关函数是:
1 2 3 4 5 6 7 8 9 10 void PutAppend (google::protobuf::RpcController *controller, const ::raftKVRpcProctoc::PutAppendArgs *request, ::raftKVRpcProctoc::PutAppendReply *response, ::google::protobuf::Closure *done) override ;void Get (google::protobuf::RpcController *controller, const ::raftKVRpcProctoc::GetArgs *request, ::raftKVRpcProctoc::GetReply *response, ::google::protobuf::Closure *done) override ;
见名知意,请求分成两种:get和put(也就是set)。
如果是putAppend,clerk中就调用PutAppend
的rpc。
如果是Get,clerk中就调用Get
的rpc。
与raft节点沟通
在正式开始之前我们必须要先了解 线性一致性
的相关概念。建议阅读线性一致性 .
这里讲一讲raft如何做的。
每个 client
都需要一个唯一的标识符,它的每个不同命令需要有一个顺序递增的
commandId,clientId 和这个 commandId,clientId
可以唯一确定一个不同的命令,从而使得各个 raft
节点可以记录保存各命令是否已应用以及应用以后的结果。
即对于每个clinet,都有一个唯一标识,对于每个client,只执行递增的命令。
在保证线性一致性的情况下如何写kv
具体的思想在上面已经讲过,这里展示一下关键的代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 if (!chForRaftIndex->timeOutPop (CONSENSUS_TIMEOUT, &raftCommitOp)) { if (ifRequestDuplicate(op.ClientId, op.RequestId)) { reply->set_err (OK); } else { reply->set_err (ErrWrongLeader); } } else { if (raftCommitOp.ClientId == op.ClientId && raftCommitOp.RequestId == op.RequestId) { reply->set_err (OK); } else { reply->set_err (ErrWrongLeader); } }
需要注意的是,这里的命令执行成功是指:本条命令在整个raft集群达到同步的状态,而不是一台机器上的raft保存了该命令。
全部代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 void KvServer::Get (const raftKVRpcProctoc::GetArgs *args, raftKVRpcProctoc::GetReply *reply) { Op op; op.Operation = "Get" ; op.Key = args->key (); op.Value = "" ; op.ClientId = args->clientid (); op.RequestId = args->requestid (); int raftIndex = -1 ; int _ = -1 ; bool isLeader = false ; m_raftNode->Start (op, &raftIndex, &_, &isLeader); if (!isLeader) { reply->set_err (ErrWrongLeader); return ; } m_mtx.lock (); if (waitApplyCh.find (raftIndex) == waitApplyCh.end ()) { waitApplyCh.insert (std::make_pair (raftIndex, new LockQueue <Op>())); } auto chForRaftIndex = waitApplyCh[raftIndex]; m_mtx.unlock (); Op raftCommitOp; if (!chForRaftIndex->timeOutPop (CONSENSUS_TIMEOUT, &raftCommitOp)) { int _ = -1 ; bool isLeader = false ; m_raftNode->GetState (&_, &isLeader); if (ifRequestDuplicate(op.ClientId, op.RequestId) && isLeader) { std::string value; bool exist = false ; ExecuteGetOpOnKVDB (op, &value, &exist); if (exist) { reply->set_err (OK); reply->set_value (value); } else { reply->set_err (ErrNoKey); reply->set_value ("" ); } } else { reply->set_err (ErrWrongLeader); } } else { if (raftCommitOp.ClientId == op.ClientId && raftCommitOp.RequestId == op.RequestId) { std::string value; bool exist = false ; ExecuteGetOpOnKVDB (op, &value, &exist); if (exist) { reply->set_err (OK); reply->set_value (value); } else { reply->set_err (ErrNoKey); reply->set_value ("" ); } } else { reply->set_err (ErrWrongLeader); } } m_mtx.lock (); auto tmp = waitApplyCh[raftIndex]; waitApplyCh.erase (raftIndex); delete tmp; m_mtx.unlock (); }
在保证线性一致性的情况下如何读kv
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 if (!chForRaftIndex->timeOutPop (CONSENSUS_TIMEOUT, &raftCommitOp)) { int _ = -1 ; bool isLeader = false ; m_raftNode->GetState (&_, &isLeader); if (ifRequestDuplicate(op.ClientId, op.RequestId) && isLeader) { std::string value; bool exist = false ; ExecuteGetOpOnKVDB (op, &value, &exist); if (exist) { reply->set_err (OK); reply->set_value (value); } else { reply->set_err (ErrNoKey); reply->set_value ("" ); } } else { reply->set_err (ErrWrongLeader); } } else { if (raftCommitOp.ClientId == op.ClientId && raftCommitOp.RequestId == op.RequestId) { std::string value; bool exist = false ; ExecuteGetOpOnKVDB (op, &value, &exist); if (exist) { reply->set_err (OK); reply->set_value (value); } else { reply->set_err (ErrNoKey); reply->set_value ("" ); } } else { }
个人感觉读与写不同的是,读就算操作过也可以重复执行,不会违反线性一致性。
因为毕竟不会改写数据库本身的内容。
以GET请求为例看一看流程 :
以一个读操作为例看一看流程:
首先外部RPC调用GET,
1 2 3 4 5 void KvServer::Get (google::protobuf::RpcController *controller, const ::raftKVRpcProctoc::GetArgs *request, ::raftKVRpcProctoc::GetReply *response, ::google::protobuf::Closure *done) { KvServer::Get (request,response); done->Run (); }
然后是根据请求参数生成Op,生成Op是因为raft和raftServer沟通用的是类似于go中的channel的机制,然后向下执行即可。
注意:在这个过程中需要判断当前节点是不是leader,如果不是leader的话就返回ErrWrongLeader
,让其他clerk换一个节点尝试。
RPC如何实现调用
这里以Raft类为例讲解下如何使用rpc远程调用的。
写protoc文件,并生成对应的文件,Raft类使用的protoc文件和生成的文件见:这里
继承生成的文件的类
class Raft : public raftRpcProctoc::raftRpc
重写rpc方法即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void AppendEntries (google::protobuf::RpcController *controller, const ::raftRpcProctoc::AppendEntriesArgs *request, ::raftRpcProctoc::AppendEntriesReply *response, ::google::protobuf::Closure *done) override ; void InstallSnapshot (google::protobuf::RpcController *controller, const ::raftRpcProctoc::InstallSnapshotRequest *request, ::raftRpcProctoc::InstallSnapshotResponse *response, ::google::protobuf::Closure *done) override ; void RequestVote (google::protobuf::RpcController *controller, const ::raftRpcProctoc::RequestVoteArgs *request, ::raftRpcProctoc::RequestVoteReply *response, ::google::protobuf::Closure *done) override ;
一些问题思考
m_nextIndex和m_matchIndex
m_nextIndex
保存leader下一次应该从哪一个日志开始发送给follower;m_matchIndex
表示follower在哪一个日志是已经匹配了的(由于日志安全性,某一个日志匹配,那么这个日志及其之前的日志都是匹配的)。
m_nextIndex
与m_matchIndex
是否有冗余,即使用一个m_nextIndex
可以吗?
显然是不行的,m_nextIndex
的作用是用来寻找m_matchIndex
,不能直接取代。我们可以从这两个变量的变化看,在当选leader后,m_nextIndex
初始化为最新日志index,m_matchIndex
初始化为0,如果日志不匹配,那么m_nextIndex
就会不断的缩减,直到遇到匹配的日志,这时候m_nextIndex
应该一直为m_matchIndex+1
。
如果一直不发生故障,那么后期m_nextIndex就没有太大作用了,但是raft考虑需要考虑故障的情况,因此需要使用两个变量。
锁,能否在其中的某个地方提前放锁,或者使用多把锁来尝试提升性能?
多线程发送,能不能直接在doHeartBeat或者doElection函数里面直接一个一个发送消息呢?
可以有的优化空间
线程池,而不是每次rpc都不断地创建新线程
日志
从节点读取日志
面试问题
Raft算法的基本原理
回答要点 :解释Raft算法的基本工作原理,包括领导者选举、日志复制和安全性保障。
示例回答 :
Raft算法是一种分布式算法,旨在解决分布式系统中的一致性问题,相对于Paxos算法而言更易于理解和实现。
Raft算法将系统中的所有节点分为三类角色:领导者(leader
)、跟随者(follower
)和候选人(candidate
)。其选举机制确保系统中的一个节点被选为领导者(leader
),领导者负责处理客户端的请求,并将更新复制到其他节点。
Raft算法的基本原理包括以下几个关键步骤:
领导者选举(Leader
Election):在系统启动时或者当前领导者失效时,节点会发起选举过程。节点会在一个随机的超时时间内等待收到来自其他节点的心跳消息。如果在超时时间内没有收到心跳消息,节点就会成为候选人并发起选举。候选人向其他节点发送投票请求,并在得到大多数节点的投票后成为新的领导者。
日志复制(Log
Replication):一旦领导者选举完成,新的领导者就会接收客户端的请求,并将更新的日志条目复制到其他节点。当大多数节点都成功复制了这些日志条目时,更新被认为是提交的,并且可以应用到节点的状态机中。
安全性(Safety):Raft算法通过确保在选举中只有一个领导者(单一领导者)、大多数节点的一致性以及只有领导者可以处理客户端请求等方式保证分布式系统的安全性。
通过以上机制,Raft算法确保了分布式系统中的一致性、可用性和分区容错性。
注意:如果这么回答如果面试官懂一些分布式算法的话,那么后续可能会提问Raft与其他分布式算法的关系。
领导者选举
回答要点 :这里最好结合前面几个章节的流程图,结合自己理解回答。
示例回答 :
img
Raft中的领导者选举过程如下:
候选人状态(Candidate State) :
节点在没有检测到领导者的情况下成为候选人,并将自己的任期编号(term)增加1。
候选人向其他节点发送投票请求,并请求其他节点投票支持自己成为新的领导者。
如果候选人在规定时间内收到了大多数节点的选票支持(即获得了大多数节点的投票),则成为新的领导者。
选举过程(Election Process) :
在发起选举后,候选人会等待一定的随机时间(选举超时时间)来收集其他节点的投票。
如果在这个超时时间内没有收到大多数节点的选票,候选人将会重新开始一个新的选举周期,增加自己的任期编号,并再次发起选举 。
投票过程(Voting Process) :
其他节点收到来自候选人的投票请求后,会检查自己的当前任期编号。如果候选人的任期编号比自己的大,则投票支持候选人,并更新自己的任期编号为候选人的任期编号。
如果其他节点已经投票给了另一个候选人,或者已经投票给了当前领导者,它将拒绝投票。
领导者选举完成(Leader Election Complete) :
如果候选人收到了大多数节点的投票支持,它就会成为新的领导者。
新的领导者会开始发送心跳消息以维持其领导地位,并开始进行日志复制操作。
在什么情况下会触发领导者选举?
回答要点 :一个节点只要长时间没有收到符合条件的leader发送的心跳就会认为leader掉线,就会发起选举。
示例回答 :
在Raft算法中,领导者选举会在以下情况下触发:
当系统启动时,所有节点都处于初始状态,没有领导者。
当领导者节点因网络分区、宕机或其他原因失效时,导致系统中没有活跃的领导者。
当节点故障恢复或者被网络分区时,它可能会检测到当前没有领导者,因此会成为候选人并发起选举。
日志复制
Raft是如何通过日志复制来保证数据一致性的?
回答要点 :主要是两个机制(特点):
1.Leader Append
Entries:领导者追加日志条目,即只有leader可以接受外部请求并将请求打包成日志 ,并向follower同步自己的日志,这样保证提交过的日志不会被覆盖掉。
2.commit机制,领导者发现大多数节点 都已经成功复制了某个日志条目后,该日志条目被视为已经提交,从而保证了数据的一致性。
示例回答:
Raft算法通过日志复制来确保数据一致性。在Raft中,每个节点都维护一个日志(log)来记录状态机中的操作指令。领导者负责接收客户端的写请求,将操作指令追加到自己的日志中,并将这些操作指令发送给其他节点,要求它们复制这些日志条目。
以下是Raft通过日志复制来保证数据一致性的基本流程:
Leader Append Entries(领导者追加日志条目) :
领导者接收到客户端的写请求后,将这些操作指令追加到自己的日志中。
领导者将这些操作指令组织成一个日志条目(log
entry),并向其他节点发送一个追加日志条目的请求(Append Entries
RPC)。
Follower Log Replication(跟随者日志复制) :
跟随者节点接收到领导者发送的追加日志条目的请求后,会按照领导者的日志条目顺序将这些日志条目追加到自己的日志中。
如果跟随者节点成功复制了这些日志条目,则向领导者发送成功响应(Response);如果由于某种原因(例如网络故障)导致复制失败,则向领导者发送失败响应。
Commit(提交) :
当领导者发现大多数节点都已经成功复制了某个日志条目后,该日志条目被视为已经提交。
领导者将提交的日志条目应用到自己的状态机中,以执行相应的操作指令。
安全性保障
Raft是如何确保安全性的?讨论一致性、可用性和分区容错性之间的权衡。
回答要点
:这里主要是想考察分布式CAP理论的一个关键:CAP中如果发生故障,只能CP和AP二选一,无法满足CAP的三角,而Raft选择的是CP,即满足一致性。
示例回答:
在权衡一致性、可用性和分区容错性时,Raft算法倾向于优先保证一致性和分区容错性。它通过保证大多数节点的确认和限制领导者选举条件来确保一致性,通过选举机制和日志复制来保证分区容错性。同时,Raft也兼顾了系统的可用性,确保在领导者失效后能够快速进行新的领导者选举,并继续提供服务。
选举超时:
什么是选举超时?它的作用是什么?
回答要点:
follower和candidate都会有选举超时的机制。
在follower时:选举超时的意义是发起选举,变成candidate;
在candidate时:candidate会选举超时,如果选举成功就会变成leader;如果选举失败就会变成candidate(选举超时)或者follower(发现合适的leader)。那么选举超时的作用就很明显了,防止无止境的等待导致所有人都成不了leader。
拓展:想一想为什么选举超时时间要每次随机设置而不设置成一个固定的值???
示例回答:
选举超时的作用包括:
触发领导者选举 :选举超时用于在当前没有活跃领导者或者领导者失效时触发新的领导者选举。当节点在选举超时时间内没有收到来自当前领导者的心跳消息时,会成为候选人并发起选举过程。
防止脑裂(Split-Brain) :选举超时帮助避免了系统中出现多个领导者的情况,从而避免了脑裂问题的发生。如果系统中的节点在选举超时时间内没有收到来自当前领导者的心跳消息,它们会同时成为候选人并发起选举,但只有一个候选人最终会获得大多数节点的选票,成为新的领导者。
确保领导者切换的及时性 :选举超时可以确保在领导者失效后,系统能够及时地启动新的领导者选举过程,从而减少服务中断的时间,提高系统的可用性。
选举超时的时间是如何设置的?
回答要点: 回答要点在上个问题的拓展里面,大家可以先想想。答案是:一个一定范围内的随机值,其要根据心跳时间,rpc延迟,数据操作延迟综合考虑。
范围:一般来说选举超时时间要大于一次完整心跳的日志同步处理时间。
为何随机:选举超时的目的是防止无止境的等待导致所有人都成不了leader,如果超时时间又一样,那么大家又一起选举,又会不断循环,那么一个随机值可以让某些节点早点重新发起选举,防止大家一起选举导致死循环。
示例回答:
选举超时时间的设置通常包括以下考虑因素:
网络延迟和稳定性 :选举超时时间需要足够长以允许节点在正常情况下能够收到来自领导者的心跳消息。考虑到网络的延迟和不稳定性,超时时间应该设置得足够长,以避免因网络延迟而误判领导者失效。
系统负载和响应速度 :选举超时时间也应考虑系统的负载情况和响应速度。如果系统负载较重或者节点的处理速度较慢,可能需要将选举超时时间设置得稍长一些,以允许节点有足够的时间处理收到的消息。
避免脑裂问题 :为了避免系统中出现多个领导者导致的脑裂问题,选举超时时间应该设置得足够随机化,以确保不同节点不会在同一时间内触发选举。
日志条目的提交
Raft中的日志条目是如何提交的?
回答要点:
要半数以上的节点(包括leader)接收了这个日志,那么才能提交(commit),后续才能apply到状态机。
示例回答:
Leader接收客户端请求 :
当客户端向Raft系统提交请求时,请求会首先发送到Raft集群中的Leader节点。
Leader将请求转换为日志条目 :
Leader将接收到的客户端请求转换为一条日志条目,并附加到其本地日志中。
Leader广播日志条目 :
Leader向其它节点发送包含新日志条目的心跳RPC请求(AppendEntries
RPC)。
Follower节点接收并附加日志条目 :
Follower节点接收到Leader的附加日志请求后,将新的日志条目附加到其本地日志中。
Follower节点响应Leader :
Follower节点在成功附加日志后,向Leader发送成功的响应。
Leader确认提交 :
当Leader收到大多数节点的附加成功响应时,将日志条目视为已提交。
Leader提交到状态机 :
Leader将已提交的日志条目应用到其状态机中,以执行相应的操作。
Leader通知Followers提交 :Leader会通知其它节点已提交的日志索引,以便它们也可以将相应的日志条目提交到其状态机中。
Follower提交到状态机 :Follower节点收到Leader的提交通知后,将对应的已提交日志条目应用到其状态机中。
什么条件下才能够提交一个日志条目?
回答要点:
一个很容易疏忽的是必须要本term有新的日志提交才能继续提交日志,这个在前面文章中也提醒过。
示例回答: 略。
Raft如何处理集群拓扑的变更?
回答要点:
示例回答:
生成配置变更请求 :
当需要改变集群的拓扑结构时,例如添加或移除节点,Leader节点会收到来自客户端的配置变更请求。
将配置变更请求转化为配置变更日志条目 :
Leader节点将配置变更请求转换为一条特殊的日志条目,即配置变更日志条目。该日志条目包含新的集群配置信息。
向集群中的节点分发配置变更日志条目 :
Leader节点通过Raft的日志复制机制将配置变更日志条目发送给所有的Follower节点。
Follower节点接收配置变更日志条目 :
Follower节点收到配置变更日志条目后,将其附加到本地日志中。
提交配置变更日志条目 :
当配置变更日志条目被大多数节点确认并附加到本地日志后,Leader节点将其提交到状态机,实际应用这个配置变更。
应用配置变更 :
Leader节点和所有的Follower节点根据新的配置信息更新其内部状态,以反映集群拓扑结构的变更。这可能涉及到更新节点的角色(Leader、Follower、Candidate)、维护新的节点列表等。
实际应用
Raft算法在实际场景中的应用有哪些?
回答要点:
列举一些常见的使用Raft算法作为底层的即可。
示例回答:
一些常见的配置中心,为了保证可用会采用Raft,比如zookeeper的底层实现了基于Raft修改的算法,ETCD等。
一些分布式数据库,比如TIKV
Raft与其他分布式的比较
与Paxos算法相比,Raft有哪些优势和不同之处?
回答要点:
Raft相对于Paxos算法来说,更易理解、实现和维护,具有更直观的Leader机制和选举过程。(这个需要大家多了解一下其他分布式算法的设计思想了。)
示例回答:
Leader机制 :Raft引入了Leader节点,而Paxos中没有Leader节点的概念。 Leader节点负责协调和领导整个一致性过程,而Follower节点只需按照Leader的指示执行操作。
日志复制 :在Raft中,所有的日志条目都通过Leader节点进行复制和提交,而Paxos中的日志复制是通过多个角色相互协作完成的。
角色切换 :Raft中Leader节点失效后,集群可以快速选举新的Leader节点,而Paxos中角色的切换较为复杂,需要进行更多的投票和协调。
更强的可读性 :Raft算法更加直观和易于理解,它的设计目标之一就是提供更好的可读性和可理解性,相比之下,Paxos算法相对更加抽象和复杂。
常见问题与挑战:
回答要点:
leader的瓶颈(使用多读来解决),节点故障等等
示例回答:
Leader瓶颈 :Raft算法中的Leader节点负责所有的客户端请求的处理和日志复制,这可能会成为系统的瓶颈。如果Leader节点负载过重或者发生故障,会导致整个系统的性能下降。
网络分区 :Raft算法需要保证在网络分区情况下的一致性,这可能会导致在网络分区恢复后需要进行数据的合并和冲突解决,增加了一致性维护的复杂性。
节点故障处理 :当节点发生故障时,Raft需要进行Leader的重新选举,这可能导致一段时间内系统的不可用性和性能下降,尤其是在节点频繁发生故障时。
日志复制延迟 :Raft算法要求日志复制必须在大多数节点上完成后才能提交,这可能导致日志复制的延迟,影响系统的实时性能。
节点动态变更 :Raft算法在节点动态加入或退出时需要进行配置变更,这可能会导致系统的不稳定和数据的不一致,需要谨慎处理。
一致性保证 :虽然Raft算法保证了强一致性,但在一些特殊情况下(如网络分区、节点故障等),可能会导致一致性级别的降低或者一致性协议的不满足,需要额外的机制来解决。
性能调优 :在实际应用中,Raft算法需要根据具体的场景进行性能调优,包括调整心跳超时时间、选举超时时间、日志复制的策略等参数,以满足系统的性能需求。
如何处理网络分区的情况?
回答要点:
这个要结合多种情况分析,比如leader宕机、非leader宕机;少数节点分区、多数节点分区。然后这几种情况还可以相互组合,这个的话就要分类讨论了,面试估计是说不完的。
示例回答:
分情况讨论,略。
容错性
Raft算法如何处理节点故障?
回答要点:
和网络分区是一样的,可以看成是网络分区的一种特殊情况,即一个节点自己是一个分区。
此外再加上故障恢复后有哪些数据(日志,投票,term等)是需要持久化的,哪些是不需要的(commitIndex,applyIndex等)。
示例回答:
略。
在集群中的多个节点同时故障时,系统会有什么表现?
回答要点: 考虑故障节点的数量,抓住“半数”这个概念。
其他同上面的“分区问题”和“故障问题”。
示例回答: 略。
RPC
你的RPC如何设计的?
回答要点:
这里最好的回答是回答出现有的rpc框架的一些优秀的设计,因为我的rpc实现只是raft-kv的一个配件,所以还是
同步阻塞
的,这里推荐大家看看异步rpc如何实现,也欢迎来改进项目中的RPC实现参与开源:项目地址
列出可以考虑的优化点:异步;rpc长连接短连接的思考;负载均衡 ;服务治理、服务发现。
示例回答:
略
负载均衡有没有做?用的什么算法如何考虑的?
回答要点:
1.常见的负载均衡算法及其对比;2.第四层和第七层不同层的负载均衡;3.瘦客户端和胖客户端的不同方式的负载均衡。
示例回答:
最开始实现rpc模块的时候实现过负载均衡算法,当然后面用于raft通信,因为raft通信是所有leader与所有节点都要建立连接,所以后面就没有再用负载均衡了,将这个功能关闭了。
当时实现的负载均衡的算法有:
轮询算法(Round Robin) :
轮询算法是最简单的负载均衡算法之一,它按照请求的顺序依次将每个请求分配到不同的服务器上。当有新的请求到来时,负载均衡器会依次将请求发送到不同的服务器,直到所有的服务器都被轮询过一遍,然后再从头开始。
最小连接数算法(Least Connections) :
最小连接数算法会将新的请求分配到当前连接数最少的服务器上,以确保各服务器的负载尽可能均衡。这种算法考虑了服务器的负载情况,优先将请求发送到负载较低的服务器上。
最少响应时间算法(Least Response Time) :
最少响应时间算法会将请求发送到响应时间最短的服务器上,以保证响应时间的最小化。这种算法通常需要负载均衡器记录每个服务器的响应时间,并动态调整请求的分配策略。
哈希算法(Hashing) :
哈希算法根据请求的某些属性(如客户端IP地址、URL等)计算哈希值,并将请求发送到对应哈希值的服务器上。这种算法能够确保相同请求始终被发送到同一台服务器上,适用于需要保持会话一致性的场景。
加权轮询算法(Weighted Round Robin) :
加权轮询算法在轮询算法的基础上引入了权重的概念,不同的服务器具有不同的权重值。根据权重值的不同,负载均衡器会调整请求的分配比例,以实现负载均衡。
拓展:hash环也是一种重要的负载均衡算法,也可以提及。
服务治理和发现有没有做?怎么做的?
回答要点:
一般是用第三方组件(比如zookeeper)来做,当然raft-kv本身就可以用来做服务治理和服务发现,所以rpc就没有单独做。
示例回答:
无
你这个RPC框架的序列化和反序列化中protobuf细节有没有了解
回答要点: 头部变长编码+自定义的压缩算法。
这里就可以牵扯到rpc中的编码方法,目前是头部定长4字节,可以优化成一个标志位+变长编码的方式:参与issue链接
示例回答:
大概了解了一下,主要是其头部的变长编码+Google自己实现的一个高效的压缩算法。
测试
在集群数量变多的时候,Raft性能可能会下降,这方面有没有思考过?
回答要点:
允许从follower读;rpc合并等raft落地的框架的优化。
示例回答: 无
有没有对性能进行过测试?用的什么工具?怎么测试的?
回答要点: perf火焰图。
示例回答:
这里回答一下火焰图的基本使用就差不多了,如果大家没有使用过的话推荐大家去看篇博文入门了解基本操作和原理,我这里给出一个初步的结果,如果只有一个客户端:并发几十,大部分的损耗在rpc这边。多个客户端的结果没有测试。