Build a Message Queue (RabbitMQ / SQS) (14 scenes)
Scene 02 · Kafka isn't this — reads delete here
Same picture as Kafka from a distance, opposite rules up close: the queue's read removes the cell; no other consumer will ever see it.
Previously
We have a buffer between the request and the worker. In Kafka the consumer is a bookmark and the bytes stay; here the read removes the cell.
Diagram
Two strips share one producer. The top strip (Kafka log) keeps every cell lit; consumers are cursors that can sit anywhere. The bottom strip (queue) loses cells from the head as they are read, and the queue itself tracks where the head currently is.
**Destructive read**: a read that removes the message from the queue, so no other consumer can ever receive it.
Same five colored messages, two strips. On the log, both cursors crawl forward and every cell stays lit. On the queue, the first cell vanishes the instant C1 takes it — that's a destructive read.
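The two strips can be mimicked in a few lines of Python. This is only a sketch of the picture, not of any broker: the colour names, the consumer names `C1` and `C2`, and the use of a plain `list` and `collections.deque` are all assumptions made for illustration.

```python
from collections import deque

messages = ["red", "blue", "green", "yellow", "purple"]  # hypothetical payloads

# Top strip: an append-only log plus one cursor per consumer.
log = list(messages)
cursors = {"C1": 0, "C2": 0}

# Bottom strip: a queue whose head cell is removed when it is read.
queue = deque(messages)

# C1 reads twice and C2 reads once from the log: only the cursors move.
for consumer in ("C1", "C1", "C2"):
    cursors[consumer] += 1

# C1 takes one message from the queue: the head cell itself is removed.
taken = queue.popleft()

print(len(log), cursors)    # the log still holds all five cells
print(taken, list(queue))   # the taken cell is gone for every consumer
```

After these steps the log is unchanged and only the bookmarks differ, while the queue is one cell shorter no matter which consumer asks next.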
Implementation
Log.read(consumer_id)
Non-destructive: serve bytes, advance that consumer's cursor.
```python
def read(consumer_id):
    offset = cursors[consumer_id]      # per-consumer bookmark
    if offset >= len(log):
        return None                    # caught up, bytes stay
    record = log[offset]               # cell is NOT removed
    cursors[consumer_id] = offset + 1
    return record

def seek(consumer_id, new_offset):
    cursors[consumer_id] = new_offset  # replay from anywhere
```
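To see what the cursor buys you, here is a self-contained sketch that wraps the same `read` and `seek` logic in a small class and replays part of the log. The `Log` class, its `append` method, and the choice to start unknown consumers at offset 0 are assumptions for this example, not Kafka's actual behaviour or API.

```python
class Log:
    """Append-only log with one cursor per consumer (illustrative only)."""

    def __init__(self):
        self.records = []   # cells are appended here and never removed
        self.cursors = {}   # consumer_id -> next offset to read

    def append(self, record):
        self.records.append(record)

    def read(self, consumer_id):
        # Assumption: a consumer seen for the first time starts at offset 0.
        offset = self.cursors.setdefault(consumer_id, 0)
        if offset >= len(self.records):
            return None                      # caught up, nothing removed
        self.cursors[consumer_id] = offset + 1
        return self.records[offset]

    def seek(self, consumer_id, new_offset):
        self.cursors[consumer_id] = new_offset


log = Log()
for record in ("a", "b", "c"):
    log.append(record)

first_pass = [log.read("C1") for _ in range(3)]  # C1 reads everything once
caught_up = log.read("C1")                       # nothing new to serve
log.seek("C1", 1)                                # rewind C1's bookmark
replay = [log.read("C1"), log.read("C1")]        # the same cells, served again
other = log.read("C2")                           # C2 is unaffected by C1
```

Because reading never touches `records`, rewinding is just an assignment to the cursor, and a second consumer starts from its own position regardless of what the first one has done.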
Queue.dequeue()
Destructive: return the head, then remove it. No cursor.
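```python
# Assumed broker-side state: cells is a dict keyed by position, so deleting
# one cell never shifts the others; head and tail are positions in it.
cells = {}
head = tail = 0

def dequeue():
    global head                # head is rebound below, so it must be declared
    if head == tail:
        return None            # queue empty
    record = cells[head]
    del cells[head]            # bytes are GONE
    head += 1                  # broker owns this, not consumer
    return record

# No seek(). No replay. A second consumer can only ever
# receive cells that have not yet been dequeued.
```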
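The consequence for multiple consumers can be shown with a small in-memory stand-in. This sketch assumes a hypothetical `Queue` class built on `collections.deque` and two consumers that simply take turns; it is not how RabbitMQ or SQS hand out messages, only an illustration of the destructive read.

```python
from collections import deque


class Queue:
    """In-memory queue with a destructive read (illustrative only)."""

    def __init__(self):
        self._cells = deque()

    def enqueue(self, record):
        self._cells.append(record)

    def dequeue(self):
        if not self._cells:
            return None                 # queue empty
        return self._cells.popleft()    # returned and removed in one step

    def __len__(self):
        return len(self._cells)


q = Queue()
for record in ("m1", "m2", "m3", "m4"):
    q.enqueue(record)

received = {"C1": [], "C2": []}
# Assumption: the two consumers alternate; any other order works the same way.
for consumer in ("C1", "C2", "C1", "C2"):
    received[consumer].append(q.dequeue())

print(received)   # each message appears under exactly one consumer
print(len(q))     # nothing is left to replay
```

The two consumers split the messages between them rather than each seeing all four, which is the opposite of the two-cursor log.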
Fanout(message) — same call, opposite shape
Where the bytes live when N consumers want a copy.
```python
from copy import copy

def log_fanout(record):
    log.append(record)           # one physical cell
    # each consumer group reads via its own cursor

def queue_fanout(record, queues):
    for q in queues:             # one queue per consumer group
        q.enqueue(copy(record))  # N physical copies
    # each group dequeues from its own queue
```
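Counting cells makes the difference concrete. The sketch below assumes three hypothetical consumer groups (`billing`, `email`, `search`) and uses a plain list for the log and one `deque` per group for the queues; the names and the single-record workload are illustrative only.

```python
from collections import deque
from copy import copy

groups = ["billing", "email", "search"]   # hypothetical consumer groups

# Log shape: one shared list, one cursor per group.
log = []
cursors = {g: 0 for g in groups}

def log_fanout(record):
    log.append(record)                    # stored once, however many readers

# Queue shape: one queue per group, each holding its own copy.
queues = {g: deque() for g in groups}

def queue_fanout(record):
    for q in queues.values():
        q.append(copy(record))            # one physical copy per group

record = {"order_id": 1}
log_fanout(record)
queue_fanout(record)

log_cells = len(log)                                  # cells held by the log
queue_cells = sum(len(q) for q in queues.values())    # cells held by all queues

# One group consuming its copy leaves the other groups' copies untouched.
queues["billing"].popleft()
remaining = {g: len(q) for g, q in queues.items()}
```

With N groups the log stores the record once and N cursors point at it, while the queue design stores N copies so that each group's destructive read only removes its own.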