Build a Message Queue (RabbitMQ / SQS) (14 scenes)
Scene 11 · Routing — one queue per consumer group
Fanout in queue-world is an exchange/SNS topic that copies each message into N physical queues — opposite of Kafka's N cursors on one log.
Previously

Within a queue we partitioned for ordered parallelism. Across queues we need the opposite: one event reaching multiple independent consumer pools, each with its own backlog and retry policy.

Scene 11
Routing — one event, one queue per consumer group
Diagram
A producer publishes to a central exchange (the router); bindings on the exchange decide which queues receive a copy. Each bound queue has its own consumer pool, its own depth, and its own backlog — they do not share storage. Other exchange types (topic, headers) exist but only fanout and direct are exercised here.
producerpublish(...)exchange(fanout)SNS topic · RabbitMQ exchangeemail-queuedepth = 02 consumersworker-1, worker-2audit-queuedepth = 01 consumerworker-1analytics-queue (slow)depth = 01 consumerworker-1
Watch the producer publish one message at a time. The exchange flashes, then the same payload duplicates into every bound queue at once. Notice that analytics piles up while email and audit drain — each queue has its own backlog.
Implementation
Exchange.publish(message)
for each matching binding, copy into that queue's tail
1def publish(msg, routing_key):
2 for queue, bind_key in bindings[self]:
3 if self.type == 'fanout':
4 queue.tail.append(msg) # everyone
5 elif self.type == 'direct':
6 if bind_key == routing_key:
7 queue.tail.append(msg) # exact match
8 elif self.type == 'topic':
9 if pattern_match(bind_key, routing_key):
10 queue.tail.append(msg) # wildcard
11 # N matches => N physical copies, N backlogs
Exchange.bind(queue, routing_key)
add a binding at runtime; new traffic copied immediately
1def bind(queue, routing_key=''):
2 bindings[self].append((queue, routing_key))
3 # producer code unchanged — no redeploy
4 # queue starts receiving from the NEXT publish
5 # (it does NOT replay history — each queue
6 # stores its own messages)
ExchangeType.matches
the three matching modes; only one decides delivery
1def matches(exchange_type, bind_key, routing_key):
2 if exchange_type == 'fanout':
3 return True # ignore key entirely
4 if exchange_type == 'direct':
5 return bind_key == routing_key
6 if exchange_type == 'topic':
7 # bind_key has wildcards: 'order.*'
8 return pattern_match(bind_key, routing_key)