Distributed Tables in ClickHouse
A Distributed table is not storage. It is a view plus a router that fans a query out to local shard
tables, collects partial aggregation states, and merges them on the initiator. Combine it with
ReplicatedMergeTree on each shard and you have ClickHouse's classic horizontally scaled topology: writes
land on any node, reads scatter-gather, replicas share the read load, and the cluster keeps serving as long
as at least one replica per shard is alive. The catch: distributed JOINs and aggregations have semantics
most engineers misread on first contact, and INSERTs through Distributed have durability behavior that
depends on a single setting.
Topology at a glance
The Distributed engine resolves shard / replica hosts from the cluster config, runs the local query against one
replica per shard, and merges partial states on the initiator.
Key Numbers
internal_replication: write once or N×
Cluster config & ON CLUSTER DDL
Clusters are defined in the remote_servers section of the server configuration (for example in
config.d/clusters.xml). Each <shard> element groups replicas; nodes inside one shard hold the same data,
nodes across shards hold disjoint slices.
<remote_servers>
    <analytics>
        <shard>
            <internal_replication>true</internal_replication>
            <replica><host>ch-1a</host><port>9000</port></replica>
            <replica><host>ch-1b</host><port>9000</port></replica>
        </shard>
        <shard>
            <internal_replication>true</internal_replication>
            <replica><host>ch-2a</host><port>9000</port></replica>
            <replica><host>ch-2b</host><port>9000</port></replica>
        </shard>
    </analytics>
</remote_servers>
Once the cluster is named, ON CLUSTER propagates DDL through the distributed DDL queue, which is
coordinated through Keeper and visible in system.distributed_ddl_queue. The same statement creates the
local table on every node:
CREATE TABLE events_local ON CLUSTER analytics (
    ts DateTime,
    user_id UInt64,
    event LowCardinality(String),
    payload String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events', '{replica}')
PARTITION BY toYYYYMM(ts)
ORDER BY (user_id, ts);
-- The Distributed table copies its column list from events_local.
CREATE TABLE events_dist ON CLUSTER analytics AS events_local
ENGINE = Distributed(analytics, currentDatabase(), events_local, cityHash64(user_id));
The macros {shard} and {replica} come from the <macros> section of each host's server configuration, so
every node substitutes its own values. The fourth argument to Distributed() is the sharding key expression,
evaluated at insert time on the initiator.
Sharding keys & data placement
The sharding-key expression must produce an integer. ClickHouse takes the remainder of key / total_weight
and sends the row to the shard whose weight range contains that remainder, so heavier shards receive
proportionally more rows. cityHash64(user_id) is the canonical pick: it spreads load uniformly while
keeping all rows for one user on the same shard, which means a GROUP BY on user_id can be answered without
cross-shard re-aggregation if you push the GROUP BY down with distributed_group_by_no_merge=1.
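The weighted modulo rule can be sketched in a few lines of Python. This is an illustrative model, not ClickHouse code: the function name pick_shard and the example weights are assumptions, and the key is taken to be an already-hashed non-negative integer.

```python
from bisect import bisect_right
from itertools import accumulate


def pick_shard(key: int, weights: list[int]) -> int:
    """Return the 1-based shard number for an integer sharding key.

    Hypothetical helper: models "remainder of key / total_weight falls
    into one shard's weight range".
    """
    if key < 0:
        raise ValueError("sharding key must be a non-negative integer")
    # Cumulative upper bounds of each shard's range, e.g. [9, 19] for [9, 10].
    bounds = list(accumulate(weights))
    remainder = key % bounds[-1]  # bounds[-1] is the total weight
    # First shard whose upper bound is strictly greater than the remainder.
    return bisect_right(bounds, remainder) + 1


weights = [9, 10]  # assumed weights: shard 1 owns [0, 9), shard 2 owns [9, 19)
for key in (0, 8, 9, 18, 19):
    print(key, "->", pick_shard(key, weights))
```

With these weights, remainders 0 to 8 go to shard 1 and remainders 9 to 18 go to shard 2, so shard 2 receives slightly more rows. A shard with weight 0 owns an empty range and receives nothing.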
Bad sharding keys: rand() (uniform, but breaks per-user locality), now() (everything lands on one shard
per second), or anything monotonic. Inspect actual placement with the virtual column:
SELECT _shard_num, count() FROM events_dist GROUP BY _shard_num ORDER BY _shard_num;
-- _shard_num │ count()
-- 1 │ 3,421,100
-- 2 │ 3,398,775
-- 3 │ 3,403,002
_shard_num is virtual: it does not exist on disk, and the Distributed engine fills it in at query time.
It is invaluable for debugging skew, locating a misrouted row, or running a query against just one shard
(WHERE _shard_num = 2).
internal_replication: the one flag that matters
With internal_replication=true, an INSERT through the Distributed table picks one replica per shard
and writes there; replication propagates the part to siblings via ReplicatedMergeTree. With
internal_replication=false, the Distributed engine writes the same data to every replica itself —
which doubles or triples your write traffic and breaks deduplication.
| | true (recommended) | false |
|---|---|---|
| Write path | 1 replica per shard | All replicas per shard |
| Replication | ReplicatedMergeTree via Keeper | Distributed engine writes N× |
| Dedup | Works (block hash in Keeper) | Broken (writes look distinct) |
| Use with | ReplicatedMergeTree | Plain MergeTree only |
If your local engine is ReplicatedMergeTree, you almost always want internal_replication=true. The
default is false, so set it explicitly in the cluster definition. The other combination exists for legacy
clusters running plain MergeTree where the Distributed engine is doing all the replication itself.
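To make the difference in write fan-out concrete, here is a small Python model of which replicas the Distributed engine itself writes to under each setting. It is a simplification under stated assumptions: the helper insert_targets is hypothetical, and it always picks the first replica, whereas the real engine chooses a healthy replica according to its load-balancing rules.

```python
# Cluster layout from the remote_servers example: two shards, two replicas each.
CLUSTER = [["ch-1a", "ch-1b"], ["ch-2a", "ch-2b"]]


def insert_targets(replicas: list[str], internal_replication: bool) -> list[str]:
    """Replicas of one shard that the Distributed engine writes to directly."""
    if internal_replication:
        # One write per shard; ReplicatedMergeTree copies the part to siblings.
        # Assumption: first replica, for simplicity.
        return replicas[:1]
    # The Distributed engine sends the same block to every replica itself.
    return list(replicas)


for flag in (True, False):
    writes = [insert_targets(shard, flag) for shard in CLUSTER]
    total = sum(len(w) for w in writes)
    print(f"internal_replication={flag}: {writes} ({total} writes)")
```

On this layout the model issues two writes with the flag on and four with it off, which is the N× amplification summarized in the table.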
Query execution: scatter, gather, merge
A query against a Distributed table is rewritten so each shard runs a "local" version against events_local.
Aggregations are special: ClickHouse computes partial aggregate states on each shard and merges them
on the initiator. That is why uniq() and quantileTDigest() work cluster-wide — their internal state
is mergeable.
-- Conceptual rewrite, not the literal SQL the server emits.
-- On each shard: aggregate locally into partial states.
SELECT event, sumState(1) AS s, uniqState(user_id) AS u
FROM events_local
GROUP BY event;
-- On the initiator: merge the partial states received from every shard.
SELECT event, sumMerge(s), uniqMerge(u)
FROM (
    -- one branch per shard (shard1, shard2, shard3), each returning (event, s, u)
)
GROUP BY event;
Inspect what is actually shipped with EXPLAIN:
EXPLAIN PIPELINE SELECT event, count() FROM events_dist GROUP BY event;
-- (ReadFromRemote) ← per-shard scan
-- (Aggregating) ← partial aggregation on shards
-- (MergingAggregated) ← final merge on initiator
Settings that change the shape of execution:
- distributed_group_by_no_merge=1: skip the final merge; results are pre-grouped per shard. Safe only when every group lives entirely on one shard, for example when the GROUP BY key includes the sharding key.
- optimize_distributed_group_by_sharding_key=1: automatic version of the above.
- prefer_localhost_replica=1: avoid a TCP roundtrip when the initiator owns a replica.
- max_parallel_replicas=N: split a single shard's work across replicas.
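The scatter, gather, merge flow in this section can be modeled in Python to show why the initiator must merge states rather than finished numbers. This is a sketch under assumptions: the function names are hypothetical, and an exact set stands in for the approximate sketch that uniq() really keeps.

```python
from collections import defaultdict


def shard_partial(rows):
    """Per-shard step: group locally and keep a mergeable state per key."""
    state = defaultdict(lambda: {"count": 0, "users": set()})
    for event, user_id in rows:
        state[event]["count"] += 1
        state[event]["users"].add(user_id)
    return state


def initiator_merge(partials):
    """Initiator step: merge the states, then finalize to plain numbers."""
    merged = defaultdict(lambda: {"count": 0, "users": set()})
    for partial in partials:
        for event, st in partial.items():
            merged[event]["count"] += st["count"]
            merged[event]["users"] |= st["users"]  # union, not addition
    return {e: (st["count"], len(st["users"])) for e, st in merged.items()}


# Made-up rows of (event, user_id); user 2 clicks on both shards.
shard1 = [("click", 1), ("click", 2), ("view", 1)]
shard2 = [("click", 2), ("view", 3)]

result = initiator_merge([shard_partial(shard1), shard_partial(shard2)])
print(result)  # {'click': (3, 2), 'view': (2, 2)}
```

Adding the per-shard unique counts for "click" would give 2 + 1 = 3, while merging the states gives the correct 2. Skipping the final merge is therefore only safe when no group spans more than one shard.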
Distributed INSERTs: sync vs async
By default an INSERT into a Distributed table writes to a local spool directory on the initiator and
returns success immediately. A background sender thread later forwards each .bin file to the destination
shard. This is fast but the durability story is "if the initiator's disk survives, the row will eventually land".
-- Async (default): client gets ack after local spool flush
INSERT INTO events_dist VALUES (now(), 42, 'click', '...');
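-- Sync: block until each shard confirms
-- (newer releases name this setting distributed_foreground_insert and keep the old name as an alias)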
SET insert_distributed_sync = 1;
INSERT INTO events_dist VALUES (now(), 42, 'click', '...');
The spool lives at /var/lib/clickhouse/store/<uuid>/<shard>/. Inspect backlog and errors:
SELECT database, table, data_path, is_blocked, error_count, data_files,
       formatReadableSize(data_compressed_bytes) AS pending
FROM system.distribution_queue;
For exactly-once semantics many teams skip Distributed INSERTs entirely: clients hash the key themselves and write directly to the right shard's local table. This costs a tiny client-side router but gives you back-pressure and immediate failure visibility.
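A client-side router of that kind might look roughly like the Python sketch below. Everything here is an assumption made for illustration: the names stable_hash and route are hypothetical, the host map mirrors the example cluster, and SHA-256 stands in for cityHash64, so placement will not match rows written through events_dist unless the client uses the same hash as the Distributed table. Sending each batch to ClickHouse is left out.

```python
import hashlib
from collections import defaultdict

# Assumed mapping of shard number to replica hosts, copied from the example cluster.
SHARD_HOSTS = {1: ["ch-1a", "ch-1b"], 2: ["ch-2a", "ch-2b"]}


def stable_hash(user_id: int) -> int:
    """Deterministic 64-bit hash. Stand-in only: this is NOT cityHash64."""
    digest = hashlib.sha256(str(user_id).encode()).digest()
    return int.from_bytes(digest[:8], "big")


def route(rows: list[dict], shard_count: int) -> dict[int, list[dict]]:
    """Group rows into one batch per shard, keyed by 1-based shard number."""
    batches = defaultdict(list)
    for row in rows:
        shard = stable_hash(row["user_id"]) % shard_count + 1
        batches[shard].append(row)
    return dict(batches)


rows = [{"user_id": u, "event": "click"} for u in (1, 2, 3, 1, 2, 42)]
for shard, batch in sorted(route(rows, len(SHARD_HOSTS)).items()):
    # A real client would INSERT the batch into events_local on this host
    # and retry or fail loudly if the write is rejected.
    print(f"shard {shard} via {SHARD_HOSTS[shard][0]}: {len(batch)} rows")
```

Because the client sends each batch itself, a failed shard surfaces as an error on that batch instead of a growing spool on the initiator.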
Replicated + Distributed: the canonical pattern
Each node hosts:
- A ReplicatedMergeTree table: the actual data, replicated via Keeper.
- A Distributed view: the routing layer, reachable from any node.
Reads of events_dist from any node fan out to some replica per shard (load-balancing policy is
configurable: random, nearest_hostname, in_order, first_or_random). Writes through Distributed,
with internal_replication=true, hit one replica per shard, and Keeper-coordinated replication handles the
fan-out within the shard. Losing one replica of a shard is invisible to queries as long as another replica
of that shard is healthy. Losing an entire shard fails any query that needs that shard.
Tradeoffs
- + Linear scan throughput: 3 shards ≈ 3× scan rate.
- + Writes parallelize across shards by sharding-key hash.
- + ON CLUSTER DDL keeps schema in sync without external orchestration.
- − Distributed JOINs are tricky: unless both sides are co-sharded, only GLOBAL JOIN is shard-safe, and it broadcasts the right side.
- − Async INSERT spool can lose data on initiator disk failure.
- − Queries fail if any shard is down; use skip_unavailable_shards=1 only for approximate analytics.
- − Re-sharding is manual (DETACH PARTITION + ship + ATTACH). No automatic split/merge.
FAQ
What does _shard_num actually return?
The 1-based index of the shard in the cluster definition. The Distributed engine supplies it at query time; it has no on-disk representation.
Can I JOIN two Distributed tables?
Yes, but how the right side is resolved matters. When the JOIN runs on each shard against only that shard's local rows, it silently produces wrong answers if the right side isn't co-sharded. Use GLOBAL JOIN to broadcast the right side, or co-shard both tables on the same key.
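A toy Python simulation shows the failure mode. It is a sketch under assumptions: two shards, an inner join on an integer key that is unique on the right side, made-up data, and hypothetical helper names; it models the semantics only, not how ClickHouse executes the join.

```python
def shard_of(key: int, shard_count: int = 2) -> int:
    """Toy sharding function: plain modulo on the join key."""
    return key % shard_count


def local_join(left_shards, right_shards):
    """Each shard joins its left rows against its OWN right rows only."""
    out = []
    for left, right in zip(left_shards, right_shards):
        lookup = dict(right)
        out += [(k, v, lookup[k]) for k, v in left if k in lookup]
    return sorted(out)


def global_join(left_shards, right_shards):
    """The right side is collected once and broadcast to every shard."""
    broadcast = dict(row for shard in right_shards for row in shard)
    return sorted(
        (k, v, broadcast[k]) for left in left_shards for k, v in left if k in broadcast
    )


events = [(1, "click"), (2, "view"), (3, "click"), (4, "view")]
users = [(1, "ann"), (2, "bob"), (3, "cy"), (4, "dee")]

events_by_user = [[r for r in events if shard_of(r[0]) == s] for s in range(2)]
users_co_sharded = [[r for r in users if shard_of(r[0]) == s] for s in range(2)]
users_misplaced = [users[:2], users[2:]]  # sharded by some other rule

print(len(local_join(events_by_user, users_co_sharded)))  # 4: correct
print(len(local_join(events_by_user, users_misplaced)))   # 2: rows silently lost
print(len(global_join(events_by_user, users_misplaced)))  # 4: correct, at broadcast cost
```

The local join against the misplaced table raises no error; it just returns fewer rows, which is why co-sharding or GLOBAL JOIN has to be a deliberate choice.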
How do I add a shard to an existing cluster?
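Edit remote_servers on every node and create the local table on the new shard. New inserts start landing there immediately, but existing data stays where it is. ALTER TABLE … MOVE PARTITION TO TABLE only moves data between tables on the same server, so rebalance across shards yourself: either INSERT … SELECT through a Distributed table that targets the new layout, or detach partitions, copy them to the new host, and attach them there. There is no online resharding.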
Why is my INSERT slow when I switch to insert_distributed_sync=1?
Because you are now waiting for the slowest shard to accept and write the data. The async path was hiding that latency in the spool. Sync mode is correct for "must not lose this row"; async is fine for high-volume event ingestion where the spool's durability risk is acceptable.
Can I run a query against just one shard?
Yes: SELECT … FROM events_dist WHERE _shard_num = 2, or query the local table directly. The first form is convenient when you don't know which host owns shard 2.