A more general implementation of Lamport's Byzantine Consensus algorithm in Python
Marton Trencseni - Tue 12 August 2025 - Programming
Introduction
In the previous article I introduced Lamport's famous Byzantine Generals problem and implemented his "oral messages" algorithm in Python. In this follow-up I will discuss and implement software engineering generalizations and optimizations of the same algorithm:
- support any of the nodes initiating the consensus algorithm, i.e. acting as the king (not just node 0)
- support arbitrary values in the consensus algorithm (not just attack and retreat)
- support multiple rounds of the Byzantine Consensus algorithm, potentially running at the same time
- support timeouts
The code is on GitHub: mtrencseni/dca/byzantine/multi
Core algorithm
The first step is to abstract out the core Byzantine Consensus algorithm, with implementation-specific details such as networking and specific values (attack, retreat) factored out. This is the ByzantineConsensus class:
from math import perm

class ByzantineConsensus:
    def __init__(self, *, node_id: int, n: int, m: int,
                 majority_func,     # majority_func(iterable[str]) -> str
                 send_func,         # send_func(target_id: int, msg: dict) -> None
                 next_value_func):  # next_value_func(v: str) -> str, or None to stay silent
        self.id = node_id
        self.n = n
        self.m = m
        self._majority = majority_func
        self._send = send_func
        self._next_value = next_value_func
        # state
        self.received_values = {}  # path-tuple -> value
        self._done = False         # message cascade finished?
        self._value = None
        # pre-compute expectation
        self._total_expected = sum(self._expected_messages(k) for k in range(m + 1))

    def all_messages_received(self) -> bool:
        return len(self.received_values) >= self._total_expected

    def is_done(self) -> bool:
        return self._done

    def value(self):
        return self._value

    def onmessage(self, msg: dict) -> None:
        if self._done:
            raise RuntimeError("node already finished this round")
        path = tuple(msg["path"])
        value = msg["value"]
        k = 1 + self.m - len(path)
        self.received_values[path] = value
        # forward if required
        if k > 0:  # more stages to go
            value = self._next_value(value)
            if value is not None:
                self._broadcast(path + (self.id,),
                                value,
                                k - 1)
        # check completion
        if self.all_messages_received():
            self._done = True

    def decide(self, timeout_default=None):
        if self._value is not None:
            return self._value
        if not self._done and timeout_default is None:
            raise RuntimeError("cannot decide before cascade finishes and without default value")
        if len(self.received_values) == 0:
            return timeout_default
        root = min(self.received_values.keys(), key=len)  # shortest path
        root = [root[0]]                                  # first element is the king
        self._value = self._om(list(root), timeout_default)
        return self._value

    def start(self, initial_value: str) -> None:
        self._value = initial_value
        self._broadcast((self.id,), initial_value, self.m)
        self._done = True  # king never receives later messages

    def _broadcast(self, path, value, k_remaining):
        for i in range(self.n):
            if i in path or i == self.id:  # do not resend along path
                continue
            self._send(i, {"path": list(path), "value": value})

    def _expected_messages(self, k):
        # assumes n == 3m + 1: relays are drawn from the n - 2 nodes
        # that are neither the king nor this node
        return perm(3 * self.m + 1 - 2, self.m - k)

    def _om(self, path, default_value):
        k = 1 + self.m - len(path)
        v = self.received_values.get(tuple(path), default_value)
        if k == 0:
            return v
        child_vals = [self._om(path + [i], default_value)
                      for i in range(self.n)
                      if i not in path + [self.id]]
        return self._majority([v] + child_vals)
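As a quick sanity check of the _total_expected pre-computation, the sketch below recomputes the per-stage message counts for a non-king node. It assumes n = 3m + 1, as _expected_messages() does; the helper name expected_messages is hypothetical and not part of the class.

```python
from math import perm

def expected_messages(m: int, k: int) -> int:
    """Messages a non-king node expects at stage k, assuming n = 3m + 1.

    A path at stage k starts with the king, followed by m - k distinct
    relays drawn from the n - 2 nodes that are neither the king nor
    the receiving node.
    """
    n = 3 * m + 1
    return perm(n - 2, m - k)

m = 2
per_stage = {k: expected_messages(m, k) for k in range(m + 1)}
total = sum(per_stage.values())
print(per_stage, total)
```

For m = 2 this gives 1 message straight from the king, 5 relayed once and 20 relayed twice, so a node reports done after 26 messages.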
The class takes the following values in its constructor:
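- node_id, n, m: this node's id, the total number of nodes, and the maximum number of traitors to tolerate
- majority_func(): the class uses this to pick the majority value from a list of values
- send_func(): the class uses this to send messages, abstracting away the implementation details
- next_value_func(): the class passes each received value through next_value_func(value) before forwarding it in the message cascade; this can be used to implement traitor behaviour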
In order to simulate traitors not sending along anything, if next_value_func() returns None, nothing is sent along the message cascade, and non-traitor nodes are forced to time out and assume default values. Note that the implementation does not assume that the king is node 0!
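To make the pluggable traitor behaviour concrete, here is a minimal sketch of three functions that could be passed as next_value_func. The names honest, silent and make_liar are hypothetical; only the None-means-silent convention comes from the class above.

```python
import random

def honest(value):
    """Forward the received value unchanged."""
    return value

def silent(value):
    """Forward nothing: the class skips the broadcast when it gets None."""
    return None

def make_liar(choices, rng=random):
    """Build a next_value_func that ignores its input and forwards a random choice."""
    def liar(value):
        return rng.choice(choices)
    return liar

# Seeded generator so the lying node is reproducible in tests
liar = make_liar(["attack", "retreat"], random.Random(0))
forwarded = [f("attack") for f in (honest, silent, liar)]
```

Since next_value_func is called once per received message, the substituted value goes to every recipient of that broadcast. A traitor that tells different nodes different things would need a change to _broadcast() as well.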
Node code
There are many ways to instantiate ByzantineConsensus objects; here is one way:
from collections import Counter

# node_id, n, m, is_traitor, session and executor are module-level
# globals set up elsewhere in the node script

def majority(values, tie_breaker=min):
    if not values:
        raise ValueError("majority() called with no values")
    counter = Counter(values)
    top_count = max(counter.values())
    tied_values = [v for v, c in counter.items() if c == top_count]
    # single clear winner
    if len(tied_values) == 1:
        return tied_values[0]
    # tie -> delegate to tie-breaker
    return tie_breaker(tied_values)

def async_order(target_id, msg):
    def _post():
        try:
            session.post(f"http://127.0.0.1:{8000+target_id}/order", json=msg)
        except Exception:
            pass  # best effort: receivers fall back to default values for lost messages
    executor.submit(_post)

def traitor_timeout(v):
    return None if is_traitor else v

def new_bcr(round_id):
    return ByzantineConsensus(
        node_id=node_id, n=n, m=m,
        majority_func=majority,
        send_func=lambda target_id, msg: async_order(target_id, {**msg, "round_id": round_id}),
        next_value_func=traitor_timeout)
Here I defined a majority() function that does not assume what values are used. async_order() is the same as before, but a round_id is injected into the message on sending, so several ByzantineConsensus instances can send and receive messages at the same time. I use the traitor_timeout() function for next_value_func(), so traitors essentially stay silent during the whole algorithm.
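The round_id injection can be tried in isolation. The sketch below wraps an arbitrary send function the same way the lambda in new_bcr() does; make_round_sender and the in-memory outbox are hypothetical stand-ins for the HTTP transport.

```python
def make_round_sender(send, round_id):
    """Wrap a send function so that every outgoing message carries round_id."""
    def send_with_round(target_id, msg):
        # {**msg, ...} builds a new dict, so the caller's message is not mutated
        send(target_id, {**msg, "round_id": round_id})
    return send_with_round

outbox = []  # records (target_id, message) pairs instead of doing network I/O
send = make_round_sender(lambda target_id, msg: outbox.append((target_id, msg)),
                         round_id=7)

original = {"path": [3], "value": "sleep"}
send(1, original)
print(outbox)
```

The consensus class never sees the round_id, so the core algorithm stays unaware that several rounds may be in flight.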
Multiple rounds
To support multiple rounds, the driver sends along a round_id when calling the general's /start endpoint. This round_id is then contained in all messages for that round of the consensus algorithm, and all generals create new instances of ByzantineConsensus upon seeing a new round_id.
The /start endpoint is called by the driver on the king (for each round):
@app.route("/start", methods=["POST"])
def start():
    msg = request.get_json()
    if "round_id" not in msg:
        return "specify round_id", 400
    # bcr maps round_id -> ByzantineConsensus, so look up the id itself
    if msg["round_id"] in bcr:
        return "round_id already seen", 400
    bcr[msg["round_id"]] = new_bcr(msg["round_id"])
    bcr[msg["round_id"]].start(msg["order"])
    return jsonify(ok=True)
Non-king nodes will see this round_id for the first time when the king calls their /order endpoint:
@app.route("/order", methods=["POST"])
def order():
    msg = request.get_json()
    if "round_id" not in msg:
        return "specify round_id", 400
    if msg["round_id"] not in bcr:
        bcr[msg["round_id"]] = new_bcr(msg["round_id"])
    bcr[msg["round_id"]].onmessage(msg)
    return jsonify(ok=True)
In both cases, the function new_bcr() given earlier is called to create a new instance of ByzantineConsensus for this round. Note that upon receiving a message, the general calls ByzantineConsensus.onmessage(); this is the core entry point during the message cascade.
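The create-on-first-sight pattern used by both endpoints can be sketched without Flask. StubConsensus below is a hypothetical stand-in for ByzantineConsensus that only records messages, and dispatch plays the role of the /order handler.

```python
class StubConsensus:
    """Stand-in for ByzantineConsensus that only records what it receives."""
    def __init__(self, round_id):
        self.round_id = round_id
        self.messages = []

    def onmessage(self, msg):
        self.messages.append(msg)

rounds = {}  # round_id -> consensus instance, like bcr in the node code

def dispatch(msg):
    """Route a message to its round, creating the instance on first sight."""
    round_id = msg["round_id"]
    if round_id not in rounds:
        rounds[round_id] = StubConsensus(round_id)
    rounds[round_id].onmessage(msg)

# Messages from two interleaved rounds end up in separate instances
dispatch({"round_id": 0, "path": [3], "value": "sleep"})
dispatch({"round_id": 1, "path": [0], "value": "go clubbing"})
dispatch({"round_id": 0, "path": [3, 2], "value": "sleep"})
print({r: len(c.messages) for r, c in rounds.items()})
```

Because each round has its own instance and its own received_values, rounds that overlap in time cannot contaminate each other's state.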
Deciding
As in the previous implementation, the driver calls /status to check if generals have received all messages, and the generals call the appropriate ByzantineConsensus object's is_done() function:
@app.route("/status")
def status():
    msg = request.get_json()
    if "round_id" not in msg:
        return "specify round_id", 400
    if msg["round_id"] not in bcr:
        return "no such round_id seen", 400
    return jsonify(done=bcr[msg["round_id"]].is_done())
When all generals have finished, or some timeout was triggered, the driver calls /decide on the generals, which has a similar logic:
@app.route("/decide", methods=["POST"])
def decide():
    msg = request.get_json()
    if "round_id" not in msg:
        return "specify round_id", 400
    if msg["round_id"] not in bcr:
        return "no such round_id seen", 400
    return jsonify(value=bcr[msg["round_id"]].decide(timeout_default=msg.get("timeout_default", "")))
A notable difference here is that the general passes in a default value to ByzantineConsensus.decide() to use for potential missing messages.
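To illustrate how the default value fills in for missing messages, here is a standalone sketch of the recursive majority with a small worked case. The free function om mirrors the _om() method shown earlier, and the compact majority here breaks ties with min; the scenario (n=4, m=1, node 3 silent) is an assumed example, not output from the article's code.

```python
from collections import Counter

def majority(values):
    """Most common value; ties go to the smallest value."""
    counts = Counter(values)
    top = max(counts.values())
    return min(v for v, c in counts.items() if c == top)

def om(received, path, *, n, m, self_id, default):
    """Recursive majority over the tree of received values."""
    k = 1 + m - len(path)
    # a message that never arrived is replaced by the default
    v = received.get(tuple(path), default)
    if k == 0:
        return v
    children = [om(received, path + [i], n=n, m=m, self_id=self_id, default=default)
                for i in range(n) if i not in path + [self_id]]
    return majority([v] + children)

# Node 1's view with n=4, m=1 and king 0; node 3 is a silent traitor,
# so the relayed message with path (0, 3) never arrived.
received = {(0,): "sleep", (0, 2): "sleep"}
decision = om(received, [0], n=4, m=1, self_id=1, default="")
print(decision)
```

The missing entry becomes "", but the two copies of "sleep" still win the majority, which is why silent traitors only slow the round down rather than change its outcome.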
Multiple rounds with multiple kings
With these changes, the driver code can now support multiple rounds, with different kings, with different values being agreed upon:
for round_id in range(num_rounds):
    king_id = random.sample(range(n), 1)[0]
    print(f"\nROUND #{round_id}, node {king_id} is king")
    # kick off message cascade by telling the king to issue his order
    activities = ["drink beer", "eat dinner", "sleep", "watch a movie", "go clubbing"]
    order = random.sample(activities, 1)[0]
    requests.post(f"http://127.0.0.1:{8000+king_id}/start", json={"round_id": round_id, "order": order})
    # wait until all generals report done
    timeout_ts = time.monotonic() + timeout
    while True:
        if time.monotonic() > timeout_ts:
            print("Timeout, calling decide(), generals will assume default values for missing messages")
            break
        ...
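The deadline pattern in the loop above can be factored into a small helper. This is a sketch only: wait_until and its parameters are hypothetical, and the predicates stand in for whatever polling the driver does against /status.

```python
import time

def wait_until(predicate, timeout, poll_interval=0.01):
    """Poll predicate() until it returns True or `timeout` seconds elapse.

    Returns True if the predicate succeeded, False on timeout.
    """
    deadline = time.monotonic() + timeout  # monotonic clock is immune to wall-clock jumps
    while True:
        if predicate():
            return True
        if time.monotonic() > deadline:
            return False
        time.sleep(poll_interval)

# A predicate that never succeeds stands in for generals that never finish
timed_out = not wait_until(lambda: False, timeout=0.05)

# A predicate that succeeds on the third poll
polls = iter([False, False, True])
finished = wait_until(lambda: next(polls), timeout=1.0)
print(timed_out, finished)
```

Either outcome leads to the same next step in the driver, which is to call /decide; the return value only tells it whether defaults will be needed.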
Running the driver at $(M,N)=(2, 7)$ with 3 rounds, with traitors that send no onward messages and hence force timeouts, gives:
$ python3 driver.py 2 3
Traitors: {1, 4}
ROUND #0, node 3 is king
Timeout, calling decide(), generals will assume default values for missing messages
Decisions:
General 0 decided sleep
General 1 is a traitor
General 2 decided sleep
General 3 decided sleep
General 4 is a traitor
General 5 decided sleep
General 6 decided sleep
SUCCESS: all non-traitor generals decided to sleep!
ROUND #1, node 0 is king
Timeout, calling decide(), generals will assume default values for missing messages
Decisions:
General 0 decided go clubbing
General 1 is a traitor
General 2 decided go clubbing
General 3 decided go clubbing
General 4 is a traitor
General 5 decided go clubbing
General 6 decided go clubbing
SUCCESS: all non-traitor generals decided to go clubbing!
ROUND #2, node 3 is king
Timeout, calling decide(), generals will assume default values for missing messages
Decisions:
General 0 decided watch a movie
General 1 is a traitor
General 2 decided watch a movie
General 3 decided watch a movie
General 4 is a traitor
General 5 decided watch a movie
General 6 decided watch a movie
SUCCESS: all non-traitor generals decided to watch a movie!
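The SUCCESS lines amount to checking that all non-traitor generals report the same value. A minimal sketch of such a check follows; agreed_value is a hypothetical helper and not necessarily how the driver does it. The sample data mirrors round 0 above, with None for the traitors.

```python
def agreed_value(decisions, traitors):
    """Return the common decision of all non-traitors, or None if they disagree."""
    loyal = {value for node, value in decisions.items() if node not in traitors}
    return loyal.pop() if len(loyal) == 1 else None

traitors = {1, 4}
decisions = {0: "sleep", 1: None, 2: "sleep", 3: "sleep",
             4: None, 5: "sleep", 6: "sleep"}

result = agreed_value(decisions, traitors)
if result is not None:
    print(f"SUCCESS: all non-traitor generals decided to {result}!")
else:
    print("FAILURE: non-traitor generals disagree")
```

A stricter check could also require that the agreed value equals the king's order whenever the king is not a traitor.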
Conclusion
By isolating consensus logic from I/O and making behaviour pluggable, the implementation becomes easier to test, extend, and drop into different systems. The same core now supports many orchestrations (multiple kings, concurrent rounds, different transports) while preserving Lamport’s guarantees under the oral-messages model. This foundation enables swapping out I/O (e.g., HTTP → TCP), tuning engineering defaults (timeouts/retries), and exploring different traitor behaviours.