Build a columnar OLAP store (ClickHouse / Druid style) (13 scenes)
Scene 05 · Writes must be bulk, not per-row
Every INSERT touches every column file, so per-row inserts into a 30-column table pay the per-column overhead 30 times for every single row. The fix: batch rows before inserting, or let the server batch them with async_insert.
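To make that arithmetic concrete, here is a minimal cost model. It is a sketch under two stated assumptions: each INSERT touches every column file exactly once, and each INSERT produces exactly one new part (true when the batch lands in a single partition). It ignores bytes written, which grow with row count either way. The function names are hypothetical.

```python
# Back-of-the-envelope cost model (sketch): counts column-file writes and
# new parts per second, assuming 1 INSERT = 1 fan-out = 1 part.

def column_writes_per_sec(rows_per_sec: float, rows_per_insert: int, columns: int) -> float:
    """Each INSERT fans out to every column file exactly once."""
    inserts_per_sec = rows_per_sec / rows_per_insert
    return inserts_per_sec * columns

def parts_per_sec(rows_per_sec: float, rows_per_insert: int) -> float:
    """Each INSERT creates one new part (single-partition assumption)."""
    return rows_per_sec / rows_per_insert

if __name__ == "__main__":
    for batch in (1, 1000):
        print(f"rows/INSERT={batch:>5}: "
              f"{column_writes_per_sec(1000, batch, 30):>8,.0f} column writes/s, "
              f"{parts_per_sec(1000, batch):>6,.0f} parts/s")
```

With this scene's numbers, per-row inserts cost 30,000 column writes and 1,000 new parts per second; batching 1000 rows per INSERT cuts that to 30 column writes and 1 part per second.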
Previously

Vectorized execution wants thousands of values per call. That same rule applies to writes — they have to arrive in batches, not one row at a time, or the per-column overhead crushes the engine.

Diagram
A client streams 1000 INSERTs/sec into the OLAP engine. The rack on the right shows 30 column files (c0.bin … c29.bin): every INSERT fans out to all 30, no matter how many rows it carries. **Bulk insert / batched write**: many rows committed as one operation, so the per-column overhead is paid once per batch. **Async insert (server-side buffer)**: the engine accumulates small inserts per query shape and flushes them as one slab once a size or time threshold is reached. The footer meters track per-column overhead (writes/sec) and sustained throughput (rows/sec); a warning banner appears when too many parts pile up (3000+ in the diagram).
Figure (per-row mode): the client sends 1000 rows/sec from the app; the engine turns 1 row in into 30 column writes out, one per column file (c0.bin … c29.bin); meters read 30,000 writes/s of per-column overhead and ~30 rows/sec sustained throughput; banner: BACK-PRESSURE, too many parts (3000+). Caption: Per-row INSERT into a 30-column table: each row fires 30 column writes. Engine drowns.
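The back-pressure banner in the diagram (too many parts, 3000+) is a rate problem: per-row inserts create parts faster than background merges can fold them together. A rough sketch of how quickly the threshold is reached follows. `merge_reduction_per_sec` (net parts removed per second by merges) is a hypothetical input, not a measured engine figure, and the function name is illustrative.

```python
import math

def seconds_until_backpressure(parts_per_sec: float,
                               merge_reduction_per_sec: float,
                               threshold: int = 3000,
                               start_parts: int = 0) -> float:
    """Seconds until the active part count reaches `threshold`.

    parts_per_sec: new parts created by inserts (1 per INSERT in this model).
    merge_reduction_per_sec: hypothetical net parts removed per second by merges.
    Returns math.inf when merges keep pace with inserts.
    """
    net_growth = parts_per_sec - merge_reduction_per_sec
    if net_growth <= 0:
        return math.inf                      # merges keep up: the count never climbs
    return max(0.0, (threshold - start_parts) / net_growth)

# Per-row: 1000 parts/s against an assumed 100 parts/s of merge reduction.
print(seconds_until_backpressure(1000, 100))   # about 3.3 seconds
# Batched: 1 part/s; the same assumed merge rate easily keeps up.
print(seconds_until_backpressure(1, 100))      # inf
```

Whatever the real merge rate is, dividing the part creation rate by 1000 (one part per batch instead of per row) is what moves the engine from the first case to the second.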
1000 rows/sec arrive from the app. In batched mode, a client-side buffer fills to 1000 rows and then flushes as one INSERT: the engine pays a single 30-column fan-out (30 column writes) for the whole batch instead of one fan-out per row (30,000 column writes for the same 1000 rows).
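The client-side batching described above can be sketched as a small buffer that collects rows and hands them to a single insert call once it holds 1000. `ClientBatcher` and its `send_insert` callback are hypothetical names; in practice the callback would wrap whatever bulk-insert call your database driver provides.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]

class ClientBatcher:
    """Hypothetical client-side buffer: many add() calls, one INSERT per full batch."""

    def __init__(self, send_insert: Callable[[List[Row]], None], batch_rows: int = 1000):
        self.send_insert = send_insert   # wraps the driver's bulk-insert call (assumption)
        self.batch_rows = batch_rows
        self._buf: List[Row] = []

    def add(self, row: Row) -> None:
        self._buf.append(row)
        if len(self._buf) >= self.batch_rows:
            self.flush()

    def flush(self) -> None:
        """Send everything buffered as ONE insert; also call this on shutdown."""
        if self._buf:
            batch, self._buf = self._buf, []
            self.send_insert(batch)

sent: List[List[Row]] = []                 # records each INSERT the engine would see
batcher = ClientBatcher(sent.append, batch_rows=1000)
for i in range(2500):                      # 2.5 seconds of the 1000 rows/sec stream
    batcher.add({"c0": i})
batcher.flush()                            # push the partial tail batch
print([len(b) for b in sent])              # [1000, 1000, 500]
```

A production batcher would also flush on a timer so a slow trickle does not sit in memory indefinitely; that is the same size-or-time rule async_insert applies on the server side.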
Implementation
Engine.insertPerRow(rows)
the pathological path: one 30-column fan-out per row
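import os

# table, codec, newPartDir() and registerPart() are engine internals.
def insertPerRow(rows):
    for row in rows:                                  # N iterations
        part_dir = newPartDir()                       # a fresh part for EVERY row
        for col in table.columns:                     # 30 columns
            with open(part_dir / f'{col}.bin', 'wb') as f:
                f.write(codec.encode(row[col]))
                f.flush()
                os.fsync(f.fileno())                  # durable write, per column, per row
        registerPart(part_dir)                        # 1 part per row
    # cost = N rows * 30 columns = 30N file opens + 30N fsyncs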
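To check the cost comment on insertPerRow, the sketch below makes the per-row path runnable against a temporary directory and counts the file opens, fsyncs and parts it produces. The plain-text `encode` codec, the `part_XXXXXX` directory naming and the `IOStats` counter are illustrative assumptions, not the engine's real on-disk format.

```python
import os
import tempfile
from dataclasses import dataclass
from pathlib import Path

COLUMNS = [f"c{i}" for i in range(30)]   # the 30-column table from the diagram

@dataclass
class IOStats:
    opens: int = 0
    fsyncs: int = 0
    parts: int = 0

def encode(value) -> bytes:
    # Stand-in codec (assumption): one value per line as UTF-8 text.
    return f"{value}\n".encode()

def insert_per_row(root: Path, rows, stats: IOStats) -> None:
    """Per-row path: every row gets its own part and 30 open/fsync pairs."""
    for row in rows:
        part_dir = root / f"part_{stats.parts:06d}"   # a new part directory per row
        part_dir.mkdir()
        for col in COLUMNS:
            with open(part_dir / f"{col}.bin", "wb") as f:
                stats.opens += 1
                f.write(encode(row[col]))
                f.flush()
                os.fsync(f.fileno())
                stats.fsyncs += 1
        stats.parts += 1

with tempfile.TemporaryDirectory() as tmp:
    stats = IOStats()
    rows = [{col: i for col in COLUMNS} for i in range(5)]
    insert_per_row(Path(tmp), rows, stats)
    print(stats)   # IOStats(opens=150, fsyncs=150, parts=5)
```

Scale the input to 1000 rows and the counters reach 30,000 opens, 30,000 fsyncs and 1000 parts: the 30N figure from the pseudocode.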
Engine.insertBulk(batch)
the amortized path: one 30-column fan-out per batch, not per row
import os

def insertBulk(batch):
    part_dir = newPartDir()                           # ONE part for the whole batch
    for col in table.columns:                         # 30 columns, ONCE
        with open(part_dir / f'{col}.bin', 'wb') as f:
            for row in batch:                         # N rows, hot loop
                f.write(codec.encode(row[col]))
            f.flush()
            os.fsync(f.fileno())                      # one fsync per column file
    registerPart(part_dir)                            # 1 part per batch
    # cost = 30 file opens + 30 fsyncs, regardless of batch size
    # (bytes written still grow with N; the per-file overhead does not)
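A matching runnable sketch of the bulk path, under the same illustrative assumptions (plain-text `encode`, hypothetical `part_XXXXXX` naming, an `IOStats` counter), shows the counters staying flat as the batch grows.

```python
import os
import tempfile
from dataclasses import dataclass
from pathlib import Path

COLUMNS = [f"c{i}" for i in range(30)]   # the 30-column table from the diagram

@dataclass
class IOStats:
    opens: int = 0
    fsyncs: int = 0
    parts: int = 0

def encode(value) -> bytes:
    # Stand-in codec (assumption): one value per line as UTF-8 text.
    return f"{value}\n".encode()

def insert_bulk(root: Path, batch, stats: IOStats) -> None:
    """Bulk path: one part and 30 open/fsync pairs, however many rows the batch holds."""
    part_dir = root / f"part_{stats.parts:06d}"   # one part directory for the whole batch
    part_dir.mkdir()
    for col in COLUMNS:                           # 30 columns, once
        with open(part_dir / f"{col}.bin", "wb") as f:
            stats.opens += 1
            for row in batch:                     # N rows: the hot loop is pure appends
                f.write(encode(row[col]))
            f.flush()
            os.fsync(f.fileno())
            stats.fsyncs += 1
    stats.parts += 1

with tempfile.TemporaryDirectory() as tmp:
    stats = IOStats()
    batch = [{col: i for col in COLUMNS} for i in range(1000)]
    insert_bulk(Path(tmp), batch, stats)
    print(stats)   # IOStats(opens=30, fsyncs=30, parts=1)
```

The same 1000 rows pushed through the per-row path would cost 30,000 opens, 30,000 fsyncs and 1000 parts.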
Server.onInsert(query, rows)
async_insert: a server-side buffer flushes as one slab
def onInsert(query, rows):
    if not cfg.async_insert:
        return insertBulk(rows)                       # synchronous path: 1 part per INSERT
    buf = buffers[shape(query)]                       # one buf per query shape
    buf.append(rows)                                  # grows buf.bytes and buf.queries
    if (buf.bytes >= cfg.async_insert_max_data_size               # ~1 MB
            or buf.age_ms >= cfg.async_insert_busy_timeout_ms     # ~200 ms
            or buf.queries >= cfg.async_insert_max_query_number):
        insertBulk(buf.drain())                       # one fan-out per FLUSH
    # A background timer must also flush buffers on age: this inline check
    # only runs when another INSERT with the same shape arrives.
    if cfg.wait_for_async_insert:
        return waitForFlush(buf)                      # ack after the flush: durable, back-pressures
    return Ack()                                      # fire-and-forget: acked before data is on disk
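The onInsert pseudocode leaves the buffer, its age tracking and the flush timer abstract, and its threshold check only runs when an insert arrives. Below is a self-contained sketch of such a buffer with all three flush triggers plus a `tick()` meant to be called from a background timer. The class names, config fields and default values are hypothetical stand-ins for the async_insert_* settings, not ClickHouse's implementation, and `wait_for_async_insert` is not modeled.

```python
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

@dataclass
class AsyncInsertConfig:
    # Hypothetical stand-ins for the three async_insert_* thresholds.
    max_data_bytes: int = 1_000_000       # size trigger (~1 MB)
    busy_timeout_ms: float = 200.0        # age trigger (~200 ms)
    max_queries: int = 100                # query-count trigger (illustrative value)

@dataclass
class _Buffer:
    rows: List[Any] = field(default_factory=list)
    nbytes: int = 0
    queries: int = 0
    first_ms: Optional[float] = None      # arrival time of the oldest buffered insert

class AsyncInsertBuffer:
    """Server-side buffer sketch: one buffer per query shape, flushed as one slab."""

    def __init__(self, flush: Callable[[str, List[Any]], None],
                 cfg: AsyncInsertConfig,
                 clock: Callable[[], float] = lambda: time.monotonic() * 1000):
        self.flush = flush                # stands in for insertBulk: one part per flush
        self.cfg = cfg
        self.clock = clock                # milliseconds; injectable for tests
        self.buffers: Dict[str, _Buffer] = defaultdict(_Buffer)

    def _due(self, buf: _Buffer, now: float) -> bool:
        return (buf.nbytes >= self.cfg.max_data_bytes
                or now - buf.first_ms >= self.cfg.busy_timeout_ms
                or buf.queries >= self.cfg.max_queries)

    def on_insert(self, shape: str, rows: List[Any], nbytes: int) -> None:
        now = self.clock()
        buf = self.buffers[shape]
        if buf.first_ms is None:
            buf.first_ms = now
        buf.rows.extend(rows)
        buf.nbytes += nbytes
        buf.queries += 1
        if self._due(buf, now):
            self._drain(shape)

    def tick(self) -> None:
        """Call periodically so idle buffers still flush once they age out."""
        now = self.clock()
        due = [s for s, b in self.buffers.items()
               if b.first_ms is not None and self._due(b, now)]
        for shape in due:
            self._drain(shape)

    def _drain(self, shape: str) -> None:
        buf = self.buffers.pop(shape)
        self.flush(shape, buf.rows)

now_ms = [0.0]
flushed = []
ab = AsyncInsertBuffer(lambda shape, rows: flushed.append((shape, len(rows))),
                       AsyncInsertConfig(max_data_bytes=1_000),
                       clock=lambda: now_ms[0])
for i in range(10):                       # ten 50-byte drips: below every threshold
    ab.on_insert("INSERT INTO events", [{"c0": i}], nbytes=50)
now_ms[0] = 250.0
ab.tick()                                 # age >= 200 ms: one flush with all 10 rows
print(flushed)                            # [('INSERT INTO events', 10)]
```

Keying the buffers by query shape keeps unrelated inserts in separate slabs, and each flush becomes a single insertBulk-style call, so the per-column overhead is paid once per slab rather than once per drip.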