Build a distributed search engine (Elasticsearch / OpenSearch style) (12 scenes)
Scene 10 · Top-N aggregations can miss the winner
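Each shard returns only its own top `shard_size` buckets, and the coordinator merges those. With `shard_size` = 10, a term that ranks 11th on every shard never reaches the coordinator, so it is invisible in a top-10 result even when its summed count would win. `shard_size` is the knob.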
Previously
Scoring drift was an approximation per document: same data, reranked. The same per-shard truncation reappears in aggregations with a sharper consequence: results are not just reordered, they can be missing entirely.
Diagram
The same scatter-gather chrome from search, now in `agg` mode. Each shard tile ships (term, doc_count) buckets instead of (doc_id, score) tuples. The coordinator sums doc_count per term across shards, sorts by the summed count, and returns the global top `size`. `shard_size` is how many local buckets each shard returns; `size` is how many global buckets the coordinator returns. Each bucket's `doc_count_error_upper_bound` chip shows how far the merged count could fall short of the true count, because the term ranked below the cutoff on some shard and was never reported by it.
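Written out, the chip's bound follows from one observation. Let $c_s(t)$ be term $t$'s true count on shard $s$, $R_s$ the buckets shard $s$ returned (sorted by descending count and cut at `shard_size`), and $m_s$ the smallest count in $R_s$; this notation is introduced here for illustration. Any term missing from a truncated $R_s$ must have $c_s(t) \le m_s$, so:

$$
\hat{c}(t) = \sum_{s \,:\, t \in R_s} c_s(t)
\;\le\; c(t) = \sum_{s} c_s(t)
\;\le\; \hat{c}(t) + \underbrace{\sum_{s \,:\, t \notin R_s} m_s}_{\varepsilon(t)}
$$

Here $\hat{c}(t)$ is the merged `doc_count` and $\varepsilon(t)$ is `doc_count_error_upper_bound`. A shard whose $R_s$ was not truncated reported every term it holds, so it can be dropped from the sum. A term absent from every $R_s$ gets no bucket at all, so no chip can flag it.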
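We're running a `terms` agg on the `tags` field across 5 shards (~100k books each), asking for the top 5 most popular tags globally. In this scene `shard_size` defaults to `size`, so each shard returns only its local top 5 (real Elasticsearch instead defaults `shard_size` to `size * 1.5 + 10`, over-fetching to make this failure less likely). Watch the merge: 5 shards × 5 buckets = 25 candidate slots, fewer distinct terms once duplicates are summed, and the coordinator picks the global top 5. It looks clean, but one tag is silently missing.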
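A minimal sketch of that failure in plain Python, assuming hypothetical, deliberately contrived counts rather than the scene's data: each of 5 shards has five "local favourite" tags that exist only on that shard, plus `sci-fi`, which ranks 6th everywhere. `top_n` is a hypothetical helper that mimics the scatter-gather merge with `collections.Counter`.

```python
from collections import Counter

# Hypothetical, contrived per-shard tag counts (not the scene's real data):
# every shard has five "local favourite" tags that exist only on that shard,
# plus "sci-fi", which ranks 6th on every shard.
shards = []
for s in range(5):
    counts = Counter({f"local-{s}-{i}": 100 - i for i in range(5)})  # counts 100..96
    counts["sci-fi"] = 90
    shards.append(counts)


def top_n(shards, size, shard_size):
    """Hypothetical helper: scatter-gather terms agg over per-shard Counters."""
    merged = Counter()
    for counts in shards:
        # Each shard ships only its local top `shard_size` (term, count) pairs.
        for term, n in counts.most_common(shard_size):
            merged[term] += n
    # The coordinator sums and keeps the global top `size`.
    return [term for term, _ in merged.most_common(size)]


true_totals = sum(shards, Counter())
print(true_totals.most_common(1))       # [('sci-fi', 450)]
print("sci-fi" in top_n(shards, 5, 5))  # False: it never left any shard
print(top_n(shards, 5, 6)[0])           # sci-fi
```

With `shard_size` = 5, `sci-fi` never leaves any shard even though its true total (450) is more than four times any local favourite's. Raising `shard_size` to 6 is enough here, but no `shard_size` short of each shard's full term list guarantees an exact answer.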
Implementation
Coordinator.termsAgg
fan out for top-shard_size; merge; slice top-size
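```python
def terms_agg(self, field, size, shard_size):
    # Assumes the Coordinator holds `cluster` and the target `index`.
    # Scatter: each shard counts locally and returns only its top `shard_size` buckets.
    per_shard = [shard.local_top_k(field, k=shard_size)
                 for shard in self.cluster.shards_for(self.index)]
    # Gather: sum counts per term, sort, attach error bounds.
    merged = merge_buckets(per_shard)
    # Only the global top `size` buckets go back to the client.
    return merged[:size]
```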
Shard.localTopK
count + sort + slice — anything below k is invisible
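```python
from collections import Counter

def local_top_k(self, field, k):
    counts = Counter()
    for doc in self.docs:
        # doc_count counts documents, so a tag repeated within one doc counts once.
        counts.update(set(doc.get(field, ())))
    # Tail truncated here: anything below rank k is never sent to the coordinator.
    return counts.most_common(k)
```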
Coordinator.mergeBuckets
sum counts; sort; emit doc_count_error_upper_bound
```python
def merge_buckets(per_shard):
    totals, seen = {}, {}
    for shard_idx, buckets in enumerate(per_shard):
        for term, count in buckets:
            totals[term] = totals.get(term, 0) + count
            seen.setdefault(term, set()).add(shard_idx)
    out = []
    # Highest summed count first; ties broken by term for a stable order.
    for term, count in sorted(totals.items(), key=lambda kv: (-kv[1], kv[0])):
        # error: shards that did NOT return this term could hide
        # up to their smallest-returned count for it.
        err = sum(per_shard[s][-1][1]  # buckets are (term, count) tuples
                  for s in range(len(per_shard))
                  if s not in seen[term] and per_shard[s])
        out.append({"term": term, "doc_count": count,
                    "doc_count_error_upper_bound": err})
    return out
```
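An end-to-end sketch that ties the three pieces together as plain functions, so it runs on its own. The names and data are hypothetical: shards are just lists of dicts, not the scene's classes. It also adds one refinement, which is an assumption of this sketch: a shard that returned fewer than `shard_size` buckets sent its whole term list, so it contributes nothing to the error bound.

```python
from collections import Counter


def local_top_k(docs, field, k):
    """One shard: count documents per term, keep only the top k."""
    counts = Counter()
    for doc in docs:
        counts.update(set(doc.get(field, ())))  # doc_count counts docs, not occurrences
    return counts.most_common(k)


def merge_buckets(per_shard, shard_size):
    """Coordinator: sum counts, sort, attach a per-bucket error bound."""
    totals, seen = Counter(), {}
    for idx, buckets in enumerate(per_shard):
        for term, count in buckets:
            totals[term] += count
            seen.setdefault(term, set()).add(idx)
    out = []
    for term, count in sorted(totals.items(), key=lambda kv: (-kv[1], kv[0])):
        # Assumption: only shards that returned a full page (exactly shard_size
        # buckets) without this term can hide it, up to their smallest count.
        err = sum(buckets[-1][1] for i, buckets in enumerate(per_shard)
                  if i not in seen[term] and len(buckets) == shard_size)
        out.append({"term": term, "doc_count": count,
                    "doc_count_error_upper_bound": err})
    return out


def terms_agg(shards, field, size, shard_size):
    per_shard = [local_top_k(docs, field, shard_size) for docs in shards]
    return merge_buckets(per_shard, shard_size)[:size]


def make_shard(tag_counts):
    """Hypothetical shard: one single-tag book per unit of count."""
    return [{"tags": [tag]} for tag, n in tag_counts.items() for _ in range(n)]


shards = [
    make_shard({"a": 10, "b": 9, "sci-fi": 8}),
    make_shard({"c": 10, "d": 9, "sci-fi": 8}),
    make_shard({"e": 10, "f": 9, "sci-fi": 8}),
]


def summary(buckets):
    return [(b["term"], b["doc_count"], b["doc_count_error_upper_bound"]) for b in buckets]


print(summary(terms_agg(shards, "tags", size=2, shard_size=2)))
# [('a', 10, 18), ('c', 10, 18)]
# 13 = int(2 * 1.5 + 10), an Elasticsearch-style over-fetch for size=2
print(summary(terms_agg(shards, "tags", size=2, shard_size=13)))
# [('sci-fi', 24, 0), ('a', 10, 0)]
```

With `shard_size` = 2, both returned buckets carry an error bound of 18, larger than their own counts, which is the signal that this top 2 cannot be trusted; the hidden `sci-fi` (24) only surfaces once `shard_size` is raised. Note that no chip ever mentions `sci-fi` in the narrow run: the bound warns that counts may be low, not which term is missing.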