Build a distributed search engine (Elasticsearch / OpenSearch style) (12 scenes)
Scene 10 · Top-N aggregations can miss the winner
Each shard returns only its own top shard_size buckets, and the coordinator merges them. With shard_size=10, a term that ranks 11th on every shard is invisible in a top-10 result, however large its global total; shard_size is the knob.
Previously

Scoring drift was a per-document approximation: same data, reranked. The same per-shard truncation reappears in aggregations with a sharper consequence: results are not just reordered but missing.

Diagram
The same scatter-gather view as in search, now in `agg` mode. Each shard tile ships (term, doc_count) BUCKETS instead of (doc_id, score) tuples. The coordinator sums each term's doc_count across the shards that returned it, sorts by the total, and returns the global top-`size`. `shard_size` is how many local buckets each shard returns; `size` is how many global buckets the coordinator returns. Each bucket's `doc_count_error_upper_bound` chip shows the most the merged count could be undercounting, because the term may sit below the cutoff on shards that did not return it.
Diagram data (terms agg on `tags` · size=5 · shard_size=5; each shard holds n=100000 docs):
shard0 top-5: classic 80k, children 78k, systems 76k, fiction 74k, databases 72k
shard1 top-5: systems 80k, fiction 78k, networking 76k, java 74k, search 72k
shard2 top-5: fiction 80k, children 78k, search 76k, java 74k, networking 72k
shard3 top-5: systems 80k, databases 78k, networking 76k, search 74k, java 72k
shard4 top-5: fiction 80k, children 78k, databases 76k, java 74k, networking 72k
Global top-5 merged at coordinator (by doc_count):
#1 fiction · 312k · err ≤ 72k
#2 networking · 296k · err ≤ 72k
#3 java · 294k · err ≤ 72k
#4 systems · 236k · err ≤ 144k
#5 children · 234k · err ≤ 144k
We're running a `terms` agg on the `tags` field across 5 shards (~100k books each), asking for the top 5 most popular tags globally. Here shard_size equals size, so each shard returns only its local top 5 (Elasticsearch's documented default is larger: size × 1.5 + 10). Watch the merge: 5 shards × 5 buckets = 25 candidate buckets covering 8 distinct tags, from which the coordinator picks the global top 5. It looks clean, but one tag is silently missing.
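One way to check the merge is to replay the per-shard buckets from the diagram through a minimal merge. This sketch assumes each bucket is a (term, count) tuple with counts in thousands; `SHARD_BUCKETS` and `merge` are illustrative names, not an engine API.

```python
# Per-shard top-5 buckets from the diagram, as (term, doc_count in thousands).
SHARD_BUCKETS = [
    [("classic", 80), ("children", 78), ("systems", 76), ("fiction", 74), ("databases", 72)],
    [("systems", 80), ("fiction", 78), ("networking", 76), ("java", 74), ("search", 72)],
    [("fiction", 80), ("children", 78), ("search", 76), ("java", 74), ("networking", 72)],
    [("systems", 80), ("databases", 78), ("networking", 76), ("search", 74), ("java", 72)],
    [("fiction", 80), ("children", 78), ("databases", 76), ("java", 74), ("networking", 72)],
]


def merge(per_shard):
    """Sum counts per term and attach an upper bound on the undercount."""
    totals, seen = {}, {}
    for idx, buckets in enumerate(per_shard):
        for term, count in buckets:
            totals[term] = totals.get(term, 0) + count
            seen.setdefault(term, set()).add(idx)
    # Sort by total descending; ties broken by term for deterministic output.
    ranked = sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))
    out = []
    for term, count in ranked:
        # A shard that did not return the term may still hold up to its cutoff count.
        err = sum(b[-1][1] for i, b in enumerate(per_shard)
                  if b and i not in seen[term])
        out.append((term, count, err))
    return out


merged = merge(SHARD_BUCKETS)
for rank, (term, count, err) in enumerate(merged[:5], start=1):
    print(f"#{rank} {term} · {count}k · err ≤ {err}k")
```

This prints the same five rows as the diagram, fiction 312k through children 234k. Looking one row past the cut is the warning sign: `merged[5]` is `("databases", 226, 144)`, so on the returned numbers alone databases could hold up to 370k and outrank fiction.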
Implementation
Coordinator.termsAgg
fan out for top-shard_size; merge; slice top-size
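def terms_agg(cluster, index, field, size, shard_size):
    # Elasticsearch resets shard_size to size when it is smaller.
    shard_size = max(shard_size, size)
    # Scatter: each shard returns only its local top-`shard_size` buckets.
    per_shard = [shard.local_top_k(field, k=shard_size)
                 for shard in cluster.shards_for(index)]
    # Gather: sum counts per term across shards, sorted by doc_count.
    merged = merge_buckets(per_shard)
    # Anything below a shard's cutoff never reached the coordinator.
    return merged[:size]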
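The shard_size knob can be seen on a toy cluster. In this sketch (synthetic data; `make_shards` and the `winner` tag are hypothetical), every shard holds ten shard-local tags with 100 docs each plus one shared tag, `winner`, with 99 docs. So `winner` ranks 11th on every shard yet has 495 docs globally.

```python
from collections import Counter


def make_shards(n_shards=5, local_terms=10):
    """Toy shards: ten tags unique to each shard (100 docs) plus a shared 'winner' (99)."""
    shards = []
    for s in range(n_shards):
        counts = Counter({f"s{s}-tag{i}": 100 for i in range(local_terms)})
        counts["winner"] = 99  # 11th on every shard
        shards.append(counts)
    return shards


def terms_agg(shards, size, shard_size):
    shard_size = max(shard_size, size)  # never ask a shard for fewer than size
    totals = Counter()
    for counts in shards:
        # Each shard ships only its local top-shard_size buckets.
        for term, count in counts.most_common(shard_size):
            totals[term] += count
    return totals.most_common(size)


shards = make_shards()
print(terms_agg(shards, size=10, shard_size=10)[0])  # a shard-local tag at 100
print(terms_agg(shards, size=10, shard_size=11)[0])  # ('winner', 495)
```

Raising shard_size by one is enough here; in general a larger shard_size trades bigger shard responses for fewer missed terms.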
Shard.localTopK
count + sort + slice; anything below k is invisible
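from collections import Counter

def local_top_k(self, field, k):
    # Shard method: self.docs holds this shard's documents.
    counts = Counter()
    for doc in self.docs:
        # doc_count counts documents, so a tag repeated in one doc counts once.
        for term in set(doc[field]):
            counts[term] += 1
    return counts.most_common(k)  # tail truncated here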
Coordinator.mergeBuckets
sum counts; sort; emit doc_count_error_upper_bound
def merge_buckets(per_shard):
    totals, seen = {}, {}
    for shard_idx, buckets in enumerate(per_shard):
        for term, count in buckets:
            totals[term] = totals.get(term, 0) + count
            seen.setdefault(term, set()).add(shard_idx)
    out = []
    # Sort by doc_count descending; break ties by term for deterministic output.
    for term, count in sorted(totals.items(), key=lambda kv: (-kv[1], kv[0])):
        # Error: a shard that did NOT return this term could still hold it,
        # but at most as many docs as its smallest returned bucket (its cutoff).
        err = sum(per_shard[s][-1][1]
                  for s in range(len(per_shard))
                  if s not in seen[term] and per_shard[s])
        out.append({"key": term, "doc_count": count,
                    "doc_count_error_upper_bound": err})
    return out
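One way to sanity-check the error bound is a randomized property test: for every merged bucket, the exact global count should fall between `doc_count` and `doc_count + doc_count_error_upper_bound`. This sketch assumes single-tag documents drawn at random and inlines a runnable `merge_buckets` so it stands alone; `TERMS`, the seed, and the shard sizes are arbitrary choices.

```python
import random
from collections import Counter


def merge_buckets(per_shard):
    totals, seen = {}, {}
    for idx, buckets in enumerate(per_shard):
        for term, count in buckets:
            totals[term] = totals.get(term, 0) + count
            seen.setdefault(term, set()).add(idx)
    out = []
    for term, count in sorted(totals.items(), key=lambda kv: (-kv[1], kv[0])):
        # Shards that skipped the term can hide at most their last returned count.
        err = sum(b[-1][1] for i, b in enumerate(per_shard)
                  if b and i not in seen[term])
        out.append({"key": term, "doc_count": count,
                    "doc_count_error_upper_bound": err})
    return out


random.seed(7)
TERMS = [f"tag{i}" for i in range(30)]
# Five shards of 2,000 single-tag docs each, with linearly skewed tag weights.
shards = [Counter(random.choices(TERMS, weights=range(1, 31), k=2000))
          for _ in range(5)]
truth = sum(shards, Counter())  # exact global counts, for comparison only

merged = merge_buckets([c.most_common(5) for c in shards])
for b in merged:
    exact = truth[b["key"]]
    assert b["doc_count"] <= exact <= b["doc_count"] + b["doc_count_error_upper_bound"]

print("merged top 5:", [b["key"] for b in merged[:5]])
print("exact top 5: ", [t for t, _ in truth.most_common(5)])
```

The two printed lists may or may not agree for a given seed; the assertion is the part that must always hold.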