Build a distributed search engine (Elasticsearch / OpenSearch style) (12 scenes)
Scene 07 · Scatter, reduce, fetch
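_search is two round trips. In the scatter phase every shard returns its top (from + size) doc IDs and scores; the coordinator reduces them to the global winners, then fetches only those documents. Because the coordinator has to buffer num_shards × (from + size) candidates, deep pages get expensive, which is where the from + size ≤ 10,000 paging wall comes from.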
[Interactive diagram: the coordinator receives the query "distributed systems" and fans it out to five shards (s0 to s4). Each shard returns its top-K candidates, the coordinator merges them into the global top-N, and a FROM + SIZE counter shows the candidate bytes shipped to the coordinator (5,000 bytes for from=0, size=10 across 5 shards).]
The receiving node becomes the conductor. Watch phase 1 fan out 'distributed systems' to one copy of every shard, the coordinator merge the local top-Ks into a global top-N, then phase 2 fetch _source ONLY from the shards that actually contributed a winner.
Implementation
Coordinator.search
phase 1 scatter for top-(from+size); phase 2 fetch winners
def search(query, from_, size):
    # `from` is a reserved word in Python, so the offset is named from_
    # phase 1: scatter; each shard returns (doc_id, score) pairs only
    results = []
    for shard in all_primary_shards:
        # query one copy (primary or replica) of each shard
        copy = adaptive_replica_selection(shard)
        results.append(copy.localSearch(query, from_ + size))
    top = merge(results, from_, size)
    # phase 2: fetch _source for the global winners only
    bodies = [shard_of(h).fetch(h.doc_id) for h in top]
    return assemble(top, bodies)
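The two phases can be exercised end to end with a toy, in-memory model. This is only a sketch: `ToyShard`, `Hit`, the precomputed scores, and the document bodies are all hypothetical stand-ins, and replica selection is left out entirely. What it shows is that phase 2 touches only the shards that contributed a winner.

```python
import heapq
from collections import namedtuple

# Hypothetical hit record; the shard_id lets the coordinator route the fetch.
Hit = namedtuple("Hit", ["shard_id", "doc_id", "score"])

class ToyShard:
    """In-memory stand-in for a shard: doc_id -> (precomputed score, body)."""
    def __init__(self, shard_id, docs):
        self.shard_id = shard_id
        self.docs = docs
        self.fetch_calls = 0

    def local_search(self, top_k):
        # Phase 1: return ids and scores only, best first, no body.
        hits = (Hit(self.shard_id, d, s) for d, (s, _) in self.docs.items())
        return heapq.nlargest(top_k, hits, key=lambda h: h.score)

    def fetch(self, doc_id):
        # Phase 2: return the stored body for one winning document.
        self.fetch_calls += 1
        return self.docs[doc_id][1]

def search(shards, from_, size):
    # Scatter: every shard reports its local top (from_ + size).
    per_shard = [s.local_search(from_ + size) for s in shards]
    # Reduce: pool all candidates and sort by score, best first.
    pool = sorted((h for hits in per_shard for h in hits),
                  key=lambda h: h.score, reverse=True)
    top = pool[from_:from_ + size]
    # Fetch: ask only the shards that own a global winner.
    by_id = {s.shard_id: s for s in shards}
    return [(h.doc_id, h.score, by_id[h.shard_id].fetch(h.doc_id)) for h in top]

shards = [
    ToyShard(0, {"book 1": (1.2, "body 1"), "book 5": (0.7, "body 5")}),
    ToyShard(1, {"book 4": (0.4, "body 4")}),
    ToyShard(2, {}),
    ToyShard(3, {"book 2": (3.1, "body 2")}),
    ToyShard(4, {"book 3": (2.8, "body 3")}),
]
results = search(shards, from_=0, size=2)
print(results)                           # the two best hits, with bodies
print([s.fetch_calls for s in shards])   # [0, 0, 0, 1, 1]
```

Shards 0, 1, and 2 answer the scatter but are never asked for a body, because none of their candidates made the global top 2.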
Shard.localSearch
BM25-rank local matches; ship (doc_id, score) only
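import heapq
from collections import namedtuple

Hit = namedtuple("Hit", ["doc_id", "score"])

def localSearch(query, top_k):
    # score every local match; postings_for and bm25 are the shard's own helpers
    scored = (Hit(doc_id, bm25(query, doc_id)) for doc_id in postings_for(query))
    # keep only the top_k best, sorted best first; no _source yet
    return heapq.nlargest(top_k, scored, key=lambda h: h.score)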
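The `bm25` call is left abstract in this scene. For reference, here is a minimal sketch of the textbook BM25 formula for a single query term. The function name `bm25_term`, its parameters, and the idea of passing the statistics in explicitly are assumptions made for illustration; `k1=1.2` and `b=0.75` are commonly used defaults, and a production scorer may differ in constant factors.

```python
import math

def bm25_term(tf, doc_len, avg_doc_len, doc_freq, num_docs, k1=1.2, b=0.75):
    """Textbook BM25 contribution of one query term to one document.

    tf          -- occurrences of the term in this document
    doc_len     -- length of this document, in terms
    avg_doc_len -- average document length in the collection
    doc_freq    -- number of documents that contain the term
    num_docs    -- total number of documents in the collection
    """
    # Rare terms get a larger weight; the +1 keeps the log non-negative.
    idf = math.log(1 + (num_docs - doc_freq + 0.5) / (doc_freq + 0.5))
    # Term frequency saturates via k1 and is normalized by length via b.
    norm = tf + k1 * (1 - b + b * doc_len / avg_doc_len)
    return idf * tf * (k1 + 1) / norm

def bm25_score(term_freqs, doc_len, avg_doc_len, doc_freqs, num_docs):
    """Sum the per-term contributions for a multi-term query."""
    return sum(
        bm25_term(term_freqs.get(t, 0), doc_len, avg_doc_len, df, num_docs)
        for t, df in doc_freqs.items()
    )
```

A score for the query "distributed systems" would then be the sum over the two terms, for example `bm25_score({"distributed": 2, "systems": 1}, 120, 100, {"distributed": 40, "systems": 300}, 1000)`.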
Coordinator.merge
buffer num_shards × (from+size) candidates, then slice
def merge(per_shard, from_, size):
    # the coordinator buffers num_shards * (from_ + size) candidates;
    # that buffer grows with page depth, hence the from + size <= 10,000 wall
    pool = []
    for hits in per_shard:
        pool.extend(hits)
    # each hit exposes .score (and .doc_id, which the fetch phase reads)
    pool.sort(key=lambda hit: hit.score, reverse=True)
    return pool[from_ : from_ + size]
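To make the wall concrete, the buffer size can be worked out directly. The sketch below assumes roughly 100 bytes per (doc_id, score) candidate, a figure inferred from the diagram's 5,000 bytes for 5 shards at from=0, size=10. The names `coordinator_buffer` and `MAX_RESULT_WINDOW` are hypothetical, chosen only for this example.

```python
MAX_RESULT_WINDOW = 10_000  # the from + size cap discussed in this scene

def coordinator_buffer(num_shards, from_, size, bytes_per_candidate=100):
    """Return (candidate_count, approx_bytes) the coordinator must buffer."""
    if from_ + size > MAX_RESULT_WINDOW:
        raise ValueError(
            f"from + size = {from_ + size} exceeds {MAX_RESULT_WINDOW}"
        )
    # Every shard ships its full local top (from_ + size), not just `size`.
    candidates = num_shards * (from_ + size)
    return candidates, candidates * bytes_per_candidate

first_page = coordinator_buffer(5, from_=0, size=10)
last_page = coordinator_buffer(5, from_=9_990, size=10)
print(first_page)  # (50, 5000)
print(last_page)   # (50000, 5000000)
```

Both requests return 10 hits to the user, but the last allowed page makes the coordinator buffer and sort 1,000 times as many candidates as the first, and the cost scales again with the number of shards.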