PhxPaxos源码分析——Paxos算法实现

这篇主要来分析Paxos算法实现的部分,我想这应该也是读者最感兴趣的。在看这篇文章之前,如果之前对Paxos算法没有了解的童鞋可以看下这篇文章:Paxos算法原理与推导,相信了解Paxos算法后再来通过源码看算法实现应该会很酸爽。

Paxos算法中最重要的两个角色是ProposerAcceptor。当然Leaner也很重要,特别是在PhxPaxos的实现中,Leaner具有重要的功能。但是因为《Paxos Made Simple》论文中主要还是Proposer和Acceptor,因此这篇文章还是以这两个角色为主,通过源码来回顾论文中Paxos算法的过程,同时也看看工程实现和论文的描述有什么区别。

这里先贴出Paxos算法的过程,方便大家对照接下来的工程实现。

  • Prepare阶段:

    (a) Proposer选择一个提案编号N,然后向半数以上的Acceptor发送编号为N的Prepare请求。

    (b) 如果一个Acceptor收到一个编号为N的Prepare请求,且N大于该Acceptor已经响应过的所有Prepare请求的编号,那么它就会将它已经接受过的编号最大的提案(如果有的话)作为响应反馈给Proposer,同时该Acceptor承诺不再接受任何编号小于N的提案。

  • Accept阶段:

    (a) 如果Proposer收到半数以上Acceptor对其发出的编号为N的Prepare请求的响应,那么它就会发送一个针对[N,V]提案的Accept请求给半数以上的Acceptor。注意:V就是收到的响应中编号最大的提案的value,如果响应中不包含任何提案,那么V就由Proposer自己决定。

    (b) 如果Acceptor收到一个针对编号为N的提案的Accept请求,只要该Acceptor没有对编号大于N的Prepare请求做出过响应,它就接受该提案。

Proposer

因为Proposer需要维护或者说记录一些状态信息,包括自己的提案编号ProposalID、提出的Value、其他Proposer提出的最大的提案编号HighestOtherProposalID、Acceptor已经接受过的编号最大的提案的值等,因此这里专门有一个ProposerState类来管理这些信息。同样Acceptor也有一个AcceptorState类来管理Acceptor相关的信息。

先来看下ProposerState的定义:

class ProposerState
{
public:
ProposerState(const Config * poConfig);
~ProposerState();
void Init();
void SetStartProposalID(const uint64_t llProposalID);
void NewPrepare();
void AddPreAcceptValue(const BallotNumber & oOtherPreAcceptBallot, const std::string & sOtherPreAcceptValue);
/////////////////////////
const uint64_t GetProposalID();
const std::string & GetValue();
void SetValue(const std::string & sValue);
void SetOtherProposalID(const uint64_t llOtherProposalID);
void ResetHighestOtherPreAcceptBallot();
public:
uint64_t m_llProposalID;
uint64_t m_llHighestOtherProposalID;
std::string m_sValue;
BallotNumber m_oHighestOtherPreAcceptBallot;
Config * m_poConfig;
};

基本都是对这些信息的set跟get,很容易理解。直接来看Proposer类的定义:

