Topology at a glance

[Diagram: SELECT … FROM events_dist fans out to shard 1, shard 2 and shard 3; each shard has replica A and replica B holding events_local.]

The Distributed engine resolves shard / replica hosts from the cluster config, runs the local query against one replica per shard, and merges partial states on the initiator.

Key Numbers

  • 2: tables per node (local table + Distributed table)
  • N×M: topology, N shards × M replicas
  • 1: replica chosen per shard at query time
  • async: default INSERT mode through Distributed
  • 0/1: internal_replication (1 = write once per shard, 0 = write to every replica)
  • ~50ms: typical fan-out overhead per query
  • _shard_num: virtual column tagging origin shard

Cluster config & ON CLUSTER DDL

Clusters are defined in the remote_servers section of the server configuration (typically a file such as config.d/clusters.xml). Each <shard> element groups replicas; nodes inside one shard hold the same data, nodes across shards hold disjoint slices.

<remote_servers>
  <analytics>
    <shard>
      <internal_replication>true</internal_replication>
      <replica><host>ch-1a</host><port>9000</port></replica>
      <replica><host>ch-1b</host><port>9000</port></replica>
    </shard>
    <shard>
      <internal_replication>true</internal_replication>
      <replica><host>ch-2a</host><port>9000</port></replica>
      <replica><host>ch-2b</host><port>9000</port></replica>
    </shard>
  </analytics>
</remote_servers>

Once the cluster is named, ON CLUSTER propagates DDL through the distributed DDL queue, which is coordinated through Keeper and visible in system.distributed_ddl_queue. The same statement creates the local table on every node:

CREATE TABLE events_local ON CLUSTER analytics (
    ts        DateTime,
    user_id   UInt64,
    event     LowCardinality(String),
    payload   String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events',
                             '{replica}')
PARTITION BY toYYYYMM(ts)
ORDER BY (user_id, ts);

-- AS events_local copies the column list from the local table.
CREATE TABLE events_dist ON CLUSTER analytics AS events_local
ENGINE = Distributed(analytics, currentDatabase(), events_local, cityHash64(user_id));

The {shard} and {replica} macros are substituted per host from the <macros> section of each server's configuration. The fourth argument to Distributed() is the sharding-key expression, evaluated at insert time on the initiator.

Sharding keys & data placement

The sharding-key expression must produce an integer. ClickHouse takes the remainder key % total_weight, where total_weight is the sum of the per-shard weights, and sends the row to the shard whose slice of [0, total_weight) contains that remainder; with equal weights this reduces to key % number_of_shards. cityHash64(user_id) is the canonical pick: it spreads load uniformly while keeping all rows for one user on the same shard. That locality means a GROUP BY on user_id can be answered without cross-shard re-aggregation if you push the GROUP BY down with distributed_group_by_no_merge=1.
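The weight arithmetic above can be sketched in a few lines of Python. This only illustrates the remainder-range rule and is not ClickHouse's own code: pick_shard and the example weights are hypothetical names and values, and the key is assumed to be an already-hashed, non-negative integer.

```python
from bisect import bisect_right
from itertools import accumulate


def pick_shard(key: int, weights: list[int]) -> int:
    """Return the 1-based shard number for a non-negative integer sharding key.

    Each shard owns a half-open slice of [0, total_weight) whose width equals
    its weight; the row goes to the shard whose slice contains key % total_weight.
    """
    upper_bounds = list(accumulate(weights))   # weights [9, 10] -> bounds [9, 19]
    remainder = key % upper_bounds[-1]         # the last bound is total_weight
    return bisect_right(upper_bounds, remainder) + 1


# Hypothetical two-shard cluster with weights 9 and 10.
for key in (0, 8, 9, 18, 19):
    print(key, "->", "shard", pick_shard(key, [9, 10]))
```

With these assumed weights, remainders 0 to 8 belong to shard 1 and remainders 9 to 18 to shard 2, so a uniformly distributed key sends 10/19 of the rows to the heavier shard.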

Bad sharding keys: rand() (uniform, but breaks per-user locality), now() (everything lands on one shard per second), or anything monotonic. Inspect actual placement with the virtual column:

SELECT _shard_num, count() FROM events_dist GROUP BY _shard_num ORDER BY _shard_num;
-- _shard_num │ count()
--     1      │ 3,421,100
--     2      │ 3,398,775
--     3      │ 3,403,002

_shard_num is virtual — it does not exist on disk; the Distributed engine injects it during result merging. It is invaluable for debugging skew, locating a misrouted row, or running a query against just one shard (WHERE _shard_num = 2).

internal_replication: the one flag that matters

With internal_replication=true, an INSERT through the Distributed table picks one replica per shard and writes there; ReplicatedMergeTree then propagates the part to the sibling replicas. With internal_replication=false, the Distributed engine writes the same data to every replica itself, which multiplies write traffic by the replica count, breaks deduplication, and leaves nothing to check that the copies stay identical.

internal_replication | true (recommended)             | false
Write path           | 1 replica per shard            | All replicas per shard
Replication          | ReplicatedMergeTree via Keeper | Distributed engine writes N×
Dedup                | Works (block hash in Keeper)   | Broken (writes look distinct)
Use with             | ReplicatedMergeTree            | Plain MergeTree only

If your local engine is ReplicatedMergeTree, you almost always want internal_replication=true. The other combo exists for legacy clusters running plain MergeTree where the Distributed engine is doing all the replication itself.

Query execution: scatter, gather, merge

A query against a Distributed table is rewritten so each shard runs a "local" version against events_local. Aggregations are special: ClickHouse computes partial aggregate states on each shard and merges them on the initiator. That is why uniq() and quantileTDigest() work cluster-wide — their internal state is mergeable.

-- Conceptual two-stage plan for
--   SELECT event, count(), uniq(user_id) FROM events_dist GROUP BY event
-- (illustrative, not literal ClickHouse output).

-- Stage 1, on one replica of each shard: build partial aggregate states.
SELECT event, sumState(1) AS s, uniqState(user_id) AS u
FROM events_local
GROUP BY event;

-- Stage 2, on the initiator: merge the states received from shards 1..3.
SELECT event, sumMerge(s), uniqMerge(u)
FROM <partial states from all shards>
GROUP BY event;
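The reason mergeable states matter can be shown with a small simulation. This is a sketch under simplifying assumptions, not how ClickHouse stores its states: a Python set stands in for the uniq() state (the real one is an approximate sketch), and partial_state, merge_states and the sample rows are hypothetical.

```python
from collections import defaultdict


def partial_state(rows):
    """Runs 'on a shard': per event, keep a row count and the set of user ids."""
    state = defaultdict(lambda: [0, set()])
    for user_id, event in rows:
        state[event][0] += 1
        state[event][1].add(user_id)
    return state


def merge_states(states):
    """Runs 'on the initiator': combine shard states, then finalize them."""
    merged = defaultdict(lambda: [0, set()])
    for state in states:
        for event, (count, users) in state.items():
            merged[event][0] += count
            merged[event][1] |= users
    # Finalize: count stays a number, the user set collapses to its size.
    return {event: (count, len(users)) for event, (count, users) in merged.items()}


# Hypothetical rows (user_id, event); user 2 deliberately appears on both shards.
shard_rows = [
    [(1, "click"), (2, "click")],
    [(2, "click"), (3, "view")],
]
result = merge_states(partial_state(rows) for rows in shard_rows)
print(result)
```

Counts can simply be added, but distinct counts cannot: the two shards see 2 and 1 distinct users for "click", and adding those finalized numbers would report 3 instead of 2. Shipping the unfinalized state and merging it on the initiator avoids that error.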

Inspect what is actually shipped with EXPLAIN:

EXPLAIN PIPELINE SELECT event, count() FROM events_dist GROUP BY event;
-- (ReadFromRemote)        ← per-shard scan
-- (Aggregating)           ← partial aggregation on shards
-- (MergingAggregated)     ← final merge on initiator

Settings that change the shape of execution:

  • distributed_group_by_no_merge=1: skip the final merge; each shard returns fully grouped results. Safe only when every group lives entirely on one shard, for example when the GROUP BY includes the sharding-key column (user_id here).
  • optimize_distributed_group_by_sharding_key=1: automatic version of the above.
  • prefer_localhost_replica=1: avoid a TCP roundtrip when the initiator owns a replica.
  • max_parallel_replicas=N: split a single shard's work across replicas.

Distributed INSERTs: sync vs async

By default an INSERT into a Distributed table writes to a local spool directory on the initiator and returns success immediately. A background sender thread later forwards each .bin file to the destination shard. This is fast but the durability story is "if the initiator's disk survives, the row will eventually land".

-- Async (default): client gets ack after local spool flush
INSERT INTO events_dist VALUES (now(), 42, 'click', '...');

-- Sync: block until each shard confirms
SET insert_distributed_sync = 1;
INSERT INTO events_dist VALUES (now(), 42, 'click', '...');

The spool lives at /var/lib/clickhouse/store/<uuid>/<shard>/. Inspect backlog and errors:

SELECT database, table, data_path, is_blocked, error_count, data_files,
       formatReadableSize(data_compressed_bytes) AS pending
FROM system.distribution_queue;

When delivery guarantees matter, many teams skip Distributed INSERTs entirely: clients hash the key themselves and write directly to the right shard's local table. This costs a small client-side router but gives you back-pressure and immediate failure visibility.
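A client-side router of the kind described above might look like the following sketch. Several things here are assumptions made for illustration: SHARD_HOSTS lists one write target per shard (host names borrowed from the cluster config earlier), stable_hash64 is a stand-in built on hashlib and is not cityHash64, and the actual INSERT call is left out because it depends on the client library in use.

```python
import hashlib

# Hypothetical write targets, one per shard, in cluster-definition order.
SHARD_HOSTS = ["ch-1a", "ch-2a"]


def stable_hash64(user_id: int) -> int:
    """Stand-in 64-bit hash. It is deterministic but NOT equal to cityHash64."""
    digest = hashlib.blake2b(user_id.to_bytes(8, "little"), digest_size=8).digest()
    return int.from_bytes(digest, "little")


def route(rows):
    """Group rows (dicts with a 'user_id' field) into one batch per target host."""
    batches = {host: [] for host in SHARD_HOSTS}
    for row in rows:
        shard_index = stable_hash64(row["user_id"]) % len(SHARD_HOSTS)
        batches[SHARD_HOSTS[shard_index]].append(row)
    return batches


rows = [{"user_id": uid, "event": "click"} for uid in range(10)]
for host, batch in route(rows).items():
    # A real client would INSERT each batch into events_local on that host
    # and surface any error to the caller immediately.
    print(host, [row["user_id"] for row in batch])
```

If reads still go through events_dist and rely on its sharding key for placement, the client hash has to reproduce the server-side expression exactly; the stand-in above only preserves the property that one user always maps to one shard.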

Replicated + Distributed: the canonical pattern

Each node hosts:

  1. A ReplicatedMergeTree table: the actual data, replicated via Keeper.
  2. A Distributed table: the routing layer, reachable from any node.

Reads of events_dist from any node fan out to one replica per shard (the policy is set with load_balancing: random, nearest_hostname, in_order, first_or_random). Writes through Distributed, with internal_replication=true, hit one replica per shard, and ReplicatedMergeTree replicates within the shard via Keeper. Losing one replica in a shard is invisible to queries. Losing every replica of a shard fails any query that needs that shard.

Tradeoffs

  • + Linear scan throughput: 3 shards ≈ 3× scan rate.
  • + Writes parallelize across shards by sharding-key hash.
  • + ON CLUSTER DDL keeps schema in sync without external orchestration.
  • - Distributed JOINs are tricky: a plain JOIN is correct only when both tables are co-sharded on the join key; otherwise you need GLOBAL JOIN, which broadcasts the right side.
  • - Async INSERT spool can lose data on initiator disk failure.
  • - Queries fail if any shard is down; use skip_unavailable_shards=1 only for approximate analytics.
  • - Re-sharding is manual (DETACH PARTITION + ship + ATTACH). No automatic split/merge.

FAQ

What does _shard_num actually return?

The 1-based index of the shard in the cluster definition. It is injected by the Distributed engine on the initiator while merging streams; it has no on-disk representation.

Can I JOIN two Distributed tables?

Yes, but a plain JOIN runs locally on each shard, which silently produces wrong answers if the right side isn't co-sharded. Use GLOBAL JOIN to broadcast the right side, or co-shard both tables on the same key.
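The failure mode is easy to reproduce in a toy model. The sketch below is only a simulation in Python, not ClickHouse's execution: local_join, global_join and the two tiny tables are hypothetical, and the right-hand table is deliberately sharded on a different rule than the left.

```python
def local_join(left_shards, right_shards):
    """Plain JOIN: every shard joins only the slices it holds itself."""
    joined = []
    for left, right in zip(left_shards, right_shards):
        lookup = dict(right)
        joined += [(key, value, lookup[key]) for key, value in left if key in lookup]
    return joined


def global_join(left_shards, right_shards):
    """GLOBAL JOIN: the full right side is collected once and sent to every shard."""
    lookup = {key: value for shard in right_shards for key, value in shard}
    return [
        (key, value, lookup[key])
        for left in left_shards
        for key, value in left
        if key in lookup
    ]


# events: (user_id, event), placed by user_id % 2.
events_shards = [[(2, "click")], [(1, "view")]]
# users: (user_id, name), placed by some other rule, so NOT co-sharded.
users_shards = [[(1, "alice")], [(2, "bob")]]

print("plain :", local_join(events_shards, users_shards))
print("global:", global_join(events_shards, users_shards))
```

In this setup no shard holds both an event and its matching user, so the plain join finds nothing while the broadcast version matches both rows. Placing users by the same user_id % 2 rule would make the plain join correct without any broadcast.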

How do I add a shard to an existing cluster?

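Edit remote_servers on every node and create the local table on the new shard. New inserts through the Distributed table are then routed to it as well, but existing data stays where it is: rebalance it yourself, for example by detaching partitions, shipping them to the new shard, and attaching them there, or by re-inserting the data through the Distributed table with INSERT … SELECT. ALTER TABLE … MOVE PARTITION TO TABLE only moves parts between tables on the same server, so it cannot do this across shards. There is no online resharding.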

Why is my INSERT slow when I switch to insert_distributed_sync=1?

Because you are now waiting for the slowest shard to acknowledge the write. The async path was hiding that latency in the spool. Sync mode is correct for "must not lose this row"; async is fine for high-volume event ingestion where the spool is acceptable.

Can I run a query against just one shard?

Yes: SELECT … FROM events_dist WHERE _shard_num = 2, or query the local table directly. The first form is convenient when you don't know which host owns shard 2.