A more general implementation of Lamport's Byzantine Consensus algorithm in Python

Marton Trencseni - Tue 12 August 2025 - Programming

Introduction

In the previous article I introduced Lamport's famous Byzantine Generals problem and implemented his "oral messages" algorithm in Python. In this follow-up I will discuss and implement software engineering generalizations and optimizations of the same algorithm:

  • support any of the nodes initiating the consensus algorithm, i.e. acting as the king (not just node 0)
  • support arbitrary values in the consensus algorithm (not just attack and retreat)
  • support multiple rounds of the Byzantine Consensus algorithm, potentially running at the same time
  • support timeouts

The code is on Github: mtrencseni/dca/byzantine/multi

Core algorithm

The first step is to abstract out the core Byzantine Consensus algorithm, with implementation-specific details such as networking and specific values (attack, retreat) factored out. This is the ByzantineConsensus class:

class ByzantineConsensus:
    def __init__(self, *, node_id: int, n: int, m: int,
                 majority_func,     # majority_func(iterable[str]) -> str
                 send_func,         # send_func(target_id: int, msg: dict) -> None
                 next_value_func):  # next_value_func(v: str) -> str   
        self.id              = node_id
        self.n               = n
        self.m               = m
        self._majority       = majority_func
        self._send           = send_func
        self._next_value     = next_value_func 
        # state
        self.received_values = {}          # path-tuple -> value
        self._done           = False       # message cascade finished?
        self._value          = None
        # pre-compute expectation
        self._total_expected = sum(self._expected_messages(k) for k in range(m + 1))

    def all_messages_received(self) -> bool:
        return len(self.received_values) >= self._total_expected

    def is_done(self) -> bool:
        return self._done

    def value(self):
        return self._value

    def onmessage(self, msg: dict) -> None:
        if self._done:
            raise RuntimeError("node already finished this round")
        path  = tuple(msg["path"])
        value = msg["value"]
        k = 1 + self.m - len(path)
        self.received_values[path] = value
        # forward if required
        if k > 0: # more stages to go
            value = self._next_value(value)
            if value is not None:
                self._broadcast(path + (self.id,),
                                value,
                                k - 1)
        # check completion
        if self.all_messages_received():
            self._done = True

    def decide(self, timeout_default=None):
        if self._value is not None:
            return self._value
        if not self._done and timeout_default is None:
            raise RuntimeError("cannot decide before cascade finishes and without default value")
        if len(self.received_values) == 0:
            return timeout_default
        root = min(self.received_values.keys(), key=len) # shortest path
        root = [root[0]]
        self._value = self._om(list(root), timeout_default)
        return self._value

    def start(self, initial_value: str) -> None:
        self._value = initial_value
        self._broadcast((self.id,), initial_value, self.m)
        self._done = True # king never receives later messages

    def _broadcast(self, path, value, k_remaining):
        for i in range(self.n):
            if i in path or i == self.id: # do not resend along path
                continue
            self._send(i, {"path": list(path), "value": value})

    def _expected_messages(self, k):
        return perm(3 * self.m + 1 - 2, self.m - k)

    def _om(self, path, default_value):
        k = 1 + self.m - len(path)
        v = self.received_values.get(tuple(path), default_value)
        if k == 0:
            return v
        child_vals = [self._om(path + [i], default_value)
                      for i in range(self.n)
                      if i not in path + [self.id]]
        return self._majority([v] + child_vals)

The class takes the following values in its constructor:

  • node_id, n, m
  • majority_func()
  • send_func(): the class uses this to send messages, abstracting away the implementation details
  • next_value_func(): the class passes on next_value_func(value) in the message cascade, can be used to implement traitor behaviour

In order to simulate traitors not sending along anything, if next_value_func() returns None nothing is sent along the message cascade, and non-traitor nodes are forced to time out and assume default values. Note that the implementation does not assume that the king is node 0!

Node code

There are many ways to instantiate ByzantineConsensus objects, here is one way:

def majority(values, tie_breaker=min):
    if not values:
        raise ValueError("majority() called with no values")
    counter      = Counter(values)
    top_count    = max(counter.values())
    tied_values  = [v for v, c in counter.items() if c == top_count]
    # single clear winner
    if len(tied_values) == 1:
        return tied_values[0]
    # tie -> delegate to tie-breaker
    return tie_breaker(tied_values)

def async_order(target_id, msg):
    def _post():
        try: session.post(f"http://127.0.0.1:{8000+target_id}/order", json=msg)
        except: pass
    executor.submit(_post)

def traitor_timeout(v):
    return None if is_traitor else v

def new_bcr(round_id):
    return ByzantineConsensus(
        node_id=node_id, n=n, m=m,
        majority_func=majority,
        send_func=lambda target_id, msg: async_order(target_id, {**msg, "round_id": round_id}),
        next_value_func=traitor_timeout)

