Catalyst Optimizer

Trees, Rules, and the Path From SQL to Physical Plan

Catalyst is the optimizer behind Spark SQL and DataFrames. It is built on a small set of abstractions — TreeNode, Rule, Expression — and a four-phase pipeline: parsing, analysis, logical optimization, and physical planning. The whole optimizer is written in Scala with pattern matching, which is why adding a new rule is usually a few dozen lines.

The design's lasting impact comes from the choice to make plans immutable trees and rules pure functions. Optimization is just function composition: take a plan, apply rules until fixpoint, repeat for the next batch. This makes Catalyst extensible — Photon, Iceberg, Delta Lake, and many third-party engines plug their own rules in without forking Spark.

The Four Phases

1. Parse: SQL string → Unresolved LogicalPlan
2. Analyze: resolve refs & types → Resolved LogicalPlan
3. Optimize: apply rules to fixpoint → Optimized LogicalPlan
4. Plan: strategies pick algos → SparkPlan (physical)

CodeGen: Tungsten whole-stage Java

Optimization Batches (executed in order, each to fixpoint)

  • Constant Folding · Column Pruning · Predicate Pushdown · Join Reordering
  • Subquery Elimination · Decimal Optimizations · Boolean Simplification
  • Limit Pushdown · Combine Filters · Push Project Through Joins

Each rule pattern-matches part of the plan and returns a rewritten subtree.

Key Numbers

~150: logical optimization rules in vanilla Spark
4: major phases (parse, analyze, optimize, plan)
100: spark.sql.optimizer.maxIterations default
12: spark.sql.cbo.joinReorder.dp.threshold default (cost-based DP join reordering applies up to this many joined relations)
10 MB: spark.sql.autoBroadcastJoinThreshold default
2 phases: code generation (Java source → bytecode)
Janino: in-process Java compiler used by Tungsten

TreeNode and Rule

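// Catalyst's core abstractions (simplified)
abstract class TreeNode[BaseType <: TreeNode[BaseType]] { self: BaseType =>
  def children: Seq[BaseType]
  // Returns a copy of this node with f applied to each child
  def mapChildren(f: BaseType => BaseType): BaseType
  // Top-down rewrite: try the rule on this node, then recurse into the result's children
  def transform(rule: PartialFunction[BaseType, BaseType]): BaseType =
    rule.applyOrElse(this, identity[BaseType]).mapChildren(_.transform(rule))
}

abstract class LogicalPlan extends TreeNode[LogicalPlan]
abstract class Expression  extends TreeNode[Expression]

// A rule is a function from a tree to a tree of the same type,
// e.g. LogicalPlan => LogicalPlan
abstract class Rule[T <: TreeNode[_]] {
  def apply(plan: T): T
}

// Example rule: collapse Project(p2, Project(p1, child)) into a single
// Project over child, with p1's expressions substituted into p2
object CollapseProject extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    case Project(p2, Project(p1, child)) =>
      // buildCleanedProjectList (a helper in Spark's CollapseProject, not shown)
      // inlines the lower project's aliases into the upper project list
      Project(buildCleanedProjectList(p2, p1), child)
  }
}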
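To make the transform-plus-pattern-match idea concrete, here is a minimal sketch that runs without Spark. The names `Expr`, `Lit`, `Attr`, `Add`, and `ToyFolding.foldConstants` are hypothetical stand-ins for Catalyst's Expression, Literal, AttributeReference, and the ConstantFolding rule; the real classes also carry data types, nullability, and expression IDs, none of which are modeled here.

```scala
// Toy expression tree: NOT Spark's classes, only the same shape of idea.
sealed trait Expr {
  // Rebuild this node with f applied to each child (immutable: returns a new tree).
  def mapChildren(f: Expr => Expr): Expr = this match {
    case Add(l, r) => Add(f(l), f(r))
    case leaf      => leaf
  }

  // Bottom-up transform: rewrite the children first, then try the rule on the result.
  def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
    val afterChildren = mapChildren(_.transformUp(rule))
    rule.applyOrElse(afterChildren, identity[Expr])
  }
}
case class Lit(value: Int) extends Expr
case class Attr(name: String) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

object ToyFolding {
  // Constant folding: an Add of two literals is evaluated at plan time.
  // Nodes the partial function does not match are returned unchanged.
  def foldConstants(e: Expr): Expr = e.transformUp {
    case Add(Lit(a), Lit(b)) => Lit(a + b)
  }
}
```

The sketch uses a bottom-up traversal so that nested constants such as Add(Add(1, 2), 3) fold in a single pass; the simplified transform shown above is top-down, and a top-down version would need a second pass to reach the same result.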

Common Optimization Rules

Rule | Pattern | Rewrite
PushDownPredicate | Filter(Join(L, R)) | Join(Filter(L), Filter(R)): push each predicate to the side it references
ColumnPruning | Project(child) reads N cols, downstream uses M<N | Push Project closer to the source so unused columns are never read
ConstantFolding | Add(Literal(2), Literal(3)) | Literal(5): evaluate at plan time
BooleanSimplification | And(true, X), Or(false, X) | X
LimitPushDown | Limit(Union(L, R)) | Limit(Union(Limit(L), Limit(R))): fetch fewer rows from each branch
CombineFilters | Filter(p2, Filter(p1, child)) | Filter(p1 AND p2, child)
EliminateOuterJoin | OuterJoin where a filter eliminates nulls | InnerJoin (allows more aggressive optimization)
ReorderJoin / CostBasedJoinReorder | Inner-join chain of N tables | ReorderJoin heuristically places joins that have conditions first; with CBO enabled and statistics available, CostBasedJoinReorder runs a DP search for N≤12 and leaves larger chains in their existing order
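Two of the rewrites in the table, CombineFilters and predicate pushdown through an inner join, can be sketched on a toy plan without Spark. Everything here is hypothetical: `Plan`, `Scan`, `Filter`, `Join`, `Pred`, and `ToyPushdown` are illustration-only types, predicates are opaque strings tagged with the columns they reference, and only inner joins are modeled.

```scala
// Toy logical plan: hypothetical classes, not Spark's.
case class Pred(sql: String, refs: Set[String]) // predicate text + referenced columns

sealed trait Plan { def output: Set[String] }
case class Scan(table: String, output: Set[String]) extends Plan
case class Filter(preds: List[Pred], child: Plan) extends Plan {
  def output: Set[String] = child.output
}
case class Join(left: Plan, right: Plan) extends Plan { // inner join only
  def output: Set[String] = left.output ++ right.output
}

object ToyPushdown {
  def rewrite(plan: Plan): Plan = plan match {
    // CombineFilters: Filter(p2, Filter(p1, child)) => Filter(p1 AND p2, child)
    case Filter(p2, Filter(p1, child)) =>
      rewrite(Filter(p1 ++ p2, child))

    // Pushdown: send each predicate to the join side that supplies all its columns;
    // predicates that need both sides stay above the join.
    case Filter(ps, Join(l, r)) =>
      val (toLeft, rest)  = ps.partition(_.refs.subsetOf(l.output))
      val (toRight, stay) = rest.partition(_.refs.subsetOf(r.output))
      wrap(stay, Join(rewrite(wrap(toLeft, l)), rewrite(wrap(toRight, r))))

    case Filter(ps, child) => Filter(ps, rewrite(child))
    case Join(l, r)        => Join(rewrite(l), rewrite(r))
    case s: Scan           => s
  }

  // Avoid creating empty Filter nodes.
  private def wrap(ps: List[Pred], child: Plan): Plan =
    if (ps.isEmpty) child else Filter(ps, child)
}
```

For a plan such as Filter(cust_id = id) over Filter(region = 'EU', total > 100) over Join(orders, customers), the two stacked filters are merged first, then the region predicate moves to the customers side and the total predicate to the orders side, while the cross-table predicate stays above the join.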

Cost-Based Optimization (CBO)

# CBO is opt-in (off by default)
spark.sql.cbo.enabled = true
spark.sql.cbo.joinReorder.enabled = true
spark.sql.cbo.joinReorder.dp.threshold = 12

# Compute statistics first
ANALYZE TABLE orders COMPUTE STATISTICS;
ANALYZE TABLE orders COMPUTE STATISTICS FOR COLUMNS cust_id, total, status;

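# What CBO uses:
# - Table size (rowCount, sizeInBytes)
# - Per-column: min, max, num distinct, num nulls, avg/max length
# - Histograms for non-uniform distributions
#   (collected only when spark.sql.statistics.histogram.enabled = true)

# Without CBO, Catalyst falls back to size-only estimates:
# sizeInBytes from the data source is propagated up the plan with no
# filter selectivity applied, so sizes after filters tend to be overestimated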

In practice, CBO alone has seen limited adoption because computing column statistics is expensive and the statistics drift as the data changes. AQE has largely supplanted it for runtime decisions, while CBO remains useful for reordering large multi-way joins at planning time when statistics are reliable.
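To show why the per-column distinct counts matter, the sketch below applies the textbook inner equi-join estimate, rows(A) × rows(B) / max(ndv(A.k), ndv(B.k)). The types `ColStats` and `TableStats` and the function `ToyCbo.joinRows` are hypothetical simplifications; Spark's own estimator builds on the same idea but also accounts for nulls, value ranges, histograms, and multi-column keys.

```scala
// Hypothetical, simplified statistics; Spark's real statistics classes hold more fields.
case class ColStats(distinctCount: Long)
case class TableStats(rowCount: Long, cols: Map[String, ColStats])

object ToyCbo {
  // Textbook inner equi-join estimate: |A| * |B| / max(ndv(A.k), ndv(B.k)).
  // Returns None when either key column has no statistics, which mirrors the
  // point made above: without computed stats there is nothing to estimate from.
  def joinRows(a: TableStats, aKey: String, b: TableStats, bKey: String): Option[Long] =
    for {
      ka <- a.cols.get(aKey)
      kb <- b.cols.get(bKey)
      ndv = math.max(ka.distinctCount, kb.distinctCount)
      if ndv > 0
    } yield (BigInt(a.rowCount) * BigInt(b.rowCount) / BigInt(ndv)).toLong
}
```

With 1,000,000 orders over 50,000 distinct cust_id values joined to 50,000 customers with unique ids, the estimate is 1,000,000 rows, one output row per order. If the statistics are stale, the same formula produces a confidently wrong number, which is the drift problem described above.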

Physical Planning Strategies

// A Strategy turns a LogicalPlan into zero or more candidate SparkPlans.
// SparkPlanner applies its strategies in order and currently takes the
// first plan produced; it does not cost-compare the alternatives.

abstract class Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan]
}

// Built-in strategy for joins (simplified; join hints are checked first):
object JoinSelection extends Strategy {
  // 1. Broadcast hash join if one side <= autoBroadcastJoinThreshold
  // 2. Shuffle hash join if one side is small enough to build a
  //    per-partition hash map
  //    (only considered when spark.sql.join.preferSortMergeJoin = false)
  // 3. Sort-merge join if the join keys are sortable (default for big-vs-big)
  // 4. Cartesian product / broadcast nested loop for non-equi joins
  def apply(plan: LogicalPlan): Seq[SparkPlan] = ??? // selection logic elided
}

// Other strategies:
// - Aggregation: hash-based vs sort-based
// - Window: per-partition window or global
// - DataSourceV2: scan planning with partition pruning
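The preference order in JoinSelection can be sketched as a plain function. This is a deliberately reduced model under stated assumptions: `ToyJoinSelection.choose`, its parameters, and the `JoinAlgo` values are hypothetical names; hints, join types, key sortability, and AQE are ignored; and `fitsInExecutor` is a caller-supplied stand-in for Spark's internal "small enough to hash" check.

```scala
sealed trait JoinAlgo
case object BroadcastHashJoin extends JoinAlgo
case object ShuffledHashJoin extends JoinAlgo
case object SortMergeJoin extends JoinAlgo
case object BroadcastNestedLoopJoin extends JoinAlgo

object ToyJoinSelection {
  // 10 MB, the autoBroadcastJoinThreshold default quoted in this article.
  val AutoBroadcastThreshold: Long = 10L * 1024 * 1024

  def choose(
      leftBytes: Long,
      rightBytes: Long,
      isEquiJoin: Boolean,
      preferSortMerge: Boolean = true,               // models spark.sql.join.preferSortMergeJoin
      fitsInExecutor: Long => Boolean = _ => false   // assumption: supplied by the caller
  ): JoinAlgo = {
    val smaller = math.min(leftBytes, rightBytes)
    if (!isEquiJoin) BroadcastNestedLoopJoin                       // no equality keys to hash or sort on
    else if (smaller <= AutoBroadcastThreshold) BroadcastHashJoin  // ship the small side to every executor
    else if (!preferSortMerge && fitsInExecutor(smaller)) ShuffledHashJoin
    else SortMergeJoin                                             // default for big-vs-big
  }
}
```

For example, a 5 MB dimension table joined to a 1 GB fact table on an equality key selects BroadcastHashJoin, while two multi-gigabyte inputs fall through to SortMergeJoin unless the sort-merge preference is switched off and the smaller side passes the size check.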

Inspecting the Plan

// Get the plan at each stage
val df = spark.sql("SELECT c.country, SUM(o.total) FROM orders o " +
                   "JOIN customers c ON o.cust_id=c.id WHERE c.region='EU' " +
                   "GROUP BY c.country")

df.queryExecution.logical          // Parsed, still unresolved
df.queryExecution.analyzed         // Resolved (column types known)
df.queryExecution.optimizedPlan    // After Catalyst rules
df.queryExecution.executedPlan     // Physical plan with strategies
df.queryExecution.debug.codegen()  // Prints generated Java source

// EXPLAIN modes
df.explain(false)         // simple: physical plan only
df.explain(true)          // extended: parsed, analyzed, optimized, and physical plans
df.explain("formatted")   // Spark 3+, most readable
df.explain("cost")        // logical plan with statistics (row counts if computed)

# Sample formatted output (abridged)
== Physical Plan ==
* HashAggregate (5)
+- Exchange (4)              <-- shuffle
   +- * HashAggregate (3)
      +- * SortMergeJoin Inner (2)
         :- * Filter (1)
         :  +- * Scan parquet [orders]
         +- * Filter
            +- * Scan parquet [customers]

Extending Catalyst

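// Add a custom rule via SparkSessionExtensions
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.StringType

class MyOptimizationRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    // String literals hold a UTF8String, so compare via toString
    // instead of matching the Scala string "X" directly
    case Filter(EqualTo(AttributeReference("status", _, _, _), Literal(v, StringType)), child)
        if v != null && v.toString == "X" =>
      // Safe only if every row of our table is known to have status = "X";
      // otherwise dropping the filter changes the query result
      child
  }
}

class MyExtensions extends (SparkSessionExtensions => Unit) {
  def apply(ext: SparkSessionExtensions): Unit = {
    ext.injectOptimizerRule(_ => new MyOptimizationRule)
  }
}

# Register in spark-defaults.conf
spark.sql.extensions = com.example.MyExtensions

# Catalyst extensions used by large projects:
# - Iceberg adds rules for incremental scans and metadata pruning
# - Delta Lake adds time-travel and merge-into rewriting
# - Photon (Databricks) replaces parts of physical planning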

Tradeoffs and Limits

Catalyst's strengths

  • Pluggable, well-defined extension points
  • Functional design — rules are easy to write and test
  • ~150 rules cover most analytical patterns
  • Combined with Tungsten, the generated code approaches hand-written performance

Where it falls short

  • Static — needs AQE for accurate runtime decisions
  • UDFs are opaque (cannot push down through them)
  • No vectorization for some operators (filled by Photon, Comet)
  • Cost model is heuristic without computed statistics

Frequently Asked Questions

What is the difference between Catalyst and AQE?

Catalyst is the static (planning-time) optimizer. It takes parsed SQL or DataFrame operations and produces a physical execution plan using rewrite rules and, when enabled, cost-based decisions. AQE (Adaptive Query Execution) is a runtime layer that re-plans the rest of the query at shuffle boundaries using the actual sizes of the data produced so far. Catalyst runs once before execution; AQE re-optimizes each time a query stage finishes and may change the remaining plan.

What is a TreeNode in Catalyst?

TreeNode is the abstract base class for everything in a Catalyst plan: logical operators (LogicalPlan), physical operators (SparkPlan), and expressions (Expression). It represents an immutable tree where each node knows its children. All transformations are pure functions that produce new trees. This functional design is why optimization passes can be written as simple pattern-matching rules.

Are optimization rules ordered?

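Yes, deliberately. The optimizer runs batches in a fixed sequence, and each batch runs either once or until a fixed point (no more changes) or a maximum iteration count. Batch order matters most: for example, the main operator-optimization batch (predicate pushdown, column pruning, constant folding, and others) runs before the join-reordering batch. Inside a fixed-point batch the rules are applied in list order on every iteration, so a rewrite that a later rule makes possible is picked up on the next pass. Each rule is local: it pattern-matches a specific plan shape and rewrites it. The optimizer is essentially a list of (batch name, execution strategy, rules) triples, where the execution strategy is Once or FixedPoint.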
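The batch loop described in this answer can be sketched in a few lines. This is a toy model, not Spark's RuleExecutor: `ToyRuleExecutor`, `Batch`, `RunMode`, `Once`, and `FixedPoint` are hypothetical names chosen to echo the real ones, and a "plan" is any value with a sensible equality check.

```scala
object ToyRuleExecutor {
  type Rule[T] = T => T

  sealed trait RunMode
  case object Once extends RunMode                        // run the batch a single time
  case class FixedPoint(maxIterations: Int) extends RunMode

  case class Batch[T](name: String, mode: RunMode, rules: Seq[Rule[T]])

  // Run batches in order; the output of one batch is the input of the next.
  def execute[T](plan: T, batches: Seq[Batch[T]]): T =
    batches.foldLeft(plan) { (current, batch) =>
      val limit = batch.mode match {
        case Once          => 1
        case FixedPoint(n) => n
      }
      var result = current
      var changed = true
      var iteration = 0
      // Re-apply every rule in list order until nothing changes or the cap is hit.
      while (changed && iteration < limit) {
        val next = batch.rules.foldLeft(result)((p, rule) => rule(p))
        changed = next != result
        result = next
        iteration += 1
      }
      result
    }
}
```

Because plans are immutable values, detecting the fixed point is a plain equality check between the plan before and after an iteration. The iteration cap plays the role of spark.sql.optimizer.maxIterations: it bounds the work when rules keep undoing each other.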

How does Catalyst know which join algorithm to pick?

Three signals: 1) Hints (BROADCAST, MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL): an explicit user choice takes priority whenever the hinted algorithm supports the join type and condition. 2) Statistics: if one side is smaller than spark.sql.autoBroadcastJoinThreshold (default 10 MB), broadcast hash join; otherwise, if the join keys are sortable, sort-merge join. 3) Special cases: broadcast nested loop for non-equi joins, cartesian product for cross joins. Catalyst also has heuristics for shuffle-hash vs sort-merge based on table sizes.

Can I add my own optimization rules?

Yes. SparkSessionExtensions lets you register custom rules via injectOptimizerRule, injectPlannerStrategy, and similar methods. Each method takes a builder function from the SparkSession to the object being injected: a Rule[LogicalPlan] for injectOptimizerRule, a Strategy for injectPlannerStrategy. This is how vendors like Databricks ship Photon, and how libraries like Iceberg add catalog-specific optimizations. Custom rules see the same TreeNode API as built-in rules.