ReplicatedMergeTree
ReplicatedMergeTree is the multi-replica variant of MergeTree. Coordination happens through
ZooKeeper (or its drop-in replacement, ClickHouse Keeper): every part is registered as a znode,
every replica follows a shared log of operations and queues the ones it must perform, and each merge is assigned once and then applied by all replicas. There is no
leader for writes: any replica accepts an INSERT, registers the new part with Keeper, and the other replicas pull
it down. INSERTs are idempotent: blocks are deduplicated by hash for one hour by default. The quorum
setting (insert_quorum) lets you trade availability for durability. The design is far simpler than
most multi-master databases because parts are immutable and independent: data is only ever added as new parts.
Replication via Keeper
There is no leader for writes. Any replica creates a part, announces it in Keeper, and others pull the bytes directly.
Schema and ZK path
CREATE TABLE events
(
    ts DateTime,
    user_id UInt64,
    event LowCardinality(String)
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/events', -- table-replication path
    '{replica}'                          -- this replica's name
)
PARTITION BY toYYYYMM(ts)
ORDER BY (user_id, ts);
The {shard} and {replica} macros are defined in the macros section of the server configuration (config.xml). The table path identifies the
logical table: every replica that uses the same path participates in replication. The replica name
identifies one replica within that group and must be unique under that path.
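To make the substitution concrete, here is a minimal Python sketch of how the two engine arguments might expand on different servers. The host names, macro values, and the `expand` helper are hypothetical and exist only for illustration; ClickHouse performs the real substitution internally.

```python
# Illustrative only: mimics how {shard} and {replica} could be substituted.
# Host names and macro values below are made up for this example.

def expand(template: str, macros: dict[str, str]) -> str:
    """Replace each {name} placeholder with its value from `macros`."""
    return template.format(**macros)

servers = {
    "host-a": {"shard": "01", "replica": "replica-A"},
    "host-b": {"shard": "01", "replica": "replica-B"},
    "host-c": {"shard": "02", "replica": "replica-A"},
}

# For each host: (table path in Keeper, replica name).
paths = {
    host: (expand("/clickhouse/tables/{shard}/events", m), expand("{replica}", m))
    for host, m in servers.items()
}

# host-a and host-b share a table path, so they replicate the same data;
# host-c uses a different path and therefore belongs to another shard.
same_group = paths["host-a"][0] == paths["host-b"][0]
```

Because the same CREATE TABLE statement expands differently per server, one DDL statement can be reused on every node.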
The replication path layout
/clickhouse/tables/01/events/
├── metadata — schema + engine settings
├── columns — column definitions
├── replicas/
│ ├── replica-A/
│ │ ├── queue/ — pending operations (FETCH, MERGE, MUTATE)
│ │ ├── parts/ — parts this replica owns
│ │ └── log_pointer — last log entry processed
│ ├── replica-B/...
│ └── replica-C/...
├── log/ — global write-ahead log of operations
├── blocks/ — INSERT block hashes (dedup window)
├── mutations/ — pending mutations
└── leader_election — who leads merges (advisory)
Every operation — new part, merge, mutation — appends to log/. Each replica reads the log, decides which
operations apply to it, and queues them.
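The log-and-queue mechanism can be sketched as a toy in-memory model. This is a simplification under stated assumptions, not how ClickHouse is implemented: `SharedLog`, `Replica`, and `pull` are hypothetical names, and the rule that a replica skips the GET entry for a part it wrote itself is an assumption used to illustrate "decides which operations apply to it".

```python
# A toy, in-memory model of the shared log and per-replica queues.
# All names here are hypothetical; none of them are ClickHouse APIs.
from dataclasses import dataclass, field


@dataclass
class SharedLog:
    entries: list[dict] = field(default_factory=list)

    def append(self, entry: dict) -> None:
        self.entries.append(entry)


@dataclass
class Replica:
    name: str
    log_pointer: int = 0                      # index of the next log entry to read
    queue: list[dict] = field(default_factory=list)

    def pull(self, log: SharedLog) -> None:
        """Copy unseen log entries into the local queue and advance the pointer."""
        for entry in log.entries[self.log_pointer:]:
            # Assumption: the replica that wrote a part has nothing to fetch.
            if entry["type"] == "GET" and entry["source"] == self.name:
                continue
            self.queue.append(entry)
        self.log_pointer = len(log.entries)


log = SharedLog()
a, b = Replica("replica-A"), Replica("replica-B")

log.append({"type": "GET", "part": "part_1", "source": "replica-A"})
log.append({"type": "MERGE", "parts": ["part_1", "part_2"], "result": "part_3"})

a.pull(log)   # replica-A queues only the MERGE
b.pull(log)   # replica-B queues the GET (a fetch) and the MERGE
b.pull(log)   # pulling again is a no-op: the pointer is already at the end
```

The log pointer is what makes catching up after downtime cheap: a replica resumes from where it stopped instead of rescanning the whole log.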
INSERT path and idempotency
- Client sends INSERT to any replica.
- Replica writes the part locally, computes a block hash.
- Replica creates a znode under blocks/<hash> in Keeper. If it already exists → INSERT is a duplicate, skip silently.
- Replica appends a "GET part_X" entry to log/.
- Other replicas read the log, queue a FETCH operation, pull the bytes from the source replica over the inter-server HTTP port.
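That third step gives you retry-safe INSERTs. If a client retries the same batch, the second copy
is identified by hash and dropped. The dedup window is one hour by default, set by
replicated_deduplication_window_seconds; a companion setting, replicated_deduplication_window, caps how many
recent block hashes are kept. Defaults have changed between ClickHouse releases, so check the values for your
version. The window is long enough to survive most network blips and short enough to avoid
unbounded Keeper growth.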
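The hash check and its time window can be illustrated with a small Python sketch. It is a simplified model, not ClickHouse's implementation: `BlockDedup` is a hypothetical class standing in for the blocks/ znodes, SHA-256 over the row representation is a stand-in for the real block hash, and timestamps are passed in explicitly so the behaviour is easy to follow.

```python
import hashlib


class BlockDedup:
    """Toy stand-in for the blocks/ znodes: remembers recent block hashes."""

    def __init__(self, window_seconds: int = 3600):
        self.window_seconds = window_seconds
        self.seen: dict[str, float] = {}      # block hash -> time first seen

    def try_insert(self, rows: list[tuple], now: float) -> bool:
        """Return True if the block is new, False if it is a duplicate."""
        # Expire hashes that have fallen out of the dedup window.
        self.seen = {h: t for h, t in self.seen.items()
                     if now - t < self.window_seconds}
        # Stand-in hash; the real block hash is computed differently.
        block_hash = hashlib.sha256(repr(rows).encode()).hexdigest()
        if block_hash in self.seen:
            return False                       # duplicate: skip silently
        self.seen[block_hash] = now
        return True


dedup = BlockDedup(window_seconds=3600)
batch = [(1700000000, 42, "click")]

first = dedup.try_insert(batch, now=0)       # new block, accepted
retry = dedup.try_insert(batch, now=30)      # retry inside the window, dropped
late = dedup.try_insert(batch, now=4000)     # window has passed, accepted again
```

The last call shows the limit of the scheme: a retry that arrives after the window has expired is stored as new data.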
insert_quorum: durability vs availability
By default INSERT returns success after the local part is registered in Keeper — replication happens asynchronously. To require multiple replicas to confirm:
SET insert_quorum = 2; -- N replicas must ack
SET insert_quorum_timeout = 60000; -- milliseconds
SET select_sequential_consistency = 1; -- reads see only quorum-acked rows
INSERT INTO events VALUES (now(), 42, 'click');
Quorum INSERTs block until N replicas hold the part. Use case: financial / audit-grade tables where you cannot tolerate a single-node loss between INSERT ack and replication. Cost: latency goes up and availability goes down, because any timeout is reported as an INSERT failure even if the local write succeeded.
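The availability cost can be seen in a small Python sketch of the quorum decision. It is a simplified model: `quorum_insert` is a hypothetical helper, the latencies are invented, and the sketch assumes the replica that accepted the INSERT counts toward the quorum and confirms immediately.

```python
def quorum_insert(acks_by_ms: dict[str, int], quorum: int, timeout_ms: int) -> bool:
    """Return True if at least `quorum` replicas confirm within `timeout_ms`.

    `acks_by_ms` maps replica name -> milliseconds until that replica holds
    the part. Assumption: the writing replica confirms at 0 ms.
    """
    confirmed = [name for name, ms in acks_by_ms.items() if ms <= timeout_ms]
    return len(confirmed) >= quorum


# Healthy cluster: the writer plus one fast peer satisfy quorum = 2.
ok = quorum_insert(
    {"replica-A": 0, "replica-B": 120, "replica-C": 90_000},
    quorum=2, timeout_ms=60_000,
)

# Both peers are slow: the local part exists, yet the INSERT is reported as failed.
failed = not quorum_insert(
    {"replica-A": 0, "replica-B": 75_000, "replica-C": 90_000},
    quorum=2, timeout_ms=60_000,
)
```

In the second case the client sees an error and will probably retry, which is where block-hash deduplication keeps the retry from storing the data twice.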
Merges and mutations
Merges are assigned centrally: a replica acting as leader (recent ClickHouse versions allow several replicas to hold this role at once) picks the parts and writes the "MERGE parts X+Y → Z" entry to
log/. Other replicas read the entry, queue it locally, and either run the merge themselves or fetch the
result from a replica that already has it. Either path produces byte-identical parts because merges are deterministic.
Mutations propagate the same way: ALTER TABLE ... UPDATE writes a mutation entry, every replica picks it up, and every
replica rewrites its affected parts. That's why mutations on a 3-replica cluster cost 3× the IO.
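Why "run it locally" and "fetch the result" are interchangeable can be shown with a short Python sketch. It is illustrative only: parts are modelled as sorted lists of (user_id, ts) tuples, SHA-256 stands in for the part checksum, and `merge_parts`, `checksum`, and `mutation_io_bytes` are hypothetical helpers.

```python
import hashlib
import heapq


def merge_parts(*parts: list[tuple]) -> list[tuple]:
    """Merge already-sorted parts into one sorted part (deterministic)."""
    return list(heapq.merge(*parts))


def checksum(part: list[tuple]) -> str:
    """Stand-in for a part checksum."""
    return hashlib.sha256(repr(part).encode()).hexdigest()


# Rows are (user_id, ts), matching ORDER BY (user_id, ts).
part_x = [(1, 100), (3, 105)]
part_y = [(2, 101), (3, 102)]

# Each replica runs the same merge on the same inputs...
on_replica_a = merge_parts(part_x, part_y)
on_replica_b = merge_parts(part_x, part_y)

# ...so the results are interchangeable: the checksums match.
identical = checksum(on_replica_a) == checksum(on_replica_b)


def mutation_io_bytes(table_bytes: int, replicas: int) -> int:
    """Every replica rewrites its own copy, so total IO scales with replica count."""
    return table_bytes * replicas
```

For example, `mutation_io_bytes(100, 3)` gives 300: rewriting 100 bytes of parts on each of three replicas costs three times the IO of a single copy.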
SELECT replica_name, queue_size, inserts_in_queue, merges_in_queue,
       absolute_delay, last_queue_update
FROM system.replicas
WHERE table = 'events';
Operational tables
-- Replication status (queue size, lag, read-only flag) for each replicated table on this server
SELECT * FROM system.replicas;
-- Pending operations on this replica
SELECT type, source_replica, parts_to_merge, num_postponed,
       postpone_reason, last_exception
FROM system.replication_queue
ORDER BY create_time DESC;
-- Part fetches currently in progress on this server
SELECT * FROM system.replicated_fetches;
Tradeoffs
- + Leaderless: any replica accepts writes, no failover dance.
- + Idempotent INSERTs survive retries automatically.
- + Each merge is assigned once; replicas can fetch the merged result instead of redoing it.
- − Requires running and operating a Keeper ensemble.
- − Mutations cost N× IO across replicas.
- − Keeper is a hot dependency; outage stops new INSERTs (existing reads keep working).
- − Dedup window is bounded, not per-batch; a retry that arrives after the window has passed (for example after a long client outage) is stored as a duplicate.
FAQ
Should I use ZooKeeper or ClickHouse Keeper?
Keeper. It's wire-compatible with ZK, written in C++, needs no JVM tuning, and is packaged with ClickHouse. New deployments use Keeper; existing ZooKeeper data can be migrated with the clickhouse-keeper-converter tool.
What if Keeper goes down?
Existing reads continue. New INSERTs fail because they can't register block hashes. Merges pause. Recovery: bring Keeper back; replicas catch up on their queues automatically.
How do I add a new replica?
Run CREATE TABLE on the new node with the same path and a new, unique replica name. The replica registers itself in Keeper and starts pulling parts from existing replicas. No special bootstrap is needed beyond network reachability.
Why is my replica's queue growing?
Either the source replica is unreachable (network), the replica's disk is full, or the queue is blocked on a poison-pill mutation. Check system.replication_queue.last_exception.
Can replicas have different schemas?
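No. Schema lives in Keeper at the table path; every replica must match. An ALTER run on one replica propagates to the other replicas of that table through Keeper; use ALTER TABLE ... ON CLUSTER to apply the same change across shards.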