Build a distributed search engine (Elasticsearch / OpenSearch style) (12 scenes)
Scene 07 · Scatter, reduce, fetch
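_search is two round trips: scatter to collect the top-(from+size) doc IDs and scores from every shard, reduce them on the coordinator, then fetch only the global winners. Because every shard must return from+size candidates, deep pages get expensive, which is where the from + size <= 10,000 paging wall comes from.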
The receiving node becomes the conductor. Watch phase 1 fan out 'distributed systems' to one copy of every shard, the coordinator merge the local top-Ks into a global top-N, then phase 2 fetch _source ONLY from the shards that actually contributed a winner.
Implementation
Coordinator.search
phase 1 scatter for top-(from+size); phase 2 fetch winners
def search(query, from_, size):
    # `from` is a reserved word in Python, hence `from_`.
    # Phase 1: scatter. Each shard returns (doc_id, score) pairs only.
    results = []
    for shard in all_primary_shards:
        # Query one copy of each shard.
        copy = adaptive_replica_selection(shard)
        results.append(copy.localSearch(query, from_ + size))
    top = merge(results, from_, size)
    # Phase 2: fetch _source for the global winners only.
    bodies = [shard_of(doc_id).fetch(doc_id) for doc_id, _score in top]
    return assemble(top, bodies)
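The two phases can be exercised end to end with a toy in-memory model. This is a minimal sketch, not the engine's actual code: `ToyShard`, `local_search`, and `fetch_calls` are hypothetical names, scores are precomputed instead of ranked against a query, and there is no replica selection or networking.

```python
import heapq


class ToyShard:
    """Hypothetical in-memory shard: maps doc_id -> (score, source)."""

    def __init__(self, docs):
        self.docs = docs
        self.fetch_calls = 0  # counts phase 2 requests that reach this shard

    def local_search(self, top_k):
        # Query phase: return (score, doc_id) pairs only, best first.
        pairs = [(score, doc_id) for doc_id, (score, _src) in self.docs.items()]
        return heapq.nlargest(top_k, pairs)

    def fetch(self, doc_id):
        # Fetch phase: return the stored body for one winning doc.
        self.fetch_calls += 1
        return self.docs[doc_id][1]


def search(shards, from_, size):
    # Phase 1: scatter. Every shard returns its local top-(from_ + size).
    candidates = []
    for shard_idx, shard in enumerate(shards):
        for score, doc_id in shard.local_search(from_ + size):
            candidates.append((score, doc_id, shard_idx))
    # Reduce: global sort by score, then slice out the requested page.
    candidates.sort(key=lambda c: c[0], reverse=True)
    page = candidates[from_:from_ + size]
    # Phase 2: fetch bodies only from the shards that own a winner.
    return [(doc_id, score, shards[idx].fetch(doc_id))
            for score, doc_id, idx in page]


shards = [
    ToyShard({"a1": (9.0, "doc a1"), "a2": (1.0, "doc a2")}),
    ToyShard({"b1": (8.0, "doc b1"), "b2": (7.0, "doc b2")}),
    ToyShard({"c1": (0.5, "doc c1")}),
]
hits = search(shards, from_=0, size=3)
```

In this run the third shard answers the query phase but contributes no winner, so its `fetch_calls` stays at zero: it is never contacted in phase 2.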
Shard.localSearch
BM25-rank local matches; ship (doc_id, score) only
def localSearch(query, top_k):
    pq = PriorityQueue(top_k)
    for doc_id in postings_for(query):
        score = bm25(query, doc_id)
        pq.offer((doc_id, score))
    return pq.drain()  # no _source yet
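The bounded priority queue in the shard-local step can be sketched with Python's standard `heapq` module. This is one possible realisation under stated assumptions: `local_top_k` is a hypothetical name, and the input is an already-scored iterable standing in for the `postings_for` and `bm25` calls above.

```python
import heapq


def local_top_k(scored_docs, top_k):
    """Keep the top_k highest-scoring (doc_id, score) pairs with a bounded min-heap."""
    if top_k <= 0:
        return []
    heap = []  # entries are (score, doc_id); heap[0] is the weakest kept hit
    for doc_id, score in scored_docs:
        if len(heap) < top_k:
            heapq.heappush(heap, (score, doc_id))
        elif score > heap[0][0]:
            # New hit beats the weakest kept hit: swap it in.
            heapq.heapreplace(heap, (score, doc_id))
    # Drain best-first, back in (doc_id, score) order.
    return [(doc_id, score) for score, doc_id in sorted(heap, reverse=True)]


scored = [("d1", 0.2), ("d2", 3.1), ("d3", 1.7), ("d4", 2.4), ("d5", 0.9)]
top = local_top_k(scored, 2)
```

The heap never holds more than `top_k` entries, so memory on the shard is bounded by the requested window rather than by the number of matching documents.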
Coordinator.merge
buffer N × (from+size) candidates, then slice
def merge(per_shard, from_, size):
    # buffer = num_shards * (from_ + size)
    # this is the from + size <= 10,000 wall
    pool = []
    for hits in per_shard:
        pool.extend(hits)
    # Hits are (doc_id, score) pairs, so sort on the score field.
    pool.sort(key=lambda hit: hit[1], reverse=True)
    return pool[from_ : from_ + size]
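The buffer comment in merge can be turned into a quick calculation. The sketch below is illustrative arithmetic only: `coordinator_buffer` and `within_window` are hypothetical helpers, and the 10,000 limit is taken to be the default of the `index.max_result_window` setting.

```python
DEFAULT_MAX_RESULT_WINDOW = 10_000  # assumed default of index.max_result_window


def coordinator_buffer(num_shards, from_, size):
    """Candidates held before slicing: one top-(from_ + size) list per shard."""
    return num_shards * (from_ + size)


def within_window(from_, size, max_result_window=DEFAULT_MAX_RESULT_WINDOW):
    """True if the request stays inside the from + size paging limit."""
    return from_ + size <= max_result_window


first_page = coordinator_buffer(num_shards=5, from_=0, size=10)
deep_page = coordinator_buffer(num_shards=5, from_=9_990, size=10)
```

Both requests return 10 hits, but on 5 shards the first page buffers 50 candidates while the page starting at offset 9,990 buffers 50,000. The cost grows with the offset, not with the page size, which is why the window is capped.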