Shuffle

From Hash to Sort to Push-Based Magnet

Shuffle is where Spark spends most of its time on real workloads. Whenever data must be repartitioned across executors — for joins, aggregations, repartition() — each map task writes its output to disk, and each reduce task reads its slice from every map output. The all-to-all communication pattern is unavoidable; the question is how to do it efficiently.

Spark has gone through three major shuffle implementations: hash shuffle (the 1.x default, removed in 2.0), sort shuffle (the default since 1.2), and tungsten-sort (UnsafeShuffleWriter, used automatically when the serializer and shuffle dependency allow it). The latest addition is push-based shuffle (Magnet, a LinkedIn design that landed in 3.2), which fundamentally changes the data flow to address the small-read problem on large shuffles.

Architecture (Sort Shuffle)

Map side (M tasks)                        Reduce side (R tasks)

  Map task 0   -> writes shuffle_0.data + shuffle_0.index
  Map task 1   -> writes shuffle_1.data + shuffle_1.index
  ...
  Map task M-1 -> writes shuffle_<M-1>.data + .index

  On disk, per map task:
    shuffle_N.data   records sorted by partition id
    shuffle_N.index  R+1 offsets into the .data file

  Reduce task 0   reads slice 0 from every map output
  Reduce task 1   reads slice 1 from every map output
  ...
  Reduce task R-1 reads slice R-1 from every map output

Each reduce task reads M slices (one from each map's output); total M*R reads.

Key Numbers

  • 200: spark.sql.shuffle.partitions default
  • 2 files per map task: one .data + one .index
  • M*R: total shuffle reads (the small-read problem)
  • 128 MB: target size for a healthy shuffle partition
  • 3.2: Spark version that introduced push-based shuffle
  • ~30%: LinkedIn's reported runtime reduction with push-based shuffle
  • 0: hash shuffle implementations left in Spark today (removed in 2.0)

Sort Shuffle (Default)

Each map task partitions records by destination, sorts them, and writes a single output file with an index of partition offsets.

# Map-side write (SortShuffleWriter, simplified pseudocode)
class SortShuffleWriter:
  def write(self, input_iterator):
    # ExternalSorter spills to disk under memory pressure; it sorts by
    # partition_id, then by key if a key ordering was requested
    buffer = ExternalSorter(partitioner=hash_partitioner, ordering=key_ordering)
    for key, value in input_iterator:
      buffer.insert(key, value)
    output_data = open("shuffle_X.data", "wb")
    output_index = open("shuffle_X.index", "wb")
    offset = 0
    for partition_id in range(num_partitions):
      output_index.write(offset)  # start offset of this partition
      for record in buffer.partition_iterator(partition_id):
        payload = serialize(record)
        output_data.write(payload)
        offset += len(payload)
    output_index.write(offset)  # sentinel: end of the last partition

# Reduce-side read (BlockStoreShuffleReader, simplified pseudocode)
class BlockStoreShuffleReader:
  def read(self):
    for map_id in range(M):
      # The fetch reads the map's .index to find this reducer's
      # [start, end) range, then reads that byte range from .data
      block = fetch_block(map_id, my_reduce_id)
    yield combined_iterator
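The offset-index scheme can be simulated without Spark. A minimal sketch (illustrative helper names, not Spark's actual API): write all partitions into one byte stream, record each partition's start offset in an index, then slice out any partition with two index lookups and one contiguous read.

```python
from io import BytesIO

def write_map_output(partitioned_records):
    """Write all partitions into one data blob plus an offset index.

    partitioned_records: list of lists of bytes, indexed by partition id.
    Returns (data_bytes, index) where index[i] is the start offset of
    partition i and index[-1] is the total length (the sentinel).
    """
    data = BytesIO()
    index = []
    for records in partitioned_records:
        index.append(data.tell())   # start offset of this partition
        for rec in records:
            data.write(rec)
    index.append(data.tell())       # sentinel: end of the last partition
    return data.getvalue(), index

def read_slice(data, index, partition_id):
    """What a reduce task does: two index lookups, one contiguous read."""
    start, end = index[partition_id], index[partition_id + 1]
    return data[start:end]

# One map task's output, pre-partitioned for 3 reduce partitions
parts = [[b"aa", b"bb"], [b"cccc"], [b"d"]]
data, index = write_map_output(parts)
assert index == [0, 4, 8, 9]
assert read_slice(data, index, 1) == b"cccc"
```

The sentinel is what lets the reader compute the length of the last partition without a separate size field, which is why the pseudocode above writes R+1 offsets for R partitions.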

Tungsten-Sort: Off-Heap Sorting

# UnsafeShuffleWriter is chosen automatically when:
# - the shuffle dependency needs no map-side aggregation or key ordering
# - the serializer supports relocation of serialized objects
#   (e.g. Kryo, or Spark SQL's UnsafeRowSerializer)
# - the number of output partitions fits in 24 bits (at most 16,777,216)

# How it works:
# 1. Records are serialized directly into off-heap memory pages
# 2. A long[] array holds 8-byte pointers: top 24 bits = partition_id,
#    bottom 40 bits = page number + offset within the page
# 3. Sort the long[] in place — extremely cache-friendly
# 4. Walk the sorted index, copy serialized bytes to output file

# Why faster than regular sort shuffle:
# - No object deserialization during sort
# - No Java GC during sort (off-heap)
# - Pointer is 8 bytes; sort one long[] of pointers, not heavyweight tuples
# - The CPU cache hits much more often during sort comparisons

# Result: 2-3x faster shuffle write for typical workloads
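The packed-pointer trick is easy to demonstrate. A sketch (simplified: the low 40 bits here are a flat offset, where Spark actually splits them into a page number and an in-page offset):

```python
def pack(partition_id, offset):
    # Top 24 bits = partition id, bottom 40 bits = record offset.
    assert partition_id < (1 << 24) and offset < (1 << 40)
    return (partition_id << 40) | offset

def unpack(ptr):
    return ptr >> 40, ptr & ((1 << 40) - 1)

# Sorting the packed longs groups records by partition id "for free",
# because the partition id occupies the most significant bits.
pointers = [pack(2, 0), pack(0, 16), pack(1, 8), pack(0, 4)]
pointers.sort()
assert [unpack(p)[0] for p in pointers] == [0, 0, 1, 2]
```

Comparing plain 64-bit integers is exactly what CPUs and caches are best at, which is where the 2-3x write speedup largely comes from.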

External Shuffle Service

spark.shuffle.service.enabled = true
spark.shuffle.service.port = 7337

# YARN: deploy as auxiliary service in NodeManager
# K8s: deploy as DaemonSet (or use newer per-PVC alternative)

# Without external shuffle service:
# - Executor X writes shuffle files
# - Spark cannot remove executor X while another stage needs to read those files
# - Limits dynamic allocation effectiveness

# With external shuffle service:
# - Executors write shuffle files to local disk as usual
# - The service, which shares the node's disk, serves reads to any executor
# - Spark can release executor X immediately; the service still serves its files
# - Enables aggressive dynamic allocation

# Tradeoffs:
# - Adds operational complexity (one more daemon)
# - The service is a single point of failure per node (if it dies, shuffle
#   files on that node are unreachable until it restarts)
# - Memory and threadpool sizing is one more thing to tune

Push-Based Shuffle (Magnet, 3.2+)

spark.shuffle.push.enabled = true
spark.shuffle.push.maxBlockSizeToPush = 1m
spark.shuffle.push.maxBlockBatchSize = 3m
spark.shuffle.push.finalize.timeout = 10s
spark.shuffle.push.numPushThreads = 4

# Architecture:
# 1. Map tasks finish, write shuffle output as usual
# 2. After writing, mappers push their output blocks to assigned merger nodes
# 3. A small number of merger nodes are chosen per shuffle (e.g. 5 of N executors)
# 4. Each merger receives blocks for its assigned partitions and merges them
#    on the fly into a per-partition merged file
# 5. Reduce tasks then read pre-merged blocks (one per partition per merger)
#    instead of M small blocks

# Benefits:
# - 100,000 small reads -> ~10 large reads per reduce task
# - Disk IO sequential, much faster
# - Network reads fewer, less RPC overhead
# - Merger nodes also act as a cache for late-arriving fetches

# What if the merger fails?
# - Push is best-effort; reducer falls back to direct map-side fetch
# - Push-based shuffle is layered on top of regular sort shuffle

Skew Handling

# Symptom: one task in a stage takes 10 minutes while peers take 30 seconds
# Cause: hash(key) % numPartitions concentrates one or more keys

# Solution 1: AQE skew join (automatic, recommended)
spark.sql.adaptive.enabled = true
spark.sql.adaptive.skewJoin.enabled = true
spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256m

# Solution 2: Salted joins (manual, when AQE not applicable)
# rand() salts per row; lit(Random.nextInt(10)) would compute a single
# salt on the driver and give every row the same value
val salted = orders.withColumn("salt", (rand() * 10).cast("int"))
val saltedDim = customers.crossJoin(
  spark.range(10).select($"id".cast("int").as("salt")))
salted.join(saltedDim, Seq("cust_id", "salt"))
# Each hot key is split across 10 salt buckets; load is even

# Solution 3: Pre-aggregate before join
# If aggregating by skewed key, do partial aggregation in a map-side step
# so the reduce side sees small partial counts instead of millions of rows
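The salting idea can be checked with plain hashing, no Spark required. A sketch with a hypothetical key distribution: one hot key carrying 1,000 rows lands entirely in a single partition unsalted, but spreads across buckets once a random salt is appended to the key.

```python
import random

NUM_PARTITIONS, SALTS = 8, 10
keys = ["hot"] * 1000 + [f"k{i}" for i in range(100)]

def partition_of(key):
    # Stand-in for Spark's hash(key) % numPartitions
    return hash(key) % NUM_PARTITIONS

random.seed(0)
unsalted = [partition_of(k) for k in keys]
salted = [partition_of((k, random.randrange(SALTS))) for k in keys]

def max_load(assignments):
    return max(assignments.count(p) for p in range(NUM_PARTITIONS))

# All 1,000 "hot" rows share one partition without salting;
# salted, they spread across up to SALTS distinct buckets.
assert max_load(unsalted) >= 1000
assert max_load(salted) < 1000
```

The cost is the cross join that duplicates the dimension side SALTS times, which is why AQE's automatic skew splitting is preferred when it applies.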

Configuring spark.sql.shuffle.partitions

# Default: 200
# This is almost never right.

# Rule of thumb: target ~128 MB per shuffle partition
# partitions ~= total shuffle bytes / 128 MB

# Example: 1 TB of input expected to shuffle ~500 GB after filtering
# partitions ~= 500_000 MB / 128 MB ~= 3900

# Too few partitions:
# - Each partition is huge (multi-GB), spills to disk, slow
# - Limits parallelism (one task per partition)

# Too many partitions:
# - Each partition is tiny, scheduling overhead dominates
# - Many small shuffle reads, the M*R problem gets worse

# AQE coalescePartitions can save you from over-allocating:
spark.sql.adaptive.coalescePartitions.enabled = true
spark.sql.adaptive.advisoryPartitionSizeInBytes = 128m
# Set spark.sql.shuffle.partitions high (1000+); AQE coalesces dynamically
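The rule of thumb reduces to one line of arithmetic. A hypothetical helper (the function name and the 128 MiB target are assumptions from this section, not a Spark API):

```python
def recommended_shuffle_partitions(shuffle_bytes, target_bytes=128 * 1024**2):
    # Ceiling division, clamped so we never return zero partitions.
    return max(1, -(-shuffle_bytes // target_bytes))

# 500 GiB of shuffle data at 128 MiB per partition
assert recommended_shuffle_partitions(500 * 1024**3) == 4000
```

With AQE coalescing enabled, this number only needs to be an upper bound: set it generously and let AQE merge the small post-shuffle partitions down to the advisory size.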

Tradeoffs and Common Pitfalls

Good shuffle hygiene

  • Enable AQE (handles skew + partition sizing)
  • Use external shuffle service or remote shuffle (Celeborn) for dynamic allocation
  • Enable push-based shuffle for large jobs (3.2+)
  • Choose Kryo serialization for tungsten-sort path
  • Watch shuffle read/write metrics in Spark UI

Common pitfalls

  • Default 200 shuffle partitions on TB-scale data (huge spills)
  • Disabling AQE after one bad plan (AQE is almost always a net win)
  • repartition() before a final write (extra unnecessary shuffle)
  • Salted joins hand-coded when AQE would suffice

Frequently Asked Questions

Why was hash shuffle replaced by sort shuffle?

Hash shuffle (Spark 1.x default) had each map task open one file per reduce partition. With M map tasks and R reduce partitions, that was M*R files — easily millions on real workloads. The OS file descriptor and inode pressure made hash shuffle unscalable. Sort shuffle, the current default, has each map task write a single sorted file plus an index, so the file count is just M. The reduce side seeks into the index to find its slice.
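The file-count difference is stark even at moderate scale. A quick sketch of the arithmetic:

```python
def shuffle_files(num_maps, num_reduces, hash_shuffle):
    # Hash shuffle: one file per (map task, reduce partition) pair.
    # Sort shuffle: one .data + one .index file per map task.
    return num_maps * num_reduces if hash_shuffle else num_maps * 2

M, R = 5000, 2000
assert shuffle_files(M, R, hash_shuffle=True) == 10_000_000
assert shuffle_files(M, R, hash_shuffle=False) == 10_000
```

Ten million open files versus ten thousand is the difference between exhausting a node's file descriptors and not noticing the shuffle at all.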

What is tungsten-sort shuffle?

Tungsten-sort (UnsafeShuffleWriter) is an off-heap optimization for sort shuffle. Instead of sorting (key, value) Java objects, it sorts pointers to serialized bytes in off-heap memory. The pointer is 8 bytes containing both the partition ID and the offset of the record in the page. This makes the sort cache-friendly and avoids GC pressure. It is used automatically when keys do not need user-defined ordering.

What does the external shuffle service do?

The external shuffle service is a long-running process (one per node in YARN, optionally one per node as DaemonSet on K8s) that serves shuffle file reads on behalf of executors. The benefit: when an executor exits, its shuffle files remain accessible. This enables dynamic allocation — Spark can shrink the executor count without losing shuffle data needed by future stages. Without the service, Spark must keep executors alive for shuffle.

What is push-based shuffle?

Push-based shuffle (Magnet, a design from LinkedIn merged into Spark 3.2) inverts the pull model. Instead of each reduce task pulling many small map outputs, map tasks push their outputs to a small number of dedicated shuffle merge nodes that aggregate blocks by partition. Reduce tasks then pull pre-merged blocks. This dramatically reduces small random reads, which were the dominant performance problem for large shuffles. LinkedIn reported roughly 30% job runtime reductions.

How is shuffle skew different from data skew?

Data skew is when input partitions are very uneven in size — a single oversized input file. Shuffle skew is when post-shuffle partitions are uneven — typically because hash(key) % numPartitions concentrates many records in one partition due to a hot key or low cardinality. AQE specifically handles shuffle skew by detecting partitions far above median and splitting them into subpartitions. For input skew, you handle it via repartitioning before processing.