Here I defined a majority() function that does not assume what values are used. async_order() is the same as before, but a round_id is injected into the message on sending, so several ByzantineConsensus instances can send and receive messages at the same time. I use the traitor_timeout() function for next_value_func(), so traitors essentially stay silent during the whole algorithm.

Multiple rounds

To support multiple rounds, the driver sends along a round_id when calling the general's /start endpoint. This round_id is then contained in all messages for that round of the consensus algorithm, and all generals create new instances of ByzantineConsensus upon seeing a new round_id.

The /start endpoint is called by the driver on the king (for each round):

@app.route("/start", methods=["POST"])
def start():
    msg = request.get_json()
    if "round_id" not in msg:
        return "specify round_id", 400
    if "round_id" in bcr:
        return "round_id already seen", 400
    bcr[msg["round_id"]] = new_bcr(msg["round_id"])
    bcr[msg["round_id"]].start(msg["order"])
    return jsonify(ok=True)

Non-king nodes will see this round_id for the first time when the king calls their /order endpoint:

@app.route("/order", methods=["POST"])
def order():
    msg = request.get_json()
    if "round_id" not in msg:
        return "specify round_id", 400
    if msg["round_id"] not in bcr:
        bcr[msg["round_id"]] = new_bcr(msg["round_id"])
    bcr[msg["round_id"]].onmessage(msg)
    return jsonify(ok=True)

In both cases, the function new_bcr() given earlier is called to create a new instance of ByzantineConsensus for this round. Note that upon receiving a message, the general calls ByzantineConsensus.onmessage(); this is the core entry point during the message cascade.

Deciding

As in the previous implementation, the driver calls /status to check if generals have received all messages, and the generals call the appropriate ByzantineConsensus object's is_done() function:

@app.route("/status")
def status():
    msg = request.get_json()
    if "round_id" not in msg:
        return "specify round_id", 400
    if msg["round_id"] not in bcr:
        return "no such round_id seen", 400
    return jsonify(done=bcr[msg["round_id"]].is_done())

When all generals have finished, or some timeout was triggered, the driver calls /decide on the generals, which has a similar logic:

@app.route("/decide", methods=["POST"])
def decide():
    msg = request.get_json()
    if "round_id" not in msg:
        return "specify round_id", 400
    if msg["round_id"] not in bcr:
        return "no such round_id seen", 400
    return jsonify(value=bcr[msg["round_id"]].decide(timeout_default=msg.get("timeout_default", "")))

A notable difference here is that the general passes in a default value to ByzantineConsensus.decide() to use for potential missing messages.

Multiple rounds with multiple kings

With these changes, the driver code can now support multiple rounds, with different kings, with different values being agreed upon:

for round_id in range(num_rounds):
    king_id = random.sample(range(n), 1)[0]
    print(f"\nROUND #{round_id}, node {king_id} is king")
    # kick off message cascade by telling the king to issue his order
    activities = ["drink beer", "eat dinner", "sleep", "watch a movie", "go clubbing"]
    order = random.sample(activities, 1)[0]
    requests.post(f"http://127.0.0.1:{8000+king_id}/start", json={"round_id": round_id, "order": order})
    # wait until all generals report done
    timeout_ts = time.monotonic() + timeout
    while True:
        if time.monotonic() > timeout_ts:
            print("Timeout, calling decide(), generals will assume default values for missing messages")
            break
    ...

For example, running the code at $(M,N)=(2, 7)$ with 3 rounds, with the traitors not sending onward messages, hence forcing timeouts:

$ python3 driver.py 2 3
Traitors: {1, 4}

ROUND #0, node 3 is king
Timeout, calling decide(), generals will assume default values for missing messages
Decisions:
General 0 decided sleep
General 1 is a traitor
General 2 decided sleep
General 3 decided sleep
General 4 is a traitor
General 5 decided sleep
General 6 decided sleep
SUCCESS: all non-traitor generals decided to sleep!

ROUND #1, node 0 is king
Timeout, calling decide(), generals will assume default values for missing messages
Decisions:
General 0 decided go clubbing
General 1 is a traitor
General 2 decided go clubbing
General 3 decided go clubbing
General 4 is a traitor
General 5 decided go clubbing
General 6 decided go clubbing
SUCCESS: all non-traitor generals decided to go clubbing!

ROUND #2, node 3 is king
Timeout, calling decide(), generals will assume default values for missing messages
Decisions:
General 0 decided watch a movie
General 1 is a traitor
General 2 decided watch a movie
General 3 decided watch a movie
General 4 is a traitor
General 5 decided watch a movie
General 6 decided watch a movie
SUCCESS: all non-traitor generals decided to watch a movie!

Conclusion

By isolating consensus logic from I/O and making behaviour pluggable, the implementation becomes easier to test, extend, and drop into different systems. The same core now supports many orchestrations (multiple kings, concurrent rounds, different transports) while preserving Lamport’s guarantees under the oral-messages model. This foundation enables swapping out I/O (e.g., HTTP → TCP) and tuning engineering defaults (timeouts/retries), and explore different traitor behaviours.