class Proposer : public Base
{
public:
Proposer(
const Config * poConfig,
const MsgTransport * poMsgTransport,
const Instance * poInstance,
const Learner * poLearner,
const IOLoop * poIOLoop);
~Proposer();
//设置起始的ProposalID
void SetStartProposalID(const uint64_t llProposalID);
//初始化新的一轮Paxos过程,每一轮叫做一个Paxos Instance,每一轮确定一个值
virtual void InitForNewPaxosInstance();
//Proposer发起提案的入口函数。参数sValue即Proposer自己想提出的value,当然最终提出的value不一定是这个,需要根据Acceptor再Prepare阶段的回复来确定
int NewValue(const std::string & sValue);
//判断Proposer是否处于Prepare阶段或Accept阶段
bool IsWorking();
/////////////////////////////
//对应Paxos算法中的Prepare阶段
void Prepare(const bool bNeedNewBallot = true);
//Prepare阶段等待Acceptor的回复,统计投票并确定是否进入Accept阶段
void OnPrepareReply(const PaxosMsg & oPaxosMsg);
//Prepare阶段被拒绝
void OnExpiredPrepareReply(const PaxosMsg & oPaxosMsg);
//对应Paxos算法中的Accept阶段
void Accept();
//Accept阶段等待Acceptor的回复,统计投票并确定值(Value)是否被选定(Chosen)
void OnAcceptReply(const PaxosMsg & oPaxosMsg);
//Accept阶段被拒绝
void OnExpiredAcceptReply(const PaxosMsg & oPaxosMsg);
//Prepare阶段超时
void OnPrepareTimeout();
//Accept阶段超时
void OnAcceptTimeout();
//退出Prepare阶段
void ExitPrepare();
//退出Accept阶段
void ExitAccept();
//取消跳过Prepare阶段,也就是必须先Prepare阶段再Accept阶段
void CancelSkipPrepare();
/////////////////////////////
void AddPrepareTimer(const int iTimeoutMs = 0);
void AddAcceptTimer(const int iTimeoutMs = 0);
public:
ProposerState m_oProposerState;
MsgCounter m_oMsgCounter;
Learner * m_poLearner;
bool m_bIsPreparing;
bool m_bIsAccepting;
IOLoop * m_poIOLoop;
uint32_t m_iPrepareTimerID;
int m_iLastPrepareTimeoutMs;
uint32_t m_iAcceptTimerID;
int m_iLastAcceptTimeoutMs;
uint64_t m_llTimeoutInstanceID;
bool m_bCanSkipPrepare;
bool m_bWasRejectBySomeone;
TimeStat m_oTimeStat;
};

NewValue

下面就从NewValue方法入手:

int Proposer :: NewValue(const std::string & sValue)
{
BP->GetProposerBP()->NewProposal(sValue);
if (m_oProposerState.GetValue().size() == 0)
{
m_oProposerState.SetValue(sValue);
}
m_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS;
m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS;
//如果可以跳过Prepare阶段并且没有被Acceptor拒绝过,则直接进入Accept阶段
if (m_bCanSkipPrepare && !m_bWasRejectBySomeone)
{
BP->GetProposerBP()->NewProposalSkipPrepare();
PLGHead("skip prepare, directly start accept");
Accept();
}
//否则先进入Prepare阶段
else
{
//if not reject by someone, no need to increase ballot
Prepare(m_bWasRejectBySomeone);
}
return 0;
}

这里可以直接进入Accept阶段的前提是该Proposer已经发起过Prepare请求且得到半数以上的同意(即通过了Prepare阶段),并且没有被任何Acceptor拒绝(说明没有Acceptor响应过比该Proposer的提案编号更高的提案)。那么,什么情况下可以跳过Prepare请求呢,这里应该对应的是选出一个master的情况?相当于raft里的leader?

Prepare

接下来直接看Prepare阶段:

void Proposer :: Prepare(const bool bNeedNewBallot)
{
PLGHead("START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu",
GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(),
m_oProposerState.GetValue().size());
BP->GetProposerBP()->Prepare();
m_oTimeStat.Point();
ExitAccept();
//表明Proposer正处于Prepare阶段
m_bIsPreparing = true;
//不能跳过Prepare阶段
m_bCanSkipPrepare = false;
//目前还未被任意一个Acceptor拒绝
m_bWasRejectBySomeone = false;
m_oProposerState.ResetHighestOtherPreAcceptBallot();
//如果需要产生新的投票,就调用NewPrepare产生新的ProposalID,新的ProposalID为当前已知的最大ProposalID+1
if (bNeedNewBallot)
{
m_oProposerState.NewPrepare();
}
PaxosMsg oPaxosMsg;
//设置Prepare消息的各个字段
oPaxosMsg.set_msgtype(MsgType_PaxosPrepare);
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
//MsgCount是专门用来统计票数的,根据计算的结果确定是否通过Prepare阶段或者Accept阶段
m_oMsgCounter.StartNewRound();
//Prepare超时定时器
AddPrepareTimer();
PLGHead("END OK");
//将Prepare消息发送到各个节点
BroadcastMessage(oPaxosMsg);
}

