ClickHouse Data Ingestion
Insert Paths, the Parts Problem, and Why Batch Size Determines Everything
ClickHouse can ingest hundreds of millions of rows per second on commodity hardware — but only
if you respect the engine's central constraint: every INSERT creates a new MergeTree part on
disk, and parts are merged asynchronously in the background. Insert too small, too often, and
the merge backlog grows unboundedly until queries grind to a halt with the dreaded
Too many parts error. Insert too large and you exceed memory limits or stall the
ingestion pipeline. Modern ClickHouse provides several tools to navigate this:
async_insert for server-side batching, the Buffer engine for
in-memory staging, the Kafka / S3 engines for pull-based ingestion, and
distributed inserts with two-phase replication. Picking the right path for the
right workload is most of what makes ClickHouse fast.
Why Ingestion Is the Hard Part
MergeTree stores data as immutable parts on disk. Each INSERT writes a new part. Background threads merge small parts into larger ones over time. The merge cost is logarithmic per row, but the system has a hard cap (default 300 active parts per partition). Ingestion that consistently exceeds the merge rate causes the cap to be hit, and writes start failing.
A part with 1000 rows and 30 columns produces ~30 small files on disk (one per column, plus indexes and marks). Filesystem overhead per file dwarfs the data itself. Large inserts amortize this; tiny inserts pay it on every row. The optimal batch size is tens to hundreds of thousands of rows, not tens.
Real ingestion comes from thousands of upstream services, each producing small batches. A naive architecture has them all hitting ClickHouse directly — which guarantees the parts problem. The solution is server-side or proxy-side aggregation that turns N small inserts into one large one without each client having to coordinate.
The Five Insert Paths
Key Numbers
The HTTP / Native / gRPC Insert Path
The most direct path: a client sends rows in a recognized format. Three transports:
# HTTP (most common, easiest to integrate)
curl -X POST 'http://ch:8123/?query=INSERT INTO events FORMAT JSONEachRow' \
-H 'X-ClickHouse-User: default' \
--data-binary @events.json
# Native TCP (binary protocol; lowest latency, used by clickhouse-client)
clickhouse-client --query "INSERT INTO events FORMAT Native" < events.bin
# gRPC (newer, useful for service mesh integration)
echo '{"query":"INSERT INTO events VALUES (1,'\''a'\'')","query_id":"1"}' | \
grpcurl -d @ -plaintext ch:9100 clickhouse.grpc.ClickHouse/ExecuteQuery
Format choice matters. ClickHouse supports 70+ formats. RowBinary is the fastest for
machine-to-machine (no parsing of JSON or CSV). Native is the same column-major format
ClickHouse uses internally; if your producer can speak it, you skip the conversion entirely.
JSONEachRow is the most ergonomic; it's slower than RowBinary but handles schema
evolution gracefully (extra fields ignored, missing fields default).
| Format | Throughput | Use case |
|---|---|---|
| Native | ~700 MB/s ingest | ClickHouse-to-ClickHouse, batch tools |
| RowBinary | ~600 MB/s | Fastest from non-CH producers |
| Parquet | ~400 MB/s | Interop with Spark/data lakes |
| JSONEachRow | ~120 MB/s | Service-to-service, schemaless events |
| CSV | ~150 MB/s | Bulk file imports |
| Protobuf | ~250 MB/s | gRPC ecosystems |
Batch Sizing: The Most Common Mistake
The default behavior of most "INSERT one row" client libraries is catastrophic for ClickHouse.
Each INSERT statement creates one part, regardless of row count. 1000 INSERTs of 1 row each =
1000 tiny parts; the merge thread can't keep up; Too many parts errors begin within
minutes.
-- Wrong: 1000 parts, each 1 row
INSERT INTO events VALUES (1, 'a');
INSERT INTO events VALUES (2, 'b');
... -- repeat 998 more times
-- Right: 1 part, 1000 rows
INSERT INTO events VALUES (1, 'a'), (2, 'b'), ..., (1000, 'aaaa');
-- Or with FORMAT (vastly preferable for large batches):
INSERT INTO events FORMAT JSONEachRow
{"id":1,"v":"a"}
{"id":2,"v":"b"}
...
-- Rule of thumb: aim for ~10-100 MB or ~1M rows per INSERT.
-- Below 1000 rows is almost always wrong unless using async_insert.
The hard floor is set by min_insert_block_size_rows (default 1,048,576) and
min_insert_block_size_bytes (default 256 MB). Inserts smaller than this are accepted
but flagged as suboptimal. The hard ceiling is set by max_insert_block_size (default
~1M rows) which controls how the server splits an oversized insert into multiple parts; very
large inserts get sliced.
async_insert: Server-Side Batching
When client-side batching is hard (e.g., a fleet of microservices each emitting 10 rows/sec),
ClickHouse can batch on the server side. Enable per-query or per-user with async_insert=1:
-- Per-query: client sends, server buffers, flushes async
INSERT INTO events SETTINGS async_insert=1, wait_for_async_insert=1
VALUES (1, 'a'), (2, 'b'), (3, 'c');
-- Per-user (preferred for fleet of producers):
ALTER USER ingest_writer SETTINGS async_insert = 1;
-- Tunables on the server:
-- async_insert_max_data_size = 1048576 -- flush at 1 MB
-- async_insert_busy_timeout_ms = 200 -- flush at 200ms
-- async_insert_max_query_number = 450 -- flush after N queries
-- Behavior:
-- Server accumulates inserts from multiple connections into a per-table buffer.
-- When ANY threshold (size, time, count) trips, buffer flushes as one part.
-- Server returns success to clients only after their data is safely persisted.
The trade-off: a higher latency between INSERT-acknowledged and data-queryable. By default,
wait_for_async_insert=1 means clients wait until the buffer flushes before getting
an OK — so individual insert latency rises to ~200ms. With
wait_for_async_insert=0, the server returns immediately after queuing — lower
latency but the client doesn't know if the data was actually persisted before a server crash.
async_insert is the pragmatic answer for "we have 10,000 microservices and don't want to build a Kafka pipeline." It works extremely well up to ~10K writes/second per CH node. Beyond that scale, the buffer contention itself becomes the bottleneck and you should switch to Kafka.
The Buffer Engine: In-Memory Staging
CREATE TABLE events_buffer AS events
ENGINE = Buffer(default, events,
16, -- num_layers (parallelism)
10, -- min_time seconds before flush
100, -- max_time seconds before flush
10000, -- min_rows
1000000, -- max_rows
10000000, -- min_bytes
100000000 -- max_bytes
);
-- Producers write to events_buffer; data lives in RAM.
-- When ANY of (max_time, max_rows, max_bytes) is hit, flush to events.
-- Reads UNION with the buffer: SELECT from events sees both, transparently.
-- Use cases:
-- - smooth out write bursts
-- - bridge a slow merger that's catching up
-- - avoid `Too many parts` during traffic spikes
-- Caveat: if the server crashes, in-buffer data is LOST. No durability.
-- For at-least-once, prefer async_insert with wait_for_async_insert=1
-- or use Kafka with offset commit-after-CH-flush. Buffer is older than async_insert and is largely deprecated in favor of it for new workloads, but it's still the right answer when you need a per-table buffer with custom sizing (e.g., a very high-volume table that needs different parameters than the rest of the system).
The Kafka Engine: Pull-Based Ingestion
The cleanest production pattern for sustained high-throughput ingestion. ClickHouse runs Kafka consumers in-process, polling topics, materializing rows into a target MergeTree table via a materialized view.
-- 1. The Kafka source table (acts like a stream)
CREATE TABLE events_kafka (
timestamp DateTime,
user_id UInt64,
event String,
payload String
) ENGINE = Kafka(
'kafka1:9092,kafka2:9092', -- brokers
'events', -- topic
'clickhouse-events', -- consumer_group
'JSONEachRow', -- format
1 -- num_consumers per node
);
-- 2. The destination MergeTree
CREATE TABLE events (
timestamp DateTime,
user_id UInt64,
event String,
payload String
) ENGINE = MergeTree
ORDER BY (timestamp, user_id)
PARTITION BY toYYYYMM(timestamp);
-- 3. The materialized view: glue Kafka -> MergeTree
CREATE MATERIALIZED VIEW events_mv TO events AS
SELECT timestamp, user_id, event, payload
FROM events_kafka;
-- ClickHouse polls Kafka in 5s windows, accumulates messages,
-- batches them as one INSERT into events. Offsets are committed
-- after the INSERT succeeds -- at-least-once semantics.
-- Tunables:
-- kafka_max_block_size = 65536 -- rows per INSERT
-- kafka_poll_timeout_ms = 5000 -- max wait per poll
-- kafka_flush_interval_ms = 7500 -- max wait before forcing flush
-- kafka_skip_broken_messages = 100 -- tolerate malformed JSON The Kafka engine puts the durability question on Kafka's side. ClickHouse only ACKs the offset after the data is in MergeTree; if CH crashes mid-batch, the next consumer picks up from the last committed offset and re-processes. Duplicates are possible at-least-once; combine with deduplication (next section) for exactly-once-ish.
S3 Engine: Bulk File Ingestion
-- Bulk-load a directory of Parquet files from S3 in parallel
INSERT INTO events
SELECT *
FROM s3(
'https://bucket.s3.us-east-1.amazonaws.com/year=2025/month=09/*.parquet',
'AWS_ACCESS_KEY',
'AWS_SECRET_KEY',
'Parquet'
);
-- ClickHouse reads files in parallel (controlled by max_threads).
-- Predicate pushdown into Parquet (skip row groups by min/max stats).
-- Schema is inferred from Parquet metadata; column names must match.
-- For repeated ingestion, define an S3 table:
CREATE TABLE events_s3 (
timestamp DateTime,
user_id UInt64,
event String
) ENGINE = S3(
'https://bucket.s3.us-east-1.amazonaws.com/events/*.parquet',
'Parquet'
);
INSERT INTO events SELECT * FROM events_s3 WHERE timestamp >= '2025-09-01';
-- Throughput: 50-200 MB/s per CH node depending on file size + network.
-- For huge backfills, run multiple INSERTs in parallel against
-- non-overlapping S3 prefixes. Materialized Views as ETL Pipelines
A materialized view in ClickHouse is not the cached query of Postgres. It is an insert trigger: when rows are inserted into the source table, the view's SELECT runs on those rows, and the result is inserted into the target table. This makes MVs the canonical way to do real-time aggregations, denormalization, and per-row transformation.
-- Source: raw events (high volume)
CREATE TABLE events_raw (...) ENGINE = MergeTree ORDER BY ...;
-- Target: hourly aggregates (smaller, query-optimized)
CREATE TABLE events_hourly (
hour DateTime,
event_type String,
count AggregateFunction(count),
unique_users AggregateFunction(uniq, UInt64)
) ENGINE = AggregatingMergeTree
ORDER BY (hour, event_type);
-- The MV: runs on every INSERT into events_raw
CREATE MATERIALIZED VIEW events_hourly_mv TO events_hourly AS
SELECT
toStartOfHour(timestamp) AS hour,
event_type,
countState() AS count,
uniqState(user_id) AS unique_users
FROM events_raw
GROUP BY hour, event_type;
-- Query (final aggregation merges intermediate states):
SELECT
hour,
event_type,
countMerge(count) AS total,
uniqMerge(unique_users) AS uniques
FROM events_hourly
WHERE hour >= now() - INTERVAL 24 HOUR
GROUP BY hour, event_type; The pattern: ingest raw events at MergeTree firehose rate, compute aggregates incrementally via AggregatingMergeTree + MV, query small aggregate tables for dashboards. This is how production analytics platforms (Cloudflare, ClickHouse Cloud's own observability) handle billions of events per day on a few dozen nodes.
Dictionaries: External Lookup Tables
Joins are expensive in ClickHouse (no traditional indexes; hash-join build phase dominates for small dimensions). For lookups against small reference data — user_id → user_name, country_code → country_name — dictionaries are the right tool.
-- A dictionary loads from any source (PostgreSQL, MySQL, ClickHouse, file)
CREATE DICTIONARY users_dict (
user_id UInt64,
user_name String,
country String,
plan String
)
PRIMARY KEY user_id
SOURCE(POSTGRESQL(
host 'pg' port 5432 user 'ro' password '...'
db 'app' table 'users'
))
LAYOUT(HASHED()) -- in-memory hash table
LIFETIME(MIN 300 MAX 600); -- refresh every ~5-10 minutes
-- In a query (zero-copy lookup, sub-microsecond):
SELECT
timestamp,
dictGet('users_dict', 'user_name', user_id) AS name,
dictGet('users_dict', 'country', user_id) AS country
FROM events
WHERE timestamp >= today();
-- LAYOUT options:
-- HASHED -- in-memory hash; up to ~1B keys per node
-- SPARSE_HASHED -- compact hash; slower but smaller
-- CACHE -- LRU; query-time fetch from source on miss
-- COMPLEX_KEY_HASHED -- multi-column primary key
-- RANGE_HASHED -- temporal validity (time-bounded values) Refreshable dictionaries are the load-bearing primitive for real-time enrichment. Update the underlying Postgres table; within LIFETIME the dictionary is re-loaded; queries see the new value. Combined with materialized views, you can denormalize dimensional data into the fact table on insert, eliminating runtime joins entirely.
Distributed Inserts and Replication
For multi-node clusters, ingestion has two orthogonal concerns: sharding (which node owns which row) and replication (how many copies of each row exist).
-- Pattern 1: Distributed engine for sharded inserts
CREATE TABLE events_local ON CLUSTER my_cluster (
timestamp DateTime, user_id UInt64, event String
) ENGINE = ReplicatedMergeTree(
'/clickhouse/tables/{shard}/events_local',
'{replica}'
)
ORDER BY (timestamp, user_id)
PARTITION BY toYYYYMM(timestamp);
CREATE TABLE events ON CLUSTER my_cluster AS events_local
ENGINE = Distributed(
my_cluster, -- cluster name
default, -- database
events_local, -- target table
cityHash64(user_id) -- sharding key
);
-- Inserts go to the Distributed table:
INSERT INTO events VALUES (...);
-- ClickHouse routes each row to the correct shard based on sharding_key.
-- Two-phase: write to local temp directory on the receiving node,
-- then forward to the destination shard. If forwarding fails, retries
-- happen from the temp directory.
-- ReplicatedMergeTree on each shard:
-- One replica is leader; INSERT writes a part there.
-- The part is registered in ZooKeeper / Keeper.
-- Other replicas pull the part by name.
-- Quorum writes (insert_quorum) optionally wait for N replicas to ACK. For ingestion, the choice between writing to Distributed or directly to the local node depends on traffic shape. Direct local writes (with the client computing the shard) are faster but require client-side awareness of cluster topology. Distributed writes are simpler but pay the forwarding cost. Most production setups use both: Kafka consumers know the topology and write directly to local; ad-hoc writes go through Distributed.
Deduplication and Idempotent Inserts
ClickHouse's MergeTree supports automatic insert-block deduplication: if the exact same block of rows is inserted twice (same content, same hash), the second one is silently dropped.
-- Server tracks the hash of the last ~100 inserted blocks per partition
-- in ZooKeeper / Keeper. Re-insertion of an identical block is a no-op.
INSERT INTO events VALUES (1, 'a'), (2, 'b');
INSERT INTO events VALUES (1, 'a'), (2, 'b'); -- silently deduplicated
-- Tunables:
-- replicated_deduplication_window = 100 -- how many blocks to remember
-- replicated_deduplication_window_seconds = 604800 -- 7 days
-- Plus: explicit insert_deduplication_token for client-driven idempotency
INSERT INTO events SETTINGS insert_deduplication_token = 'batch-2025-09-15-001'
VALUES (1, 'a'), (2, 'b');
-- Re-running the same INSERT with the same token is a no-op,
-- even if the data slightly differs. Useful for retry-safe pipelines.
-- For row-level deduplication (one row per natural key, not block):
-- use ReplacingMergeTree, which dedupes during merge based on sorting key. Backpressure and Throttling
max_insert_block_size(default 1,048,576): server-side cap on rows per part. A too-large insert is split into multiple parts.max_partitions_per_insert_block(default 100): one INSERT can touch at most N partitions. Default protects against accidental cross-partition inserts that would create 100s of tiny parts.parts_to_throw_insert(default 300): when active parts in a partition exceed this, INSERTs fail withToo many parts. The signal: your merge throughput is below your insert throughput. Either increase merge parallelism, decrease insert frequency, or scale out.parts_to_delay_insert(default 150): INSERTs are artificially delayed (sleep) to give merges time to catch up. Soft backpressure.max_threadson insert: parallelism for the SELECT side of an INSERT SELECT. Doesn't affect single-block inserts.
-- Diagnose ingestion stress:
SELECT
database,
table,
count() AS active_parts,
sum(rows) AS rows,
sum(bytes_on_disk) AS bytes
FROM system.parts
WHERE active = 1 AND database = 'default'
GROUP BY database, table
ORDER BY active_parts DESC;
-- If any table shows > 200 active parts and growing:
-- 1. Check merge mutations rate
-- 2. Increase background_pool_size
-- 3. Reduce insert frequency / increase batch size
-- 4. Look at "OPTIMIZE TABLE foo FINAL" as last resort (expensive) Tradeoffs & Failure Modes
- "Too many parts" in production. Almost always caused by client-side per-row inserts. Fix: enable async_insert per-user, or move ingestion to Kafka engine.
- Async_insert latency surprise. A team enables async_insert without realizing the wait_for_async_insert default makes individual writes 200ms. For low-volume APIs this is fine; for transactional workloads it's not. Set wait_for_async_insert=0 if you accept eventual consistency.
- Buffer engine data loss on crash. Buffer is in-memory only; a crash loses un-flushed data. Don't use for anything where loss is unacceptable; use async_insert with wait_for_async_insert=1 instead.
- Kafka offset commit before flush. Misconfigured Kafka engine can ACK offsets before MergeTree write succeeds, causing data loss on crash. Default config is correct; verify if you've changed kafka_commit_every_batch settings.
- Skewed sharding key. If your sharding_key isn't well distributed (e.g., cityHash64 of timestamp), one shard takes 90% of writes. Use a high-cardinality identifier (user_id, request_id) and verify with system.query_log.
- Replication lag. ReplicatedMergeTree replicas pull parts asynchronously.
A replica that lags far enough may run out of disk for the queue. Monitor
system.replication_queue; alert on backlog > 1000. - Materialized view side-effects. An MV with a heavy GROUP BY runs on every
INSERT into the source table. A 100M-row INSERT can OOM the MV's aggregation. Set
max_memory_usage_for_userconservatively for the user owning the MV. - Schema mismatch in Kafka engine. A producer changes the JSON schema; the Kafka engine starts dropping malformed rows silently (or noisily, depending on kafka_skip_broken_messages). Add a DLQ topic + alerts.
- Dictionary refresh staleness. A dictionary's LIFETIME is too long; queries return stale dimensional data. Tune to match your tolerance; for critical dimensions use LIFETIME(0) (refresh on every query, expensive) or COMPLEX_KEY_DIRECT (skip cache entirely).
FAQ
What's the actual maximum ingest throughput on commodity hardware?
ClickBench and various community benchmarks put it at roughly 100–600M rows/second per modern x86 server (16+ cores, NVMe SSD), depending on schema width and format. Per-row throughput varies enormously: 5-column events hit 600M+ rows/s; 200-column wide rows are 10x slower. Network is often the bottleneck before CPU on cloud VMs — provisioned bandwidth caps out around 25 Gbps even on big instances.
Should I use Kafka or async_insert?
Volume rule of thumb: under ~10K events/sec total across all producers, async_insert. Above that, Kafka. Kafka adds operational complexity but gives you durability of a log, replay, back-pressure, and can buffer multi-hour outages. async_insert is "good enough" for many internal apps; Kafka is the answer when ingestion is mission-critical or volume is sustained high.
Is INSERT SELECT atomic?
The INSERT-side is atomic per part: a part is either fully written or absent. The SELECT side is read at a single transaction snapshot per query (within a partition; across partitions there's no global snapshot). For a long-running INSERT SELECT, partial visibility is possible: some parts have been written and are queryable while later parts are still being computed. Wrap in a transaction (CH 24+ supports limited transactions) for stricter semantics, or write to a temp table first then RENAME.
How do I bulk-load 100 TB of Parquet from S3 quickly?
Run multiple parallel INSERT SELECT FROM s3() queries, each targeting a non-overlapping prefix (e.g. one per date partition). Use a cluster of CH nodes; each node ingests independently into its local replica. Disable insert quorum during the bulk load and re-enable after. Rough rate: 1–2 TB/hour per node. Plan for the merge backlog to settle for a few hours after bulk load completes.
What happens if I INSERT into a Distributed table while one shard is down?
By default, the data destined for the down shard is staged in the source node's
/var/lib/clickhouse/data/<db>/<dist_table>/ directory and forwarded
when the shard recovers. With insert_distributed_sync=1, the INSERT instead
fails immediately if the shard is unreachable — preferable for data pipelines that need
to know about partial failures.
Why is my INSERT slow even though the server has CPU available?
Common causes: (1) format parsing — JSONEachRow is 5x slower than RowBinary; (2) WAL fsync
on slow disk — check system.metrics for InsertedBytes vs disk bandwidth; (3)
waiting on replication quorum — insert_quorum=2 means waiting for a replica ACK; (4)
materialized view computation — a heavy MV runs synchronously per INSERT and can
dominate.
How do I do exactly-once ingestion from Kafka?
ClickHouse's Kafka engine is at-least-once. For exactly-once: enable
insert_deduplication_token using a deterministic key derived from
(kafka_topic, kafka_partition, kafka_offset). Re-delivered messages have the same token; CH
deduplicates the block. Combined with ReplacingMergeTree on (event_id) for row-level dedup,
you get effective exactly-once semantics even across crashes.
Can I ingest into a partition that's in the middle of being merged?
Yes. Merges happen in parallel with ingestion. The merge produces a new larger part that
replaces the merged sources atomically; new INSERTs write parts beside it. The only overlap
issue is throughput contention on the disk and the merge-thread pool — under heavy
ingestion, increase background_pool_size and
background_merges_mutations_concurrency_ratio.