Catalyst Optimizer
Trees, Rules, and the Path From SQL to Physical Plan
Catalyst is the optimizer behind Spark SQL and DataFrames. It is built on a small set of abstractions
— TreeNode, Rule, Expression — and a four-phase pipeline:
parsing, analysis, logical optimization, and physical planning. The whole optimizer is written in
Scala with pattern matching, which is why adding a new rule is usually a few dozen lines.
The design's lasting impact comes from the choice to make plans immutable trees and rules pure functions. Optimization is just function composition: take a plan, apply rules until fixpoint, repeat for the next batch. This makes Catalyst extensible — Photon, Iceberg, Delta Lake, and many third-party engines plug their own rules in without forking Spark.
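That whole loop fits in a few lines. A toy sketch of the idea, assuming nothing from Spark (the `Plan` hierarchy, `combineFilters`, and `fixpoint` here are invented stand-ins, not Catalyst's real classes):

```scala
// Plans are immutable trees; a "rule" is a pure Plan => Plan function.
sealed trait Plan
case class Scan(table: String) extends Plan
case class Filter(pred: String, child: Plan) extends Plan
case class Limit(n: Int, child: Plan) extends Plan

// Merge adjacent filters, in the spirit of Catalyst's CombineFilters.
val combineFilters: Plan => Plan = {
  case Filter(p2, Filter(p1, child)) => Filter(s"($p1) AND ($p2)", child)
  case other => other
}

// Keep only the smaller of two stacked limits.
val combineLimits: Plan => Plan = {
  case Limit(n2, Limit(n1, child)) => Limit(math.min(n1, n2), child)
  case other => other
}

// Apply one rule at this node, then recurse into children (top-down).
def transform(plan: Plan, rule: Plan => Plan): Plan = rule(plan) match {
  case Filter(p, c) => Filter(p, transform(c, rule))
  case Limit(n, c)  => Limit(n, transform(c, rule))
  case leaf         => leaf
}

// Run a batch of rules until nothing changes, like Catalyst's fixpoint batches.
def fixpoint(plan: Plan, rules: Seq[Plan => Plan], maxIters: Int = 100): Plan = {
  val next = rules.foldLeft(plan)(transform)
  if (next == plan || maxIters == 0) next else fixpoint(next, rules, maxIters - 1)
}
```

Because rules are pure functions over immutable trees, composing them is just `foldLeft`, and "optimize" is "fold until fixpoint".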
The Four Phases

1. Parsing: SQL text or DataFrame calls become an unresolved logical plan
2. Analysis: column names and types are resolved against the catalog
3. Logical optimization: rule batches rewrite the plan until fixpoint
4. Physical planning: strategies convert the logical plan into one or more SparkPlans
TreeNode and Rule
// Catalyst's core abstractions (simplified)
abstract class TreeNode[BaseType <: TreeNode[BaseType]] {
  def children: Seq[BaseType]
  def transform(rule: PartialFunction[BaseType, BaseType]): BaseType =
    rule.applyOrElse(this, identity).mapChildren(_.transform(rule))
}
abstract class LogicalPlan extends TreeNode[LogicalPlan]
abstract class Expression extends TreeNode[Expression]
// A rule is a function LogicalPlan => LogicalPlan
abstract class Rule[T <: TreeNode[_]] {
  def apply(plan: T): T
}
// Example rule: collapse adjacent Projects into a single Project
// whose project list composes p2 over p1
object CollapseProject extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    case Project(p2, Project(p1, child)) =>
      Project(buildCleanedProjectList(p2, p1), child)
  }
}

Common Optimization Rules
| Rule | Pattern | Rewrite |
|---|---|---|
| PushDownPredicate | Filter(Join(L, R)) | Join(Filter(L), Filter(R)) — push each predicate to the side it references |
| ColumnPruning | Project(child) reads N cols, downstream uses M<N | Push Project closer to source so unused columns never read |
| ConstantFolding | Add(Literal(2), Literal(3)) | Literal(5) — evaluate at plan time |
| BooleanSimplification | And(true, X), Or(false, X) | X |
| LimitPushDown | Limit(Union(L, R)) | Limit(Union(Limit(L), Limit(R))) — fetch fewer rows from each branch |
| CombineFilters | Filter(p2, Filter(p1, child)) | Filter(p1 AND p2, child) |
| EliminateOuterJoin | OuterJoin where filter eliminates nulls | InnerJoin (allows more aggressive optimization) |
| ReorderJoin | Inner-join chain of N tables | DP-based reorder for N≤12, greedy for larger |
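Most of these rules are a single pattern match. A self-contained sketch of PushDownPredicate's core idea, using toy node types invented here for illustration (`Col`, `Rel`, `JoinNode`, `Sel` are not Catalyst classes):

```scala
// Each predicate references columns from one side of the join.
case class Col(side: Char, name: String)                  // side: 'L' or 'R'
sealed trait Node
case class Rel(name: String) extends Node                 // base relation
case class JoinNode(left: Node, right: Node) extends Node
case class Sel(preds: Seq[Col], child: Node) extends Node // a Filter

// Filter above a Join: split the conjuncts by the side they reference
// and wrap each join input in a filter for its share, so rows are
// dropped before the join instead of after it.
def pushDown(n: Node): Node = n match {
  case Sel(preds, JoinNode(l, r)) =>
    val (lp, rp) = preds.partition(_.side == 'L')
    def wrap(ps: Seq[Col], c: Node): Node = if (ps.isEmpty) c else Sel(ps, c)
    JoinNode(wrap(lp, pushDown(l)), wrap(rp, pushDown(r)))
  case Sel(p, c)      => Sel(p, pushDown(c))
  case JoinNode(l, r) => JoinNode(pushDown(l), pushDown(r))
  case leaf           => leaf
}
```

The real rule additionally handles predicates that reference both sides (they stay above the join) and non-deterministic expressions, which must not be reordered.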
Cost-Based Optimization (CBO)
# CBO is opt-in (off by default)
spark.sql.cbo.enabled = true
spark.sql.cbo.joinReorder.enabled = true
spark.sql.cbo.joinReorder.dp.threshold = 12
# Compute statistics first
ANALYZE TABLE orders COMPUTE STATISTICS;
ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS cust_id, total, status;
# What CBO uses:
# - Table size (rowCount, sizeInBytes)
# - Per-column: min, max, num distinct, num nulls, avg/max length
# - Histograms for non-uniform distributions
# Without CBO, Catalyst uses heuristic estimates
# (often "size = sum of input sizes after filter selectivity 0.5")

In practice, CBO alone has limited adoption because computing column statistics is expensive and they drift as data changes. AQE has largely supplanted it for runtime decisions, while CBO remains useful for large multi-way join reordering at compile time when statistics are reliable.
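The arithmetic CBO does with these statistics is straightforward in the common cases. A sketch with invented types (`ColStats`, `TableStats`); the formulas are the standard uniform-distribution estimates, not Spark's exact cost model:

```scala
// Per-column and per-table statistics, as ANALYZE TABLE would compute them.
case class ColStats(ndv: Long, min: Double, max: Double)
case class TableStats(rowCount: Long, cols: Map[String, ColStats])

// col = literal: assume values are uniform, so rowCount / ndv rows survive.
def estimateEquals(t: TableStats, col: String): Long =
  t.cols.get(col) match {
    case Some(s) => math.max(1L, t.rowCount / math.max(1L, s.ndv))
    case None    => (t.rowCount * 0.5).toLong  // no stats: fixed selectivity guess
  }

// col < v: linear interpolation between the column's min and max.
def estimateLessThan(t: TableStats, col: String, v: Double): Long =
  t.cols.get(col) match {
    case Some(s) if s.max > s.min =>
      val frac = math.min(1.0, math.max(0.0, (v - s.min) / (s.max - s.min)))
      (t.rowCount * frac).toLong
    case _ => (t.rowCount * 0.5).toLong
  }
```

Estimates like these propagate bottom-up through the plan; join reordering then compares the estimated sizes of alternative join trees.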
Physical Planning Strategies
# A Strategy turns a LogicalPlan into one or more SparkPlans.
# SparkPlanner tries each strategy in order and (currently) takes
# the first physical plan produced.
abstract class Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan]
}
# Built-in strategies for joins:
object JoinSelection extends Strategy {
  // 1. Broadcast hash join if one side <= autoBroadcastJoinThreshold
  // 2. Shuffle hash join if one side small enough to fit in an executor
  //    (and spark.sql.join.preferSortMergeJoin = false)
  // 3. Sort-merge join (default for big-vs-big equi-joins)
  // 4. Broadcast nested loop / cartesian for non-equi joins
}
# Other strategies:
# - Aggregation: hash-based vs sort-based
# - Window: per-partition window or global
# - DataSourceV2: file scan with partition pruning

Inspecting the Plan
# Get the plan at each stage
val df = spark.sql("SELECT c.country, SUM(o.total) FROM orders o " +
"JOIN customers c ON o.cust_id=c.id WHERE c.region='EU' " +
"GROUP BY c.country")
df.queryExecution.logical       // Unresolved (parsed)
df.queryExecution.analyzed // Resolved (column types known)
df.queryExecution.optimizedPlan // After Catalyst rules
df.queryExecution.executedPlan // Physical plan with strategies
df.queryExecution.debug.codegen()  // Generated Java source
# EXPLAIN modes
df.explain(false) // simple
df.explain(true) // all four phases
df.explain("formatted") // Spark 3+ — most readable
df.explain("cost") // includes CBO row counts (if computed)
# Sample formatted output
== Physical Plan ==
* HashAggregate (5)
+- Exchange (4)                      <-- shuffle
   +- * HashAggregate (3)
      +- * SortMergeJoin Inner (2)
         :- * Filter (1)
         :  +- * Scan parquet [orders]
         +- * Filter
            +- * Scan parquet [customers]

Extending Catalyst
# Add a custom rule via SparkSessionExtensions
class MyOptimizationRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    case Filter(EqualTo(AttributeReference("status", _, _, _), Literal("X", _)), child) =>
      // Skip the filter entirely if we know our table never has status="X"
      child
  }
}
class MyExtensions extends (SparkSessionExtensions => Unit) {
  def apply(ext: SparkSessionExtensions): Unit = {
    ext.injectOptimizerRule(_ => new MyOptimizationRule)
  }
}
# Register in spark-defaults.conf
spark.sql.extensions = com.example.MyExtensions
# Catalyst extensions used by big projects:
# - Iceberg adds rules for incremental scans and metadata pruning
# - Delta Lake adds time-travel and merge-into rewriting
# - Photon (Databricks) replaces parts of physical planning

Tradeoffs and Limits
Catalyst's strengths
- Pluggable, well-defined extension points
- Functional design — rules are easy to write and test
- ~150 rules cover most analytical patterns
- Combined with Tungsten, the generated code approaches hand-tuned performance
Where it falls short
- Static — needs AQE for accurate runtime decisions
- UDFs are opaque (cannot push down through them)
- No vectorization for some operators (filled by Photon, Comet)
- Cost model is heuristic without computed statistics
Frequently Asked Questions
What is the difference between Catalyst and AQE?
Catalyst is the static (compile-time) optimizer. It takes parsed SQL or DataFrame operations and produces a physical execution plan using a fixed set of rewrite rules and cost-based decisions. AQE (Adaptive Query Execution) is a runtime layer that re-plans the query at shuffle boundaries using actual data sizes. Catalyst runs once before execution; AQE runs continuously during execution and modifies the plan.
What is a TreeNode in Catalyst?
TreeNode is the abstract base class for everything in a Catalyst plan: logical operators (LogicalPlan), physical operators (SparkPlan), and expressions (Expression). It represents an immutable tree where each node knows its children. All transformations are pure functions that produce new trees. This functional design is why optimization passes can be written as simple pattern-matching rules.
Are optimization rules ordered?
Yes, very deliberately. The optimizer runs batches of rules until a fixed point (no more changes) or a max iteration count. The order matters: column pruning before predicate pushdown, predicate pushdown before join reordering, constant folding before everything. Each rule is local — it pattern-matches a specific plan shape and rewrites it. The optimizer is essentially a list of (BatchName, Strategy, Rule[]) triples.
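That (BatchName, Strategy, Rule[]) structure is easy to model. A toy executor, assuming simplified stand-ins (`Once`, `FixedPoint`, `Batch` here are not Spark's RuleExecutor classes):

```scala
// A batch runs its rules either once or repeatedly until a fixed point.
sealed trait ExecStrategy
case object Once extends ExecStrategy
case class FixedPoint(maxIters: Int) extends ExecStrategy

case class Batch[T](name: String, strategy: ExecStrategy, rules: Seq[T => T])

def runBatch[T](plan: T, batch: Batch[T]): T = batch.strategy match {
  case Once => batch.rules.foldLeft(plan)((p, r) => r(p))
  case FixedPoint(max) =>
    var cur = plan
    var changed = true
    var iters = 0
    while (changed && iters < max) {
      val next = batch.rules.foldLeft(cur)((p, r) => r(p))
      changed = next != cur  // stop when a full pass changes nothing
      cur = next
      iters += 1
    }
    cur
}

// The optimizer is just the batches folded over the plan, in order.
def execute[T](plan: T, batches: Seq[Batch[T]]): T =
  batches.foldLeft(plan)(runBatch)
```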
How does Catalyst know which join algorithm to pick?
Three signals: 1) Hint (BROADCAST, MERGE, SHUFFLE_HASH) — explicit user choice always wins. 2) Statistics — if one side is smaller than spark.sql.autoBroadcastJoinThreshold (default 10 MB), broadcast hash join. If both sides are sortable, sort-merge join. 3) Special cases — broadcast nested loop for non-equi joins, cartesian for cross joins. Catalyst also has heuristics for shuffle-hash vs sort-merge based on table sizes.
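The precedence described above (minus hints) can be sketched as one pure function. This is a simplified illustration, not Spark's internal JoinSelection; the 10 MB default mirrors spark.sql.autoBroadcastJoinThreshold:

```scala
sealed trait JoinAlgo
case object BroadcastHashJoin extends JoinAlgo
case object ShuffleHashJoin extends JoinAlgo
case object SortMergeJoin extends JoinAlgo
case object BroadcastNestedLoopJoin extends JoinAlgo

def chooseJoin(leftBytes: Long, rightBytes: Long,
               equiJoin: Boolean,
               broadcastThreshold: Long = 10L * 1024 * 1024,
               preferSortMerge: Boolean = true): JoinAlgo = {
  val smaller = math.min(leftBytes, rightBytes)
  if (!equiJoin) BroadcastNestedLoopJoin         // no equality keys to hash or sort on
  else if (smaller <= broadcastThreshold) BroadcastHashJoin
  else if (!preferSortMerge) ShuffleHashJoin     // only when sort-merge is disabled
  else SortMergeJoin
}
```

The real implementation also honors hints first, checks that join keys are sortable before choosing sort-merge, and verifies the hashed side fits in executor memory.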
Can I add my own optimization rules?
Yes. SparkSessionExtensions allows you to register custom rules via injectOptimizerRule, injectPlannerStrategy, etc. The rule must be a Rule[LogicalPlan] (or RuleExecutor for batches). This is how vendors like Databricks ship Photon, and how libraries like Iceberg add catalog-specific optimizations. Custom rules see the same TreeNode API as builtin rules.