Proposer在Prepare阶段主要做了这么几件事:

  1. 重置各个状态位,表明当前正处于Prepare阶段。
  2. 获取提案编号ProposalID。当bNeedNewBallot为true时需要将ProposalID+1。否则沿用之前的ProposalID。bNeedNewBallot是在NewValue中调用Prepare方法时传入的m_bWasRejectBySomeone参数。也就是如果之前没有被任何Acceptor拒绝(说明还没有明确出现更大的ProposalID),则不需要获取新的ProposalID。对应的场景是Prepare阶段超时了,在超时时间内没有收到过半Acceptor同意的消息,因此需要重新执行Prepare阶段,此时只需要沿用原来的ProposalID即可。
  3. 发送Prepare请求。该请求PaxosMsg是Protocol Buffer定义的一个message,包含MsgType、InstanceID、NodeID、ProposalID等字段。在BroadcastMessage(oPaxosMsg)中还会将oPaxosMsg序列化后才发送出去。

PaxosMsg的定义如下,Prepare和Accept阶段Proposer和Acceptor的所有消息都用PaxosMsg来表示:

message PaxosMsg
{
required int32 MsgType = 1;
optional uint64 InstanceID = 2;
optional uint64 NodeID = 3;
optional uint64 ProposalID = 4;
optional uint64 ProposalNodeID = 5;
optional bytes Value = 6;
optional uint64 PreAcceptID = 7;
optional uint64 PreAcceptNodeID = 8;
optional uint64 RejectByPromiseID = 9;
optional uint64 NowInstanceID = 10;
optional uint64 MinChosenInstanceID = 11;
optional uint32 LastChecksum = 12;
optional uint32 Flag = 13;
optional bytes SystemVariables = 14;
optional bytes MasterVariables = 15;
};

OnPrepareReply

Proposer发出Prepare请求后就开始等待Acceptor的回复。当Proposer所在节点收到PaxosPrepareReply消息后,就会调用Proposer的OnPrepareReply(oPaxosMsg),其中oPaxosMsg是Acceptor回复的消息。

void Proposer :: OnPrepareReply(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(),
oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());
BP->GetProposerBP()->OnPrepareReply();
//如果Proposer不是在Prepare阶段,则忽略该消息
if (!m_bIsPreparing)
{
BP->GetProposerBP()->OnPrepareReplyButNotPreparing();
//PLGErr("Not preparing, skip this msg");
return;
}
//如果ProposalID不同,也忽略
if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
{
BP->GetProposerBP()->OnPrepareReplyNotSameProposalIDMsg();
//PLGErr("ProposalID not same, skip this msg");
return;
}
//加入一个收到的消息,用于MsgCounter统计
m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
//如果该消息不是拒绝,即Acceptor同意本次Prepare请求
if (oPaxosMsg.rejectbypromiseid() == 0)
{
BallotNumber oBallot(oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid());
PLGDebug("[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu",
oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size());
//加入MsgCounter用于统计投票
m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
//将Acceptor返回的它接受过的编号最大的提案记录下来(如果有的话),用于确定Accept阶段的Value
m_oProposerState.AddPreAcceptValue(oBallot, oPaxosMsg.value());
}
//Acceptor拒绝了Prepare请求
else
{
PLGDebug("[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid());
//同样也要记录到MsgCounter用于统计投票
m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
//记录被Acceptor拒绝过,待会儿如果重新进入Prepare阶段的话就需要获取更大的ProposalID
m_bWasRejectBySomeone = true;
//记录下别的Proposer提出的更大的ProposalID。这样重新发起Prepare请求时才知道需要用多大的ProposalID
m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
}
//本次Prepare请求通过了。也就是得到了半数以上Acceptor的同意
if (m_oMsgCounter.IsPassedOnThisRound())
{
int iUseTimeMs = m_oTimeStat.Point();
BP->GetProposerBP()->PreparePass(iUseTimeMs);
PLGImp("[Pass] start accept, usetime %dms", iUseTimeMs);
m_bCanSkipPrepare = true;
//进入Accept阶段
Accept();
}
//本次Prepare请求没有通过
else if (m_oMsgCounter.IsRejectedOnThisRound()
|| m_oMsgCounter.IsAllReceiveOnThisRound())
{
BP->GetProposerBP()->PrepareNotPass();
PLGImp("[Not Pass] wait 30ms and restart prepare");
//随机等待一段时间后重新发起Prepare请求
AddPrepareTimer(OtherUtils::FastRand() % 30 + 10);
}
PLGHead("END");
}

