Adaptive Query Execution
Replanning at Runtime With Real Statistics
Adaptive Query Execution (AQE) is Spark 3's answer to the limits of static query planning. Catalyst produces a physical plan based on whatever statistics it has at compile time — table sizes, column cardinalities, partition counts. After the first few joins and filters, those statistics are often meaningless, and the plan that looked optimal can be wildly wrong.
AQE re-plans the query at every shuffle boundary using the actual sizes of intermediate results. It does three things: coalesces small post-shuffle partitions, switches join strategies when one side turns out small enough to broadcast, and splits skewed partitions to eliminate stragglers. Enabling AQE typically gives 10-50% speedup on real ETL with no code change.
Configuration
# Enable AQE (default-on since 3.2)
spark.sql.adaptive.enabled = true
# Dynamic coalesce
spark.sql.adaptive.coalescePartitions.enabled = true
spark.sql.adaptive.advisoryPartitionSizeInBytes = 64m
spark.sql.adaptive.coalescePartitions.minPartitionSize = 1m
spark.sql.adaptive.coalescePartitions.parallelismFirst = false # honors advisory size
# Dynamic join strategy
spark.sql.adaptive.localShuffleReader.enabled = true # avoid extra shuffle when switching to broadcast
spark.sql.autoBroadcastJoinThreshold = 10m # often raised to 50-100m
spark.sql.adaptive.skewJoin.enabled = true
spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256m
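These settings can also be changed per session from code rather than in spark-defaults.conf. A minimal Scala sketch, mirroring the values above (assumes a live SparkSession named spark):
// Per-session AQE tuning; property names and values mirror the block above.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")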
Dynamic Partition Coalescing
After a shuffle, the post-shuffle partition count comes from spark.sql.shuffle.partitions (default 200). For small data, that's wasteful; for huge data, it's not enough.
# Without AQE:
# spark.sql.shuffle.partitions = 200
# Stage produces 200 partitions of 5 MB each = 200 tiny tasks
# Per-task scheduling cost ~100ms, so 200 tasks = 20s overhead alone
# With AQE coalescing:
# advisoryPartitionSizeInBytes = 64 MB
# 200 partitions of 5 MB each = 1 GB total
# Coalesced to ~16 partitions of ~64 MB
# Per-task overhead 16 * 100ms = 1.6s
# How it works:
# 1. After shuffle stage finishes, each map task reports the size
# of its output for each reduce partition (MapOutputStatistics)
# 2. AQE iterates over reduce partitions in order, accumulating sizes
# 3. When accumulated size >= advisory size, that span becomes one
# coalesced partition; reset and continue
# 4. The next stage reads each coalesced span as a single task
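Steps 2 and 3 amount to one greedy pass over the reduce-partition sizes. A self-contained Scala sketch of the idea (a simplified model, not Spark's actual implementation, which also honors coalescePartitions.minPartitionSize and works on MapOutputStatistics):
// Greedy coalescing: merge adjacent reduce partitions until a span
// reaches the advisory size, then start a new span.
def coalesce(sizes: Array[Long], advisoryBytes: Long): Seq[Range] = {
  val spans = scala.collection.mutable.ArrayBuffer.empty[Range]
  var start = 0                  // first partition of the current span
  var acc   = 0L                 // bytes accumulated so far in the span
  for (i <- sizes.indices) {
    acc += sizes(i)
    if (acc >= advisoryBytes) {  // span is big enough: close it
      spans += (start to i)
      start = i + 1
      acc = 0L
    }
  }
  if (start < sizes.length) spans += (start until sizes.length)  // tail span
  spans.toSeq
}

val mb = 1024L * 1024
coalesce(Array.fill(200)(5 * mb), 64 * mb).length  // 16, matching the example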
Dynamic Join Strategy
Catalyst chooses sort-merge join when neither side is small enough to broadcast, based on table statistics. After filters and joins, the actual shuffled size is often much smaller.
# Query: SELECT * FROM orders o JOIN customers c
# ON o.cust_id = c.id WHERE c.country = 'DE'
# Catalyst's static plan:
# customers table: 200 GB total -> sort-merge join
# orders table: 500 GB total
# After AQE observes the stage that filters customers:
# customers WHERE country='DE' actually produces 8 MB
# 8 MB < autoBroadcastJoinThreshold (10 MB)
# AQE replans: BroadcastHashJoin instead of SortMergeJoin
# Plan transformation visible in EXPLAIN:
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   AQEShuffleRead (coalesced)
   +- BroadcastHashJoin Inner BuildRight
      :- ColumnarToRow
      :  +- FileScan parquet [orders]
      +- BroadcastExchange HashedRelationBroadcastMode
         +- AQEShuffleRead (local)   <-- reads stage1 output without re-shuffle
            +- ShuffleExchange ... (was the original sort-merge shuffle)
The clever bit: the customers side already shuffled before AQE realized broadcast was better. AQE adds a "local shuffle reader" that reads each map output locally on the executor that holds it, then broadcasts. This avoids redoing the shuffle.
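To watch the switch on the example query, run the join and print the final plan. A sketch, assuming the orders and customers tables from the example are registered:
import spark.implicits._
val joined = spark.table("orders")
  .join(spark.table("customers").filter($"country" === "DE"),
        $"cust_id" === $"id")
joined.collect()  // AQE replans once the filtered side's real size is known
joined.explain()  // now shows AdaptiveSparkPlan isFinalPlan=true with BroadcastHashJoin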
Skew Join Optimization
Skewed keys produce one giant partition while the rest are normal-sized. The skewed task becomes a straggler, and the whole stage waits.
# Detection condition (per shuffle partition i):
size[i] > max(
median(size) * skewedPartitionFactor, # 5x median by default
skewedPartitionThresholdInBytes # 256 MB by default
)
# Example shuffle partition sizes after a join:
# p0: 50 MB, p1: 60 MB, p2: 10 GB (skewed!), p3: 55 MB, p4: 45 MB, ...
# median = 55 MB, threshold = max(55*5, 256) = 275 MB
# p2 (10 GB) exceeds threshold -> skewed
# AQE response:
# Split p2 of the left side into 40 subpartitions (each ~250 MB)
# Replicate p2 of the right side 40 times
# Generate 40 join sub-tasks instead of one task processing 10 GB
# Result: stage time drops from "10 GB at 100 MB/s = 100 sec straggler"
# to "40 parallel tasks of 250 MB each = ~3 sec" Watching AQE in Action
Watching AQE in Action
# Spark UI -> SQL tab -> click query
# AQE appears in the plan as "AdaptiveSparkPlan isFinalPlan=true"
# Sub-plans show "InitialPlan" vs "FinalPlan"
# Programmatic: access query execution
val df = spark.sql("SELECT ...")
df.collect()                               // run an action so AQE can finalize the plan
val plan = df.queryExecution.executedPlan  // AdaptiveSparkPlanExec
println(plan.toString)                     // shows isFinalPlan=true once executed
# Logs: AQE logs plan changes at DEBUG by default; raise them to INFO
spark.conf.set("spark.sql.adaptive.logLevel", "INFO")
# Sample log line:
# AdaptiveSparkPlanExec: Plan changed from
# SortMergeJoin (cost=...) to BroadcastHashJoin (cost=...)
# Reason: Build side estimated 8.3 MB < threshold 10.0 MB
# Metrics
spark.sparkContext.uiWebUrl # then visit /SQL/, click the query
# Look for "AdaptiveSparkPlan", "AQEShuffleRead (coalesced/local/skewed)"
Tradeoffs and When Not to Use
AQE almost always helps
- Multi-stage queries with joins and aggregations
- ETL with skewed keys
- Workloads where statistics drift over time
- Queries where you would otherwise hand-tune partition counts
AQE less useful when
- Queries are simple scans with no shuffle
- You have already hand-tuned every config for a recurring batch
- You use UDFs that hide the shape of intermediate data
- Streaming queries (AQE works only on batch shuffles)
Frequently Asked Questions
Why is AQE necessary if Catalyst already optimizes queries?
Catalyst is a static optimizer — it makes decisions before execution based on table statistics, which are often incomplete or wrong (especially after multiple joins, filters, and aggregations). AQE replans the query at runtime using the actual size of intermediate shuffle outputs. A join estimated to produce 10 GB might actually produce 100 MB; AQE notices this and switches from sort-merge join to broadcast join, saving an entire shuffle stage.
What is a 'shuffle stage' boundary?
Spark's DAG is divided into stages by shuffle boundaries. A shuffle happens when data must be repartitioned across executors — for example, before a wide aggregation or a join on a non-partitioned key. AQE only kicks in at these boundaries, because that is when actual statistics about the shuffle output become available. Within a stage, Spark's whole-stage code generation pipelines operators with no opportunity for AQE to intervene.
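For example, both wide operations below end a stage at a shuffle; events and users are hypothetical tables:
// Each wide operation forces a shuffle, closing a stage; AQE re-plans there.
val daily = spark.table("events")
  .groupBy("user_id")                     // shuffle boundary #1
  .count()
  .join(spark.table("users"), "user_id")  // shuffle boundary #2 (if sort-merge)
daily.explain()                           // the plan splits at each Exchange node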
What is dynamic partition coalescing?
After a shuffle, Spark may have hundreds or thousands of small partitions. Each partition becomes a task, and tiny tasks have high scheduling overhead. AQE looks at the actual sizes of the shuffle blocks and merges adjacent small partitions into bigger ones, targeting spark.sql.adaptive.advisoryPartitionSizeInBytes (default 64 MB). A query that originally had 2000 post-shuffle partitions might be coalesced to 50, dramatically reducing task overhead.
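The effect is directly observable by comparing post-shuffle partition counts with AQE off and on (a sketch; orders is a stand-in table, and calling .rdd executes the shuffle stages):
def grouped() = spark.table("orders").groupBy("cust_id").count()
spark.conf.set("spark.sql.adaptive.enabled", "false")
grouped().rdd.getNumPartitions  // 200, i.e. spark.sql.shuffle.partitions
spark.conf.set("spark.sql.adaptive.enabled", "true")
grouped().rdd.getNumPartitions  // far fewer once AQE coalesces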
How does AQE optimize skew joins?
If one shuffle partition is much bigger than the median (typically 5x larger and over 256 MB by default), AQE splits it into multiple smaller subpartitions. The matching partition on the other side of the join is replicated to match each subpartition. So instead of one straggler task processing 10 GB while everything else processes 100 MB, you get 10 parallel tasks each processing 1 GB.
Does AQE work for non-shuffle queries?
Most of AQE's power requires a shuffle to observe statistics. For pure-broadcast or pure-scan queries (no shuffle), AQE has nothing to replan. There is one exception: dynamic partition pruning (DPP) can fire at runtime when a broadcast join's build side reveals which partition values are actually needed, even without an explicit shuffle. AQE and DPP are commonly enabled together.
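Both are controlled by independent flags; a quick way to confirm in a session:
spark.conf.get("spark.sql.adaptive.enabled")                           // "true" by default since 3.2
spark.conf.get("spark.sql.optimizer.dynamicPartitionPruning.enabled")  // "true" by default since 3.0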