Build a columnar OLAP store (ClickHouse / Druid style) (13 scenes)
Scene 05 · Writes must be bulk, not per-row
Every INSERT touches every column file, so per-row inserts on a 30-column table pay the full 30-file overhead on every single row. Send batches, or let the server batch for you with async_insert.
Previously
Vectorized execution wants thousands of values per call. That same rule applies to writes — they have to arrive in batches, not one row at a time, or the per-column overhead crushes the engine.
Diagram
A client streams 1000 INSERTs/sec into the OLAP engine. The rack on the right shows 30 column files (c0.bin … c29.bin); every INSERT fans out to all 30, no matter how many rows it carries.
**Bulk insert / batched write**: many rows committed as one operation, so the per-column overhead is paid once per batch.
**Async insert (server-side buffer)**: the engine accumulates small inserts per query shape and flushes them as one slab when a size or time threshold is reached.
The footer meters track per-column overhead (writes/sec) and sustained throughput (rows/sec); a warning banner appears when the part stack overflows.
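The numbers in the diagram can be put into a rough back-of-the-envelope model. This is a sketch only: it assumes each INSERT costs exactly one file operation per column and creates exactly one part, however many rows it carries. The names `FanOutCost` and `fan_out_cost` are hypothetical and exist only for this illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FanOutCost:
    inserts_per_sec: float      # INSERT statements reaching the engine
    column_ops_per_sec: float   # per-column file operations (the overhead meter)
    parts_per_sec: float        # new parts created, assumed one per INSERT


def fan_out_cost(rows_per_sec: int, rows_per_insert: int, columns: int = 30) -> FanOutCost:
    """Assumed model: every INSERT touches every column file exactly once."""
    if rows_per_insert <= 0:
        raise ValueError("rows_per_insert must be positive")
    inserts = rows_per_sec / rows_per_insert
    return FanOutCost(inserts, inserts * columns, inserts)


per_row = fan_out_cost(rows_per_sec=1000, rows_per_insert=1)
batched = fan_out_cost(rows_per_sec=1000, rows_per_insert=1000)
print(per_row.column_ops_per_sec)   # 30000.0
print(batched.column_ops_per_sec)   # 30.0
```

Under these assumptions the row throughput is identical in both cases; only the number of fan-outs (and of parts created) changes.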
1000 rows/sec arrive at the engine. Watch the client-side buffer fill to 1000 rows, then flush as one batched INSERT: the engine pays a single 30-column fan-out for the whole batch instead of one fan-out per row (1000 fan-outs, 30,000 column-file writes).
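A client-side buffer like the one in the animation might look roughly like the following. This is a minimal sketch, not a real driver API: `ClientBatcher`, its `send` callback, and the `max_rows` parameter are all hypothetical names, and `send` stands in for whatever call issues the batched INSERT.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]


class ClientBatcher:
    """Buffers rows on the client and hands them to `send` in batches."""

    def __init__(self, send: Callable[[List[Row]], None], max_rows: int = 1000) -> None:
        self._send = send
        self._max_rows = max_rows
        self._rows: List[Row] = []

    def add(self, row: Row) -> None:
        self._rows.append(row)
        if len(self._rows) >= self._max_rows:
            self.flush()

    def flush(self) -> None:
        if not self._rows:
            return
        batch, self._rows = self._rows, []
        self._send(batch)              # one batched INSERT for the whole buffer


sent_batches: List[int] = []
batcher = ClientBatcher(send=lambda batch: sent_batches.append(len(batch)), max_rows=1000)
for i in range(2500):
    batcher.add({"id": i})
batcher.flush()                        # push the partial tail, e.g. on shutdown
print(sent_batches)                    # [1000, 1000, 500]
```

A production client would probably also flush on a timer so that a slow trickle of rows does not sit in the buffer indefinitely; that is left out here to keep the sketch short.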
Implementation
Engine.insertPerRow(rows)
the pathological path: 30 file operations per row
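import os

def insertPerRow(rows):
    for row in rows:                                       # N iterations
        part_dir = newPartDir()                            # fresh part directory for every row
        for col in table.columns:                          # 30 columns
            with open(part_dir / f'{col}.bin', 'wb') as f: # one open/close per column, per row
                f.write(codec.encode(row[col]))
                f.flush()
                os.fsync(f.fileno())                       # one sync per column, per row
        registerPart(part_dir)                             # 1 part per row
# cost = N rows * 30 columns = 30N file ops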
Engine.insertBulk(batch)
the amortized path: 30 file operations per batch, not per row
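import os

def insertBulk(batch):
    part_dir = newPartDir()
    for col in table.columns:                          # 30 columns, ONCE
        with open(part_dir / f'{col}.bin', 'wb') as f: # one open/close per column, per batch
            for row in batch:                          # N rows, hot loop
                f.write(codec.encode(row[col]))
            f.flush()
            os.fsync(f.fileno())                       # one sync per column, per batch
    registerPart(part_dir)                             # 1 part per batch
# cost = 30 open/sync/close cycles per batch, regardless of batch size;
# only the sequential writes inside the hot loop scale with N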
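To see the difference between the two paths on a real filesystem, the following self-contained sketch writes the same 20 rows both ways into a temporary directory and counts synced files and parts. It is an illustration under stated assumptions, not the engine's code: the helper `write_part`, the directory layout, and the fixed-width 8-byte integer encoding are all invented for this example.

```python
import os
import tempfile
from pathlib import Path
from typing import Dict, List

COLUMNS = [f"c{i}" for i in range(30)]


def write_part(part_dir: Path, rows: List[Dict[str, int]]) -> int:
    """Write one part (one file per column). Returns the number of files synced."""
    part_dir.mkdir(parents=True)
    synced = 0
    for col in COLUMNS:
        with open(part_dir / f"{col}.bin", "wb") as f:
            for row in rows:
                # Assumed encoding: fixed-width 8-byte little-endian integers.
                f.write(row[col].to_bytes(8, "little", signed=True))
            f.flush()
            os.fsync(f.fileno())
        synced += 1
    return synced


def insert_per_row(root: Path, rows: List[Dict[str, int]]) -> int:
    # One part, and therefore one 30-file fan-out, per row.
    return sum(write_part(root / f"part_{i}", [row]) for i, row in enumerate(rows))


def insert_bulk(root: Path, rows: List[Dict[str, int]]) -> int:
    # One part and one 30-file fan-out for the whole batch.
    return write_part(root / "part_0", rows)


rows = [{col: n for col in COLUMNS} for n in range(20)]
with tempfile.TemporaryDirectory() as tmp:
    per_row_syncs = insert_per_row(Path(tmp) / "per_row", rows)
    bulk_syncs = insert_bulk(Path(tmp) / "bulk", rows)
    per_row_parts = len(list((Path(tmp) / "per_row").iterdir()))
    bulk_parts = len(list((Path(tmp) / "bulk").iterdir()))

print(per_row_syncs, per_row_parts)    # 600 20
print(bulk_syncs, bulk_parts)          # 30 1
```

Both paths store the same 20 rows; the per-row path simply pays for 20 times as many syncs and leaves 20 parts behind instead of one.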
Server.onInsert(query, rows)
async_insert — server-side buffer flushes one slab
def onInsert(query, rows):
    if not cfg.async_insert:
        return insertBulk(rows)                              # synchronous path
    buf = buffers[shape(query)]                              # one buf per query shape
    buf.append(rows)
    if (buf.bytes >= async_insert_max_data_size              # ~1 MB
            or buf.age_ms >= async_insert_busy_timeout_ms    # ~200 ms
            or buf.queries >= async_insert_max_query_number):
        insertBulk(buf.drain())                              # one fan-out per FLUSH
    if cfg.wait_for_async_insert:
        return waitForFlush(buf)                             # durable, back-pressures
    return Ack()                                             # fire-and-forget
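The buffering logic above can be exercised in isolation with a small simulation. This is a hedged sketch rather than the server's actual implementation: `AsyncInsertServer`, `AsyncBuffer`, the injected `now_ms` clock, and the `tick` method are hypothetical, the threshold values are illustrative rather than real defaults, and the acknowledgement modes (`wait_for_async_insert`) are left out.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class AsyncBuffer:
    created_ms: float
    rows: List[bytes] = field(default_factory=list)
    nbytes: int = 0
    queries: int = 0


class AsyncInsertServer:
    def __init__(self, flush: Callable[[List[bytes]], None], now_ms: Callable[[], float],
                 max_data_size: int, busy_timeout_ms: float, max_query_number: int) -> None:
        self._flush = flush
        self._now_ms = now_ms
        self._max_data_size = max_data_size
        self._busy_timeout_ms = busy_timeout_ms
        self._max_query_number = max_query_number
        self._buffers: Dict[str, AsyncBuffer] = {}   # one buffer per query shape

    def on_insert(self, shape: str, rows: List[bytes]) -> None:
        buf = self._buffers.get(shape)
        if buf is None:
            buf = self._buffers[shape] = AsyncBuffer(created_ms=self._now_ms())
        buf.rows.extend(rows)
        buf.nbytes += sum(len(r) for r in rows)
        buf.queries += 1
        self._maybe_flush(shape)

    def tick(self) -> None:
        """Called periodically so idle buffers still flush on the time threshold."""
        for shape in list(self._buffers):
            self._maybe_flush(shape)

    def _maybe_flush(self, shape: str) -> None:
        buf = self._buffers[shape]
        age_ms = self._now_ms() - buf.created_ms
        if (buf.nbytes >= self._max_data_size
                or age_ms >= self._busy_timeout_ms
                or buf.queries >= self._max_query_number):
            del self._buffers[shape]
            self._flush(buf.rows)                    # one fan-out per flush


clock = [0.0]                                        # fake clock, in milliseconds
flushed: List[int] = []
server = AsyncInsertServer(flush=lambda rows: flushed.append(len(rows)),
                           now_ms=lambda: clock[0],
                           max_data_size=1_000_000, busy_timeout_ms=200, max_query_number=3)
server.on_insert("events", [b"a"])                   # query 1: buffered
server.on_insert("events", [b"b"])                   # query 2: buffered
server.on_insert("events", [b"c"])                   # query 3: count threshold, flush 3 rows
server.on_insert("events", [b"d"])                   # starts a fresh buffer at t=0
clock[0] = 250.0
server.tick()                                        # age 250 ms >= 200 ms, flush 1 row
print(flushed)                                       # [3, 1]
```

The separate `tick` call is a design choice of this sketch: checking the age only inside `on_insert` would leave the last rows of a quiet stream stranded in the buffer until the next insert arrives.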