该阶段Proposer主要做了以下事情:

  1. 判断消息是否有效。包括ProposalID是否相同,自身是否处于Prepare阶段等。因为网络是不可靠的,有些消息可能延迟很久,等收到的时候已经不需要了,所以需要做这些判断。

  2. 将收到的消息加入MsgCounter用于统计。

  3. 根据收到的消息更新自身状态。包括Acceptor承诺过的ProposalID,以及Acceptor接受过的编号最大的提案等。

  4. 根据MsgCounter统计的Acceptor投票结果决定是进入Acceptor阶段还是重新发起Prepare请求。这里如果判断需要重新发起Prepare请求的话,也不是立即进行,而是等待一段随机的时间,这样做的好处是减少不同Proposer之间的冲突,采取的策略跟raft中leader选举冲突时在一段随机的选举超时时间后重新发起选举的做法类似。

注:这里跟Paxos算法中提案编号对应的并不是ProposalID,而是BallotNumber。BallotNumber由ProposalID和NodeID组成。还实现了运算符重载。如果ProposalID大,则BallotNumber(即提案编号)大。在ProposalID相同的情况下,NodeID大的BallotNumber大。

Accept

接下来Proposer就进入Accept阶段:

void Proposer :: Accept()
{
PLGHead("START ProposalID %lu ValueSize %zu ValueLen %zu",
m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(), m_oProposerState.GetValue().size());
BP->GetProposerBP()->Accept();
m_oTimeStat.Point();
ExitPrepare();
m_bIsAccepting = true;
//设置Accept请求的消息内容
PaxosMsg oPaxosMsg;
oPaxosMsg.set_msgtype(MsgType_PaxosAccept);
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
oPaxosMsg.set_value(m_oProposerState.GetValue());
oPaxosMsg.set_lastchecksum(GetLastChecksum());
m_oMsgCounter.StartNewRound();
AddAcceptTimer();
PLGHead("END");
//发给各个节点
BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_Final);
}

Accept请求中PaxosMsg里的Value是这样确定的:如果Prepare阶段有Acceptor的回复中带有提案值,则该Value为所有的Acceptor的回复中,编号最大的提案的值。否则就是Proposer在最初调用NewValue时传入的值。

OnAcceptReply

void Proposer :: OnAcceptReply(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(),
oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());
BP->GetProposerBP()->OnAcceptReply();
if (!m_bIsAccepting)
{
//PLGErr("Not proposing, skip this msg");
BP->GetProposerBP()->OnAcceptReplyButNotAccepting();
return;
}
if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
{
//PLGErr("ProposalID not same, skip this msg");
BP->GetProposerBP()->OnAcceptReplyNotSameProposalIDMsg();
return;
}
m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
if (oPaxosMsg.rejectbypromiseid() == 0)
{
PLGDebug("[Accept]");
m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
}
else
{
PLGDebug("[Reject]");
m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
m_bWasRejectBySomeone = true;
m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
}
if (m_oMsgCounter.IsPassedOnThisRound())
{
int iUseTimeMs = m_oTimeStat.Point();
BP->GetProposerBP()->AcceptPass(iUseTimeMs);
PLGImp("[Pass] Start send learn, usetime %dms", iUseTimeMs);
ExitAccept();
//让Leaner学习被选定(Chosen)的值
m_poLearner->ProposerSendSuccess(GetInstanceID(), m_oProposerState.GetProposalID());
}
else if (m_oMsgCounter.IsRejectedOnThisRound()
|| m_oMsgCounter.IsAllReceiveOnThisRound())
{
BP->GetProposerBP()->AcceptNotPass();
PLGImp("[Not pass] wait 30ms and Restart prepare");
AddAcceptTimer(OtherUtils::FastRand() % 30 + 10);
}
PLGHead("END");
}

这里跟OnPrepareReply的过程基本一致,因此就不加太多注释了。比较大的区别在于最后如果过半的Acceptor接受了该Accept请求,则说明该Value被选定(Chosen)了,就发送消息,让每个节点上的Learner学习该Value。因为Leaner不是本文的重点,这里就不详细介绍了。

Acceptor

Acceptor的逻辑比Proposer更简单。同样先看它的定义:

class Acceptor : public Base
{
public:
Acceptor(
const Config * poConfig,
const MsgTransport * poMsgTransport,
const Instance * poInstance,
const LogStorage * poLogStorage);
~Acceptor();
virtual void InitForNewPaxosInstance();
int Init();
AcceptorState * GetAcceptorState();
//Prepare阶段回复Prepare请求
int OnPrepare(const PaxosMsg & oPaxosMsg);
//Accept阶段回复Accept请求
void OnAccept(const PaxosMsg & oPaxosMsg);
//private:
AcceptorState m_oAcceptorState;
};

