Build a Message Queue (RabbitMQ / SQS) (14 scenes)
Scene 05 · Ack — the consumer's 'you may delete'
Dequeue is two steps: lease (hide from peers) + ack (delete). Same word as Kafka's ack, but consumer→broker direction — and auto-ack loses messages.
Previously
The broker hands each message to exactly one worker — but right now it deletes the cell at handoff. That can't be right: if the worker crashes mid-processing, the work is lost. So we split the dequeue into a lease and an ack.
Scene 05
Ack — the consumer's 'you may delete'
Diagram
A FIFO strip in the middle, producer top-right, workers stacked on the right. Bright cells are available; grey dashed cells are in-flight (leased to the worker whose badge appears on the cell) and remain in the broker's memory until the consumer acks. Acking removes the cell; a crash without ack returns the lease to the head.
Watch the lease–ack handshake. The cell turns grey the moment the broker hands it to a worker; it only disappears when the worker's ack comes back. The broker is holding the cell in an 'in flight' state in between.
Implementation
Broker.lease
pop the head, bind it to a worker, hide from peers
1def lease(worker_id, auto_ack=False):2 cell = queue.pop_head() # off the available strip3 cell.state = 'in-flight'4 cell.worker_id = worker_id # peers can't see it5 in_flight[cell.id] = cell6 if auto_ack:7 del in_flight[cell.id] # collapse to one step8 cell.state = 'acked' # gone before processing9 return cell
Worker.process
do the side effect, then ack — note: ack is consumer-side
1def process(cell):2 try:3 do_side_effect(cell.payload) # the actual work4 broker.ack(self.id, cell.id) # 'you may delete'5 except Exception:6 # no ack sent — broker still holds the lease7 # lease eventually returns to the head on crash8 raise
Broker.ack
consumer→broker RPC: verify ownership, then delete
1def ack(worker_id, cell_id):2 cell = in_flight.get(cell_id)3 if cell is None:4 return # already gone / never leased5 if cell.worker_id != worker_id:6 raise PreconditionFailed() # not yours to ack7 del in_flight[cell_id] # truly deleted now8 cell.state = 'acked'