Replication via Keeper

[Diagram: Keeper stores metadata under /clickhouse/tables/{shard}/... Replica A: part_42, part_43, queue: 0. Replica B: part_42, part_43, queue: 0. Replica C (lagging): part_42, one fetch in progress, queue: 1. Keeper holds metadata; replicas fetch part bytes peer-to-peer.]

There is no leader for writes. Any replica creates a part, announces it in Keeper, and others pull the bytes directly.

Key Numbers

  • leaderless: writes accepted by any replica
  • 3: recommended Keeper ensemble size
  • 1h: default INSERT dedup window
  • znode: one per part, so Keeper grows with the part count
  • 2/3: Keeper quorum for any operation
  • ~ms: Keeper RTT under healthy load
  • P2P: part fetches between replicas

Schema and ZK path

CREATE TABLE events
(
    ts        DateTime,
    user_id   UInt64,
    event     LowCardinality(String)
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/events',  -- table-replication path
    '{replica}'                          -- this replica's name
)
PARTITION BY toYYYYMM(ts)
ORDER BY (user_id, ts);

The {shard} and {replica} macros come from config.xml. The table path identifies the logical table — every replica that uses the same path participates in replication. The replica name identifies one replica within that group.
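To confirm what a given server will substitute for these macros, the values can be read back over SQL. This is a minimal sketch, assuming macros named shard and replica are defined on the server; the values returned depend entirely on your configuration.

```sql
-- List every macro defined in this server's configuration.
SELECT macro, substitution
FROM system.macros;

-- Read back the two macros used in the table definition above.
SELECT getMacro('shard') AS shard, getMacro('replica') AS replica;
```

Running this on each node before creating the table is a cheap way to catch two replicas that would expand {replica} to the same name.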

The replication path layout

/clickhouse/tables/01/events/
├── metadata           — schema + engine settings
├── columns            — column definitions
├── replicas/
│   ├── replica-A/
│   │   ├── queue/     — pending operations (FETCH, MERGE, MUTATE)
│   │   ├── parts/     — parts this replica owns
│   │   └── log_pointer — last log entry processed
│   ├── replica-B/...
│   └── replica-C/...
├── log/               — global write-ahead log of operations
├── blocks/            — INSERT block hashes (dedup window)
├── mutations/         — pending mutations
└── leader_election    — who leads merges (advisory)

Every operation — new part, merge, mutation — appends to log/. Each replica reads the log, decides which operations apply to it, and queues them.
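The layout and the log position can both be inspected from SQL. This is a sketch that assumes the example path /clickhouse/tables/01/events from the tree above and a table named events; substitute your own path and table name.

```sql
-- Children of the table path in Keeper (log, blocks, replicas, ...).
-- system.zookeeper requires a condition on path.
SELECT name, numChildren
FROM system.zookeeper
WHERE path = '/clickhouse/tables/01/events';

-- Compare the newest entry in log/ (log_max_index) with how far this
-- replica has copied entries into its own queue (log_pointer).
SELECT replica_name, log_max_index, log_pointer
FROM system.replicas
WHERE table = 'events';
```

A log_pointer that stays far below log_max_index suggests the replica is not keeping up with the shared log.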

INSERT path and idempotency

  1. Client sends INSERT to any replica.
  2. Replica writes the part locally, computes a block hash.
  3. Replica creates a znode under blocks/<hash> in Keeper. If it already exists → INSERT is a duplicate, skip silently.
  4. Replica appends a "GET part_X" entry to log/.
  5. Other replicas read the log, queue a FETCH operation, pull the bytes from the source replica over the inter-server HTTP port.

That third step gives you retry-safe INSERTs. If a client retries the same batch, the second copy is identified by its hash and dropped. The dedup window is one hour by default (replicated_deduplication_window_seconds) and is also capped by a block count (replicated_deduplication_window): long enough to survive most network blips, short enough to avoid unbounded Keeper growth.
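The retry behaviour can be observed directly. This is a minimal sketch, assuming the events table defined earlier, an otherwise empty table for this user_id, and the default insert_deduplicate setting.

```sql
-- A literal timestamp keeps both blocks identical; now() would change
-- the data between attempts and therefore the block hash.
INSERT INTO events VALUES ('2024-01-01 00:00:00', 42, 'click');

-- Simulated client retry of the same batch.
INSERT INTO events VALUES ('2024-01-01 00:00:00', 42, 'click');

-- Count the rows actually stored for this user.
SELECT count() FROM events WHERE user_id = 42;
```

Under those assumptions the count should be 1, because the second INSERT hashes to a block that is already registered under blocks/.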

insert_quorum: durability vs availability

By default INSERT returns success after the local part is registered in Keeper — replication happens asynchronously. To require multiple replicas to confirm:

SET insert_quorum = 2;          -- N replicas must ack
SET insert_quorum_timeout = 60000;   -- milliseconds to wait for the quorum
SET select_sequential_consistency = 1;  -- reads see only quorum-acked rows

INSERT INTO events VALUES (now(), 42, 'click');

Quorum INSERTs block until N replicas have fetched the part. Use case: financial / audit-grade tables where you cannot tolerate a single-node loss between INSERT ack and replication. Cost: latency goes up, availability goes down (any timeout = INSERT failure even if the local write succeeded).
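Because quorum writes trade latency for durability, it can be useful to scope them to individual statements rather than the whole session. This is a sketch of the per-statement form, assuming the same events table and a shard with at least two healthy replicas.

```sql
-- Settings apply to this statement only; session defaults are untouched.
INSERT INTO events
SETTINGS insert_quorum = 2, insert_quorum_timeout = 60000
VALUES (now(), 42, 'click');

-- A read that asks for quorum-acknowledged data only.
SELECT count()
FROM events
SETTINGS select_sequential_consistency = 1;
```

How select_sequential_consistency behaves also depends on insert_quorum_parallel, so check the settings reference for your version before relying on it.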

Merges and mutations

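Merges are assigned through the shared log rather than chosen independently on each replica: a replica acting as leader picks the parts and writes the "MERGE parts X+Y → Z" entry to log/ (recent ClickHouse versions allow several replicas to hold the leader role at once). Other replicas read the entry, queue it locally, and either run the merge themselves or fetch the result from a replica that already has it. Either path produces byte-identical parts because merges are deterministic.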

Mutations propagate the same way: ALTER TABLE ... UPDATE writes a mutation entry, every replica picks it up, and every replica rewrites its own parts. That's why a mutation on a 3-replica cluster costs 3× the IO.

SELECT replica_name, queue_size, inserts_in_queue, merges_in_queue,
       absolute_delay, last_queue_update
FROM system.replicas
WHERE table = 'events';
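For the mutation path described above, the sketch below issues a mutation and then watches its progress on the local replica. It assumes the events table from earlier; the string values are placeholders.

```sql
-- Rewrite a non-key column. Each replica applies this to its own parts.
ALTER TABLE events UPDATE event = 'tap' WHERE event = 'click';

-- Progress on this replica: parts_to_do falls to 0 as parts are rewritten.
SELECT mutation_id, command, parts_to_do, is_done, latest_fail_reason
FROM system.mutations
WHERE table = 'events'
ORDER BY create_time DESC;
```

Running the second query on each replica shows that they finish independently, which is where the N× IO cost comes from.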

Operational tables

-- Replicated tables on this server: lag, queue size, read-only state
SELECT * FROM system.replicas;

-- Pending operations on this replica
SELECT type, source_replica, parts_to_merge, num_postponed,
       postpone_reason, last_exception
FROM system.replication_queue
ORDER BY create_time DESC;

-- Part fetches currently in progress on this replica
SELECT * FROM system.replicated_fetches;

Tradeoffs

  • + Leaderless: any replica accepts writes, no failover dance.
  • + Idempotent INSERTs survive retries automatically.
  • + Merges are assigned once through the shared log; replicas can pull the merged result instead of redoing it.
  • - Requires running and operating a Keeper ensemble.
  • - Mutations cost N× IO across replicas.
  • - Keeper is a hot dependency; an outage stops new INSERTs (existing reads keep working).
  • - Dedup window is bounded, not per-batch, so a retry that arrives after a long client outage can still produce duplicates.

FAQ

Should I use ZooKeeper or ClickHouse Keeper?

Keeper. It is wire-compatible with ZooKeeper, written in C++ (so no JVM tuning), and ships with ClickHouse. New deployments use Keeper; existing ZooKeeper clusters can be migrated with the clickhouse-keeper-converter tool.

What if Keeper goes down?

Existing reads continue. New INSERTs fail because they can't register block hashes. Merges pause. Recovery: bring Keeper back; replicas catch up on their queues automatically.
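One way to see this state from a ClickHouse node is to look for replicated tables that have gone read-only. This is a sketch; the zookeeper_exception column is assumed to be available, which holds for recent versions.

```sql
-- Tables that cannot currently talk to Keeper show up as read-only
-- or with an expired session.
SELECT database, table, is_readonly, is_session_expired, zookeeper_exception
FROM system.replicas
WHERE is_readonly OR is_session_expired;
```

An empty result after Keeper returns is a reasonable signal that the replicas have reconnected.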

How do I add a new replica?

CREATE TABLE with the same path and a new, unique replica name on the new node. It registers itself in Keeper and starts pulling parts from existing replicas. No special bootstrap is needed beyond network reachability.
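Put together, adding a replica can look like the sketch below. It assumes the new node's {shard} macro matches the existing replicas and its {replica} macro expands to a name nobody else uses.

```sql
-- Run on the new node: same Keeper path, different replica name.
CREATE TABLE events
(
    ts        DateTime,
    user_id   UInt64,
    event     LowCardinality(String)
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/events',
    '{replica}'
)
PARTITION BY toYYYYMM(ts)
ORDER BY (user_id, ts);

-- Optional: block until this replica has worked through its queue.
SYSTEM SYNC REPLICA events;
```

SYSTEM SYNC REPLICA is handy in provisioning scripts that should not route traffic to the node until it has caught up.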

Why is my replica's queue growing?

Usually one of three causes: the source replica is unreachable (network), the replica's disk is full, or the queue is blocked on a poison-pill mutation. Check last_exception in system.replication_queue.
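A grouped view of the queue often narrows the cause faster than reading entries one by one. This is a sketch, assuming a table named events.

```sql
-- Summarise the queue by operation type; a high max_tries with a
-- repeating exception usually points at the blocking entry.
SELECT type,
       count()          AS entries,
       max(num_tries)   AS max_tries,
       min(create_time) AS oldest_entry,
       anyIf(last_exception, last_exception != '') AS sample_exception
FROM system.replication_queue
WHERE table = 'events'
GROUP BY type
ORDER BY entries DESC;
```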

Can replicas have different schemas?

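No. Schema lives in Keeper at the table path; every replica must match. An ALTER issued on one replica propagates to the other replicas of that shard through Keeper; use ALTER TABLE ... ON CLUSTER to apply the same change across shards.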
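A schema change across the whole cluster might look like the following sketch. The cluster name my_cluster and the session_id column are hypothetical; use a cluster defined in your remote_servers configuration.

```sql
-- Apply the same ALTER on every shard of the (hypothetical) cluster.
ALTER TABLE events ON CLUSTER my_cluster
    ADD COLUMN session_id UInt64 DEFAULT 0;

-- Verify the column list on the local replica.
SELECT name, type
FROM system.columns
WHERE database = currentDatabase() AND table = 'events';
```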