Code
The Python source code corresponding to the pseudo-code we present in this website is available in the following git repository:
https://github.com/denizalti/paxosmmc
You can clone the repo as follows:
$ git clone https://github.com/denizalti/paxosmmc.git
The source code includes the Replica, Acceptor, Commander, Scout and Leader codes as well as Utilities, Message, Process and Environment codes that implement the functionality to run the processes.
Replica
A replica runs in an infinite loop, receiving messages. Replicas receive two kinds of messages: requests and decisions. When it receives a request for command c from a client, the replica adds the request to set requests. Next, the replica invokes the function propose().
Function propose() tries to transfer requests from the set requests to proposals. It uses slot_in to look for unused slots within the window of slots with known configurations. For each such slot s, it first checks if the configuration for s is different from the prior slot by checking if the decision in slot s−WINDOW is a reconfiguration command. If so, the function updates the set of leaders for slot s. Then the function removes a request r from requests and adds proposal (s,r) to the set proposals. Finally, it sends a ⟨propose, s, c⟩ message to all leaders in the configuration of slot s.
Decisions may arrive out-of-order and multiple times. For each decision message, the replica adds the decision to the set decisions. Then, in a loop, it considers which decisions are ready for execution before trying to receive more messages. If there is a decision c’ corresponding to the current slot_out, the replica first checks to see if it has proposed a command c’’ for that slot. If so, the replica removes ⟨slot_out,c’‘⟩ from the set proposals. If c’‘≠ c’, that is, the replica proposed a different command for that slot, the replica returns c’’ to set requests so c’’ is proposed again at a later time. Next, the replica invokes perform(c’).
The function perform() is invoked with the same sequence of commands at all replicas. First, it checks to see if it has already performed the command. Different replicas may end up proposing the same command for different slots, and thus the same command may be decided multiple times. The corresponding operation is evaluated only if the command is new and it is not a reconfiguration request. If so, perform() applies the requested operation to the application state. In either case, the function increments slot_out.
from process import Process
from message import ProposeMessage,DecisionMessage,RequestMessage
from utils import *
import time
class Replica(Process):
def __init__(self, env, id, config):
Process.__init__(self, env, id)
self.slot_in = self.slot_out = 1
self.proposals = {}
self.decisions = {}
self.requests = []
self.config = config
self.env.addProc(self)
def propose(self):
while len(self.requests) != 0 and self.slot_in < self.slot_out+WINDOW:
if self.slot_in > WINDOW and self.slot_in-WINDOW in self.decisions:
if isinstance(self.decisions[self.slot_in-WINDOW],ReconfigCommand):
r,a,l = self.decisions[self.slot_in-WINDOW].config.split(';')
self.config = Config(r.split(','),a.split(','),l.split(','))
print self.id, ": new config:", self.config
if self.slot_in not in self.decisions:
cmd = self.requests.pop(0)
self.proposals[self.slot_in] = cmd
for ldr in self.config.leaders:
self.sendMessage(ldr, ProposeMessage(self.id,self.slot_in,cmd))
self.slot_in +=1
def perform(self, cmd):
for s in range(1, self.slot_out):
if self.decisions[s] == cmd:
self.slot_out += 1
return
if isinstance(cmd, ReconfigCommand):
self.slot_out += 1
return
print self.id, ": perform",self.slot_out, ":", cmd
self.slot_out += 1
def body(self):
print "Here I am: ", self.id
while True:
msg = self.getNextMessage()
if isinstance(msg, RequestMessage):
self.requests.append(msg.command)
elif isinstance(msg, DecisionMessage):
self.decisions[msg.slot_number] = msg.command
while self.slot_out in self.decisions:
if self.slot_out in self.proposals:
if self.proposals[self.slot_out]!=self.decisions[self.slot_out]:
self.requests.append(self.proposals[self.slot_out])
del self.proposals[self.slot_out]
self.perform(self.decisions[self.slot_out])
else:
print "Replica: unknown msg type"
self.propose()
Acceptor
An acceptor runs in an infinite loop, receiving two kinds of request messages from leaders:
-
P1aMessage: Upon receiving a “Phase 1a” request message from a leader for a ballot number, an acceptor makes the following transition. First, the acceptor adopts msg.ballot_number if and only if it exceeds its current ballot number. Then it returns to the leader a “Phase 1b” response message containing its current ballot number and all pvalues accepted thus far by the acceptor.
-
P2aMessage: Upon receiving a “Phase 2a” request message from a leader with a pvalue, an acceptor makes the following transition. If msg.ballot_number equals the current ballot number, then the acceptor accepts the pvalue. The acceptor returns to the leader a “Phase 2b” response message containing its current ballot number.
from utils import PValue
from process import Process
from message import P1aMessage, P1bMessage, P2aMessage, P2bMessage
class Acceptor(Process):
def __init__(self, env, id):
Process.__init__(self, env, id)
self.ballot_number = None
self.accepted = set()
self.env.addProc(self)
def body(self):
print "Here I am: ", self.id
while True:
msg = self.getNextMessage()
if isinstance(msg, P1aMessage):
if msg.ballot_number > self.ballot_number:
self.ballot_number = msg.ballot_number
self.sendMessage(msg.src,
P1bMessage(self.id,
self.ballot_number,
self.accepted))
elif isinstance(msg, P2aMessage):
if msg.ballot_number == self.ballot_number:
self.accepted.add(PValue(msg.ballot_number,
msg.slot_number,
msg.command))
self.sendMessage(msg.src,
P2bMessage(self.id,
self.ballot_number,
msg.slot_number))
Scout
Scouts send and track the P1a and P1b messages, handling the first part of a round.
from process import Process
from message import P1aMessage, P1bMessage, PreemptedMessage, AdoptedMessage
class Scout(Process):
def __init__(self, env, id, leader, acceptors, ballot_number):
Process.__init__(self, env, id)
self.leader = leader
self.acceptors = acceptors
self.ballot_number = ballot_number
self.env.addProc(self)
def body(self):
waitfor = set()
for a in self.acceptors:
self.sendMessage(a, P1aMessage(self.id, self.ballot_number))
waitfor.add(a)
pvalues = set()
while True:
msg = self.getNextMessage()
if isinstance(msg, P1bMessage):
if self.ballot_number == msg.ballot_number and msg.src in waitfor:
pvalues.update(msg.accepted)
waitfor.remove(msg.src)
if len(waitfor) < float(len(self.acceptors))/2:
self.sendMessage(self.leader,
AdoptedMessage(self.id,
self.ballot_number,
pvalues))
return
else:
self.sendMessage(self.leader,
PreemptedMessage(self.id,
msg.ballot_number))
return
else:
print "Scout: unexpected msg"
Commander
Commanders send and track the P2a and P2b messages, handling the first part of a round.
from message import P2aMessage, P2bMessage, PreemptedMessage, DecisionMessage from process import Process from utils import Command
class Commander(Process):
def __init__(self, env, id, leader, acceptors, replicas,
ballot_number, slot_number, command):
Process.__init__(self, env, id)
self.leader = leader
self.acceptors = acceptors
self.replicas = replicas
self.ballot_number = ballot_number
self.slot_number = slot_number
self.command = command
self.env.addProc(self)
def body(self):
waitfor = set()
for a in self.acceptors:
self.sendMessage(a, P2aMessage(self.id, self.ballot_number,
self.slot_number, self.command))
waitfor.add(a)
while True:
msg = self.getNextMessage()
if isinstance(msg, P2bMessage):
if self.ballot_number == msg.ballot_number and msg.src in waitfor:
waitfor.remove(msg.src)
if len(waitfor) < float(len(self.acceptors))/2:
for r in self.replicas:
self.sendMessage(r, DecisionMessage(self.id,
self.slot_number,
self.command))
return
else:
self.sendMessage(self.leader, PreemptedMessage(self.id, msg.ballot_number))
return
Leader
The Leader code is executed by the Replica that becomes the leader for a given round after the P1 and P2 rounds are completed successfully.
from utils import BallotNumber
from process import Process
from commander import Commander
from scout import Scout
from message import ProposeMessage,AdoptedMessage,PreemptedMessage
class Leader(Process):
def __init__(self, env, id, config):
Process.__init__(self, env, id)
self.ballot_number = BallotNumber(0, self.id)
self.active = False
self.proposals = {}
self.config = config
self.env.addProc(self)
def body(self):
print "Here I am: ", self.id
Scout(self.env, "scout:%s:%s" % (str(self.id), str(self.ballot_number)),
self.id, self.config.acceptors, self.ballot_number)
while True:
msg = self.getNextMessage()
if isinstance(msg, ProposeMessage):
if msg.slot_number not in self.proposals:
self.proposals[msg.slot_number] = msg.command
if self.active:
Commander(self.env,
"commander:%s:%s:%s" % (str(self.id),
str(self.ballot_number),
str(msg.slot_number)),
self.id, self.config.acceptors, self.config.replicas,
self.ballot_number, msg.slot_number, msg.command)
elif isinstance(msg, AdoptedMessage):
if self.ballot_number == msg.ballot_number:
pmax = {}
for pv in msg.accepted:
if pv.slot_number not in pmax or \
pmax[pv.slot_number] < pv.ballot_number:
pmax[pv.slot_number] = pv.ballot_number
self.proposals[pv.slot_number] = pv.command
for sn in self.proposals:
Commander(self.env,
"commander:%s:%s:%s" % (str(self.id),
str(self.ballot_number),
str(sn)),
self.id, self.config.acceptors, self.config.replicas,
self.ballot_number, sn, self.proposals.get(sn))
self.active = True
elif isinstance(msg, PreemptedMessage):
if msg.ballot_number > self.ballot_number:
self.ballot_number = BallotNumber(msg.ballot_number.round+1,
self.id)
Scout(self.env, "scout:%s:%s" % (str(self.id),
str(self.ballot_number)),
self.id, self.config.acceptors, self.ballot_number)
self.active = False
else:
print "Leader: unknown msg type"
Environment
This is the main code in which all processes are created and run. This code also simulates a set of clients submitting requests.
import os, signal, sys, time
from acceptor import Acceptor
from leader import Leader
from message import RequestMessage
from process import Process
from replica import Replica
from utils import *
NACCEPTORS = 3
NREPLICAS = 2
NLEADERS = 2
NREQUESTS = 10
NCONFIGS = 2
class Env:
def __init__(self):
self.procs = {}
def sendMessage(self, dst, msg):
if dst in self.procs:
self.procs[dst].deliver(msg)
def addProc(self, proc):
self.procs[proc.id] = proc
proc.start()
def removeProc(self, pid):
del self.procs[pid]
def run(self):
initialconfig = Config([], [], [])
c = 0
for i in range(NREPLICAS):
pid = "replica: %d" % i
Replica(self, pid, initialconfig)
initialconfig.replicas.append(pid)
for i in range(NACCEPTORS):
pid = "acceptor: %d.%d" % (c,i)
Acceptor(self, pid)
initialconfig.acceptors.append(pid)
for i in range(NLEADERS):
pid = "leader: %d.%d" % (c,i)
Leader(self, pid, initialconfig)
initialconfig.leaders.append(pid)
for i in r in initialconfig.replicas:
cmd = Command(pid,0,"operation %d.%d" %.sleep(1)
for c in range(1, NCONFIGS):
# Create new configuration
config = Config(initialconfig.replicas, [], [])
for i in range(NACCEPTORS):
pid = "acceptor: %d.%d" % (c,i)
Acceptor(self, pid)
config.acceptors.append(pid)
for i in range(NLEADERS):
pid = "leader: %d.%d" % (c,i)
Leader(self, pid, config)
config.leaders.append(pid)
# Send reconfiguration request
for r in config.replicas:
pid = "master: %d.%d" % (c,i)
cmd = ReconfigCommand(pid,0,str(config))
self.sendMessage(r, RequestMessage(pid, cmd))
time.sleep(1)
for i in range(WINDOW-1):
pid = "master: %d.%d" % (c,i)
for r in config.replicas:
cmd = Command(pid,0,"operation noop")
self.sendMessage(r, RequestMessage(pid, cmd))
time.sleep(1)
for i in range(NREQUESTS):
pid = "client: %d.%d" % (c,i)
for r inconfig.replicas:
cmd = Command(pid,0,(1)
def terminate_handler(self, signal, frame):
self._graceexit()
def _graceexit(self, exitcode=0):
sys.stdout.flush()
sys.stderr.flush()
os._exit(exitcode)
def main():
e = Env()
e.run()
signal.signal(signal.SIGINT, e.terminate_handler)
signal.signal(signal.SIGTERM, e.terminate_handler)
signal.pause()
if __name__=='__main__':
main()
Process
A process is a thread with a process identifier, a queue of incoming messages, and an “environment” that keeps track of all processes and queues.
import multiprocessing
from threading import Thread
class Process(Thread):
def __init__(self, env, id):
super(Process, self).__init__()
self.inbox = multiprocessing.Manager().Queue()
self.env = env
self.id = id
def run(self):
try:
self.body()
self.env.removeProc(self.id)
except EOFError:
print "Exiting.."
def getNextMessage(self):
return self.inbox.get()
def sendMessage(self, dst, msg):
self.env.sendMessage(dst, msg)
def deliver(self, msg):
self.inbox.put(msg)
#Message Paxos uses a large variety of message types. They are collected below.
class Message:
def __init__(self, src):
self.src = src
def __str__(self):
return str(self.__dict__)
class P1aMessage(Message):
def __init__(self, src, ballot_number):
Message.__init__(self, src)
self.ballot_number = ballot_number
class P1bMessage(Message):
def __init__(self, src, ballot_number, accepted):
Message.__init__(self, src)
self.ballot_number = ballot_number
self.accepted = accepted
class P2aMessage(Message):
def __init__(self, src, ballot_number, slot_number, command):
Message.__init__(self, src)
self.ballot_number = ballot_number
self.slot_number = slot_number
self.command = command
class P2bMessage(Message):
def __init__(self, src, ballot_number, slot_number):
Message.__init__(self, src)
self.ballot_number = ballot_number
self.slot_number = slot_number
class PreemptedMessage(Message):
def __init__(self, src, ballot_number):
Message.__init__(self, src)
self.ballot_number = ballot_number
class AdoptedMessage(Message):
def __init__(self, src, ballot_number, accepted):
Message.__init__(self, src)
self.ballot_number = ballot_number
self.accepted = accepted
class DecisionMessage(= slot_number
self.command = command
class RequestMessage(Message):
def __init__(self, src, command):
Message.__init__(self, src)
self.command = command
class ProposeMessage(Message):
def __init__(self, src, slot_number, command):
Message.__init__(self, src)
self.slot_number = slot_number
self.command = command
Utilities
A ballot number is a lexicographically ordered pair of an integer and the identifier of the ballot’s leader. A pvalue consists of a ballot number, a slot number, and a command. A command consists of the process identifier of the client submitting the request, a client-local request identifier, and an operation (which can be anything). A reconfiguration command consists of the process identifier of the client submitting the request, a client-local request identifier, and a configuration. A configuration consists of a list of replicas, a list of acceptors and a list of leaders.
from collections import namedtuple
WINDOW = 5
class BallotNumber(namedtuple('BallotNumber',['round','leader_id'])):
__slots__ = ()
def __str__(self):
return "BN(%d,%s)" % (self.round, str(self.leader_id))
class PValue(namedtuple('PValue',['ballot_number','slot_number','command'])):
__slots__ = ()
def __str__(self):
return "PV(%s,%s,%s)" % (str(self.ballot_number),
str(self.slot_number),
str(self.command))
class Command(namedtuple('Command',['client','req_id','op'])):
__slots__ = ()
def __str__(self):
return "Command(%s,%s,%s)" % (str(self.client),
str(self.req_id),
str(self.op))
class ReconfigCommand(namedtuple('ReconfigCommand',['client','req_id','config'])):
__slots__ = ()
def __str__(self):
return "ReconfigCommand(%s,%s,%s)" % (str(self.client),
str(self.req_id),
str(self.config))
class Config(namedtuple('Config',['replicas','acceptors','leaders'])):
__slots__ = ()
def __str__(self):
return "%s;%s;self.leaders))