OnPrepare

OnPrepare用于处理收到的Prepare请求,逻辑如下:

int Acceptor :: OnPrepare(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu",
oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid());
BP->GetAcceptorBP()->OnPrepare();
PaxosMsg oReplyPaxosMsg;
oReplyPaxosMsg.set_instanceid(GetInstanceID());
oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
oReplyPaxosMsg.set_msgtype(MsgType_PaxosPrepareReply);
//构造接收到的Prepare请求里的提案编号
BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
//提案编号大于承诺过的提案编号
if (oBallot >= m_oAcceptorState.GetPromiseBallot())
{
PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
"State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID,
m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
//返回之前接受过的提案的编号
oReplyPaxosMsg.set_preacceptid(m_oAcceptorState.GetAcceptedBallot().m_llProposalID);
oReplyPaxosMsg.set_preacceptnodeid(m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
//如果接受过的提案编号大于0(<=0说明没有接受过提案),则设置接受过的提案的Value
if (m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0)
{
oReplyPaxosMsg.set_value(m_oAcceptorState.GetAcceptedValue());
}
//更新承诺的提案编号为新的提案编号(因为新的提案编号更大)
m_oAcceptorState.SetPromiseBallot(oBallot);
//信息持久化
int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
if (ret != 0)
{
BP->GetAcceptorBP()->OnPreparePersistFail();
PLGErr("Persist fail, Now.InstanceID %lu ret %d",
GetInstanceID(), ret);
return -1;
}
BP->GetAcceptorBP()->OnPreparePass();
}
//提案编号小于承诺过的提案编号,需要拒绝
else
{
BP->GetAcceptorBP()->OnPrepareReject();
PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID);
//拒绝该Prepare请求,并返回承诺过的ProposalID
oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
}
nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
GetInstanceID(), oPaxosMsg.nodeid());;
//向发出Prepare请求的Proposer回复消息
SendMessage(iReplyNodeID, oReplyPaxosMsg);
return 0;
}

OnAccept

再来看看OnAccept:

void Acceptor :: OnAccept(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu",
oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(), oPaxosMsg.value().size());
BP->GetAcceptorBP()->OnAccept();
PaxosMsg oReplyPaxosMsg;
oReplyPaxosMsg.set_instanceid(GetInstanceID());
oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
oReplyPaxosMsg.set_msgtype(MsgType_PaxosAcceptReply);
BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
//提案编号不小于承诺过的提案编号(注意:这里是“>=”,而再OnPrepare中是“>”,可以先思考下为什么),需要接受该提案
if (oBallot >= m_oAcceptorState.GetPromiseBallot())
{
PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
"State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID,
m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
//更新承诺的提案编号;接受的提案编号、提案值
m_oAcceptorState.SetPromiseBallot(oBallot);
m_oAcceptorState.SetAcceptedBallot(oBallot);
m_oAcceptorState.SetAcceptedValue(oPaxosMsg.value());
//信息持久化
int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
if (ret != 0)
{
BP->GetAcceptorBP()->OnAcceptPersistFail();
PLGErr("Persist fail, Now.InstanceID %lu ret %d",
GetInstanceID(), ret);
return;
}
BP->GetAcceptorBP()->OnAcceptPass();
}
//需要拒绝该提案
else
{
BP->GetAcceptorBP()->OnAcceptReject();
PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID);
//拒绝的消息中附上承诺过的ProposalID
oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
}
nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
GetInstanceID(), oPaxosMsg.nodeid());
//将响应发送给Proposer
SendMessage(iReplyNodeID, oReplyPaxosMsg);
}

结语

通过阅读源码可以发现,整个PhxPaxos完全基于Lamport的《Paxos Made Simple》进行工程化,没有进行任何算法变种。这对于学习Paxos算法的人来说真的是一笔宝贵的财富,所以如果对Paxos算法感兴趣,应该深入地去阅读PhxPaxos的源码,相信看完后大家对Paxos会有更深的理解。同时我们也发现,在工程实现上还是有很多细节需要注意,这比单纯理解算法要难得多。

欢迎关注公众号: FullStackPlan 获取更多干货

Copyright © 2016 - 2017 LBD's Blog All Rights Reserved.

访客数 : | 访问量 :