Spark Internals
Apache Spark started in 2009 as Matei Zaharia's RDD prototype at Berkeley. Sixteen years later it is the de facto distributed compute engine for petabyte-scale analytics, ML training, and lakehouse workloads. The conceptual model is simple — a driver dispatches tasks to executors over a logical-then-physical plan — but the implementation hides a Catalyst optimizer that rivals commercial systems, a Tungsten runtime that generates Java bytecode per query, an adaptive execution layer that rewrites plans mid-flight, and a shuffle subsystem that has been rewritten three times. This hub unpacks each of those pieces.
Grounded in Spark 3.5 / 4.x source (Scala), the original RDD/Spark SQL papers, and Databricks' Tungsten posts.
DAG Execution
From a DataFrame query to a directed acyclic graph of stages and tasks — how Spark splits, schedules, and runs distributed jobs
Catalyst Optimizer
Parsing, analysis, logical optimization, physical planning, and code generation — the four-phase rewriter that produces JVM bytecode at runtime
Tungsten Execution
Off-heap memory management, whole-stage code generation, and vectorized columnar execution — the bare-metal runtime under DataFrames
Shuffle Internals
Sort-based shuffle, the shuffle service, push-based shuffle (SPIP), and why shuffle is still the most expensive operation in Spark
Adaptive Query Execution
Dynamic partition coalescing, skew handling, and join strategy switching at runtime — what AQE actually does to your query plan
Structured Streaming
Micro-batch and continuous processing, watermarks, state stores, and the unified batch/streaming API on top of Catalyst
Spark on Kubernetes
Driver pods, executor pods, dynamic allocation, and the operational tradeoffs versus YARN and EMR
Spark vs Flink vs Trino
Batch-first vs streaming-first vs interactive-first — the architectural choices that produce three very different engines
Driver vs Executors
A Spark application has exactly one driver process (running your user code, the
SparkContext, Catalyst, and the DAG/TaskScheduler) and one or more executor
processes that run tasks. Executors are sized at launch (cores + memory) and expose one task slot per core (spark.task.cpus defaults to 1). The driver hands tasks to executors via the cluster manager (YARN, Kubernetes, Standalone, or historically Mesos), serialises closures and broadcast data, and collects results.
The driver is a single point of failure for the application — it has no HA in the OSS distribution (Databricks adds notebook-level checkpointing). If the driver dies, the application dies; the cluster manager may restart it via spark.yarn.maxAppAttempts or its Kubernetes equivalent. Executors are stateless across job boundaries; lost executors trigger task retries and partial recomputation up the lineage graph.
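A cluster-mode submission makes this topology concrete: one driver container plus a fixed fleet of executors, sized at launch. The flags below are standard spark-submit options; the values are purely illustrative.

```shell
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 10 \
  --executor-cores 4 \
  --executor-memory 16g \
  --driver-memory 8g \
  --conf spark.yarn.maxAppAttempts=2 \
  my_job.py
```

With 4 cores per executor, each executor runs up to 4 tasks concurrently; maxAppAttempts is the restart-on-driver-death knob mentioned above.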
RDD, DataFrame, Dataset
The original Spark API was RDD (Resilient Distributed Dataset): a typed, partitioned, lineage-tracked collection. Operations split into transformations (lazy, return new RDD) and actions (eager, return value to driver). The lineage graph is recomputable, which is how Spark tolerates lost executors.
RDDs were too low-level to optimize. DataFrame (1.3) and Dataset (1.6)
added a logical query plan: operations are not Scala closures over partitions but tree nodes that
Catalyst can rewrite. A DataFrame is untyped (an alias for Dataset[Row]); a Dataset is typed at compile time. Both compile to the same Catalyst plan and run through Tungsten.
Modern Spark code uses DataFrames or spark.sql() almost exclusively. RDDs remain for
situations where Catalyst's relational model doesn't fit (graph algorithms, custom iterative ML, GroupBy
with very wide keys). For everything else, use DataFrames — you pay nothing and gain Catalyst's planning.
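The transformation/action split and lineage-based recovery described above fit in a few lines. This is a toy model in plain Python, not Spark's API — the name ToyRDD and its methods are invented for illustration:

```python
# Toy sketch of RDD lineage: each "RDD" remembers its parent and the
# transformation that produced it, so any partition can be recomputed
# from the source data on demand (e.g., after an executor is lost).

class ToyRDD:
    def __init__(self, partitions=None, parent=None, fn=None):
        self._partitions = partitions  # materialized source data, or None
        self.parent = parent           # lineage: where this RDD came from
        self.fn = fn                   # transformation to replay

    def map(self, f):
        # Transformation: lazy, only records lineage
        return ToyRDD(parent=self, fn=lambda part: [f(x) for x in part])

    def filter(self, pred):
        return ToyRDD(parent=self, fn=lambda part: [x for x in part if pred(x)])

    def compute(self, i):
        # Recompute partition i by walking up the lineage graph
        if self._partitions is not None:
            return self._partitions[i]
        return self.fn(self.parent.compute(i))

    def collect(self):
        # Action: eager, materializes every partition on the "driver"
        n = self._num_partitions()
        return [x for i in range(n) for x in self.compute(i)]

    def _num_partitions(self):
        rdd = self
        while rdd._partitions is None:
            rdd = rdd.parent
        return len(rdd._partitions)


source = ToyRDD(partitions=[[1, 2, 3], [4, 5, 6]])
result = source.map(lambda x: x * 10).filter(lambda x: x > 20)
print(result.collect())  # [30, 40, 50, 60] — nothing ran until collect()
```

Note that `map` and `filter` do no work at all; `collect` triggers the whole chain, and recomputing a single lost partition only replays that partition's lineage.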
Catalyst: Four Phases
Catalyst is the heart of Spark SQL. It compiles a query through four phases.
1. Parsing. SQL or DataFrame DSL is parsed into an unresolved logical plan — a tree of operators with no schema knowledge. Identifiers refer to whatever the user wrote.
2. Analysis. The catalog (Hive metastore, Unity Catalog, in-memory) resolves table/column references, types are inferred and checked. Output: a resolved logical plan.
3. Logical optimization. A rule-based engine applies dozens of rewrites: predicate pushdown, projection pruning, constant folding, common subexpression elimination, join reordering (with cost-based optimization since 2.2), null handling. The output is still a logical plan, just dramatically smaller and cheaper to execute.
4. Physical planning + code generation. Catalyst converts the logical plan into one or more physical plans (alternative join algorithms, alternative aggregation strategies), costs them, picks the best, and finally generates Java bytecode via whole-stage code generation: an entire pipeline of operators is fused into a single function with no virtual calls or boxing.
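Phase 3's rule engine is conceptually simple: rules are tree-to-tree functions applied until a fixed point. A miniature version in Python (hypothetical names — Catalyst itself is Scala and rewrites full relational plans, not just expressions):

```python
# Minimal rule-based rewriter in the spirit of Catalyst's logical
# optimization: rewrite children first, then try each rule on the node.

from dataclasses import dataclass

@dataclass(frozen=True)
class Lit:            # literal value
    value: int

@dataclass(frozen=True)
class Attr:           # column reference, unresolved until runtime
    name: str

@dataclass(frozen=True)
class Add:            # binary expression node
    left: object
    right: object

def transform_up(node, rule):
    """Apply a rule bottom-up: children before the node itself."""
    if isinstance(node, Add):
        node = Add(transform_up(node.left, rule), transform_up(node.right, rule))
    return rule(node)

def constant_fold(node):
    # Rule: Add(Lit, Lit) -> Lit, evaluated once at planning time
    if isinstance(node, Add) and isinstance(node.left, Lit) and isinstance(node.right, Lit):
        return Lit(node.left.value + node.right.value)
    return node

# x + (1 + 2)  ==>  x + 3
expr = Add(Attr("x"), Add(Lit(1), Lit(2)))
print(transform_up(expr, constant_fold))
```

Real Catalyst batches dozens of such rules and re-runs each batch until the plan stops changing.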
Tungsten: Off-Heap and Codegen
Project Tungsten was Spark 1.5–2.0's effort to move from Java-object-based execution to bare-metal memory. Three major pieces.
UnsafeRow: a binary record format with fixed offsets, fixed-width nullability bits, and
variable-length data inline. Stored in sun.misc.Unsafe off-heap memory, bypassing the JVM
garbage collector for hot data. Comparison and hashing operate directly on bytes.
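The fixed-offset idea can be sketched with Python's struct module. This is an illustrative layout, not Spark's exact UnsafeRow format: a null-bitmap word followed by one 8-byte slot per field, so equality works directly on bytes with no per-field object allocation.

```python
# UnsafeRow-style fixed-offset record (simplified): 8-byte null bitmap,
# then one fixed-width 8-byte slot per field. Comparison and hashing
# can operate on the raw bytes without decoding any field.

import struct

def pack_row(values):
    """Pack a row of optional ints into a flat binary record."""
    null_bits = 0
    slots = []
    for i, v in enumerate(values):
        if v is None:
            null_bits |= 1 << i   # mark field i null in the bitmap
            slots.append(0)
        else:
            slots.append(v)
    return struct.pack(f"<q{len(values)}q", null_bits, *slots)

def get_long(row, i):
    """Read field i straight from its fixed offset."""
    null_bits = struct.unpack_from("<q", row, 0)[0]
    if null_bits & (1 << i):
        return None
    return struct.unpack_from("<q", row, 8 + 8 * i)[0]

a = pack_row([42, None, 7])
b = pack_row([42, None, 7])
assert a == b                                    # byte-wise equality
assert get_long(a, 0) == 42 and get_long(a, 1) is None
print(len(a))  # 32 bytes: bitmap word + three fixed-width slots
```

The real format also packs variable-length data (strings, arrays) after the fixed region, with the fixed slot holding an offset + length.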
Whole-stage codegen: instead of pull-based volcano-style operators (each operator's
next() a virtual call), Catalyst emits a single Java method per pipeline. Filter + project +
hash-aggregate becomes a tight while-loop with inlined branch/copy/hash logic. JIT optimizes it like
hand-written code. On simple scan-filter-aggregate queries the gain can reach 5–10×.
Vectorized execution: for columnar sources (Parquet, ORC, Arrow), Spark batches values
into ColumnVector and processes them in columnar form, enabling SIMD and tight CPU loops.
Photon (Databricks proprietary) extends this to a fully vectorized C++ runtime.
Shuffle: The Expensive Part
A shuffle is required whenever data must move between partitions across the network — group-by, join (non-broadcast), repartition, distinct. Each map task sorts its output by target reduce partition and writes a single indexed file (sort-based shuffle, default since 1.2); reducers then fetch their byte range from every map task's output — up to N×M network fetches.
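The map-side routing is just hash partitioning by key. A simplified sketch (real sort-based shuffle sorts records by partition id into one indexed file per map task rather than keeping in-memory buckets):

```python
# Map side of a shuffle, simplified: every record is routed to a reduce
# partition by hashing its key, which is why each of the M reducers must
# contact all N mappers to assemble its partition.

def map_side_partition(records, num_reducers):
    """Group one map task's (key, value) output by target reduce partition."""
    buckets = {p: [] for p in range(num_reducers)}
    for key, value in records:
        buckets[hash(key) % num_reducers].append((key, value))
    return buckets

out = map_side_partition([("a", 1), ("b", 2), ("a", 3)], num_reducers=4)
# All records with key "a" land in the same bucket, so one reducer sees
# every "a" row regardless of which map task produced it.
a_bucket = hash("a") % 4
assert ("a", 1) in out[a_bucket] and ("a", 3) in out[a_bucket]
```

Same-key-same-reducer is the property that makes group-by and join correct — and also what makes skewed keys concentrate on a single task.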
Spark 3.2 introduced push-based shuffle (SPARK-30602, LinkedIn's Magnet): map tasks push partitioned blocks to a small set of merger nodes; reducers fetch fewer, larger blocks. This dramatically reduces small-file pressure on the shuffle service and improves tail latency on large clusters, though the OSS implementation initially targets YARN. On Kubernetes, where the YARN-style external shuffle service isn't available, deployments typically rely on shuffle tracking or a remote shuffle service (e.g., Apache Celeborn) so executors can be decommissioned without losing shuffle data.
Shuffle skew — one key with vastly more rows than others — is the single most common cause of
"stuck stage" tickets. AQE addresses this dynamically (next section). Manual mitigations: salt the key,
use SKEW hints, or pre-aggregate.
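Salting is the classic manual fix: append a random suffix so one hot key becomes several shuffle keys. A sketch of the pattern (this is a user-level technique, not a Spark API; the other side of a join must be expanded to match each salt value):

```python
# Key salting: spread a hot key's rows across SALT reducer tasks by
# rewriting "hot_key" into "hot_key#0" .. "hot_key#7" at random.

import random

SALT = 8

def salt_key(key):
    return f"{key}#{random.randrange(SALT)}"

def unsalt_key(salted):
    # Recover the original key after the salted aggregation/join
    return salted.rsplit("#", 1)[0]

rows = [("hot_key", i) for i in range(1000)]
salted = [(salt_key(k), v) for k, v in rows]

distinct = {k for k, _ in salted}
assert len(distinct) <= SALT                       # at most 8 shuffle keys now
assert {unsalt_key(k) for k in distinct} == {"hot_key"}
```

For an aggregation this means two rounds: partial aggregate by salted key, then a final aggregate by the unsalted key over at most SALT partial results per key.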
Adaptive Query Execution (AQE)
Catalyst optimizes once at submission time using estimated statistics. AQE (default on since 3.2) re-optimizes between shuffle stages using actual partition sizes from the previous stage. Three main rewrites.
Coalesce shuffle partitions: the default 200 shuffle partitions are often too many for small data. AQE merges adjacent small post-shuffle partitions into single larger ones, reducing scheduling overhead.
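The coalescing logic is essentially a greedy merge of adjacent partitions toward a target size. A simplified sketch (AQE's actual rule, driven by spark.sql.adaptive.advisoryPartitionSizeInBytes, handles more edge cases):

```python
# AQE-style partition coalescing, simplified: merge adjacent post-shuffle
# partitions until each merged group reaches roughly the target size.

def coalesce_partitions(sizes_bytes, target_bytes):
    """Greedily group adjacent partition indices into ~target-size groups."""
    groups, current, current_size = [], [], 0
    for i, size in enumerate(sizes_bytes):
        current.append(i)
        current_size += size
        if current_size >= target_bytes:
            groups.append(current)
            current, current_size = [], 0
    if current:
        groups.append(current)   # leftover tail becomes the last group
    return groups

# Ten 10 MB post-shuffle partitions coalesced toward 64 MB groups:
MB = 1024 * 1024
print(coalesce_partitions([10 * MB] * 10, 64 * MB))
# [[0, 1, 2, 3, 4, 5, 6], [7, 8, 9]]
```

Ten tasks become two, cutting per-task scheduling and fetch overhead without changing results.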
Switch join strategies: if one side of a join turns out smaller than the broadcast threshold after filters run, AQE replaces a sort-merge join with a broadcast hash join — sparing both sides a full shuffle and sort.
Skew join handling: AQE detects partitions that are 5×+ the median and 256 MiB+ in size, then splits the skewed partition into multiple sub-partitions on the read side, duplicating the matching partition on the other side for each split. Eliminates the skewed-task tail without code changes.
Memory Management
Each executor's heap is partitioned. Storage memory caches RDDs/DataFrames and broadcast variables. Execution memory holds shuffle, join, aggregation, and sort buffers. The unified memory manager (default since 1.6) lets these regions borrow from each other dynamically; only a guaranteed minimum is reserved for storage.
Out-of-memory in execution memory triggers spill-to-disk (e.g., external sort). Storage memory eviction
(LRU) drops cached blocks. Off-heap Tungsten memory is bounded by
spark.memory.offHeap.size and managed by Spark's own allocator, not the JVM heap.
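The knobs involved, in spark-defaults.conf form (property names are Spark's standard memory settings; the values are illustrative, and the first two shown are the defaults):

```properties
# Fraction of the heap available to unified storage + execution memory:
spark.memory.fraction          0.6
# Minimum share of that region guaranteed to storage before eviction:
spark.memory.storageFraction   0.5
# Tungsten off-heap allocation, bounded separately from the JVM heap:
spark.memory.offHeap.enabled   true
spark.memory.offHeap.size      4g
```

Everything outside spark.memory.fraction is reserved for user data structures and JVM internals, which is why raising the fraction too high trades OOM risk for cache space.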
Structured Streaming
Structured Streaming reuses the entire DataFrame/Catalyst stack. A streaming query is a DataFrame query against a logically infinite input table; the engine runs it incrementally on micro-batches (default; the simplest model) or in a continuous-processing mode (millisecond latency, fewer features).
Watermarks bound the lateness Spark will tolerate before closing event-time windows. State stores (RocksDB-backed since 3.2, in-memory + HDFS-checkpointed before) hold keyed state (aggregations, deduplication, joins). Checkpoints to HDFS/S3 enable exactly-once delivery to idempotent sinks.
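The watermark rule itself is simple: it trails the maximum event time seen by the allowed lateness, and events older than it are dropped rather than reopening closed state. A toy simulation (illustrative only — not the withWatermark API, and real Spark tracks watermarks per trigger across all partitions):

```python
# Toy watermark logic: watermark = max event time seen - allowed lateness.
# Late-but-tolerable events are accepted; events behind the watermark are
# dropped because their window's state has already been finalized.

from datetime import datetime, timedelta

class WatermarkedStream:
    def __init__(self, lateness: timedelta):
        self.lateness = lateness
        self.max_event_time = datetime.min
        self.accepted = []

    def process(self, event_time: datetime, payload):
        self.max_event_time = max(self.max_event_time, event_time)
        watermark = self.max_event_time - self.lateness
        if event_time >= watermark:
            self.accepted.append(payload)   # on time, or tolerably late
            return True
        return False                        # too late: state already closed

q = WatermarkedStream(lateness=timedelta(minutes=10))
t0 = datetime(2024, 1, 1, 12, 0)
assert q.process(t0, "a")                              # advances the watermark
assert q.process(t0 - timedelta(minutes=5), "b")       # 5 min late: kept
assert not q.process(t0 - timedelta(minutes=30), "c")  # 30 min late: dropped
```

The lateness bound is what lets the state store evict finished windows instead of growing forever.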
Compared to Flink: Spark is batch-first and treats streaming as small batches. Flink is streaming-first and treats batch as bounded streams. For micro-batch latencies (1+ seconds) and lakehouse integration, Spark wins on operational simplicity and ecosystem. For millisecond-latency event processing with rich windowing, Flink is the better choice.
Spark on Kubernetes vs YARN
Historically Spark ran on YARN (Hadoop) or Mesos. Today, Spark on Kubernetes is the dominant new-deployment choice. The driver and executors run as pods; the driver itself talks to the Kubernetes API server to request and release executor pods (there is no separate application master as on YARN). Dynamic allocation works via the executor pod template + the K8s scheduler.
The tradeoffs versus YARN: K8s gives you uniform infrastructure (one cluster runs Spark, services, and ML), but Spark on K8s has weaker locality awareness, worse shuffle performance without an external shuffle service, and a less mature ecosystem of operators. YARN remains entrenched on existing Hadoop shops; new deployments lean K8s.
Tradeoffs and When Not To Use Spark
Spark's startup cost is real: tens of seconds to minutes for a fresh cluster, hundreds of milliseconds for a warm session. Don't use Spark for sub-second analytical queries — use DuckDB (single-node) or Trino/Presto (interactive, no cluster startup per query). Don't use it for true real-time event processing — use Flink. Don't use it for OLTP — use a real database.
Use Spark for: ETL pipelines (the canonical workload), large-scale ML training (MLlib, plus PyTorch/TF via Spark Connect), batch + micro-batch streaming jobs that share lakehouse table storage, and any workload where data > single-node RAM and you want a single API for data engineering and data science.
Spark vs Other Distributed Engines
| | Spark | Flink | Trino / Presto | Dask |
|---|---|---|---|---|
| Primary use | Batch + micro-batch + ML | Stream-first, batch-second | Interactive SQL on lakehouse | Python-native distributed compute |
| Language | Scala, Java, Python, R, SQL | Java, Scala, Python, SQL | SQL only | Python only |
| Execution model | DAG of stages, micro-batch streaming | True streaming dataflow | MPP query engine | Task graph, similar to Spark RDD |
| Lowest stable latency | ~1 s (micro-batch) | ~10 ms | seconds (interactive) | ~100 ms |
| Optimizer | Catalyst (rule + cost) | Calcite-based | Cost-based, hash-distributed | None (user controls graph) |
| State management | RocksDB / HDFS checkpoints | RocksDB + savepoints | Stateless | None |
| Best fit | Lakehouse ETL + ML | Real-time analytics | Ad-hoc queries | Pandas users on bigger data |
FAQ
Why is my Spark job stuck on one task?
Almost always shuffle skew: one key has many more rows than others, so its reducer task is doing the work of dozens. Enable AQE skew handling (default in 3.2+), or pre-salt the key. The Spark UI's stage page will show the offending task as an outlier in the duration distribution.
What's the right number of shuffle partitions?
Aim for 100–200 MB per partition post-shuffle. The default of 200 partitions is wrong at both extremes: small datasets waste scheduling overhead on tiny tasks, and large datasets produce partitions bigger than executor memory. AQE coalesces small ones automatically. For very large jobs, set spark.sql.shuffle.partitions to total-shuffle-bytes / 128 MB.
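That sizing rule as arithmetic (a rule of thumb, not an API — the function name is invented):

```python
# partitions ~= total shuffle bytes / target partition size (here 128 MiB)

def shuffle_partitions(total_shuffle_bytes, target_bytes=128 * 1024 * 1024):
    return max(1, round(total_shuffle_bytes / target_bytes))

TB = 1024 ** 4
print(shuffle_partitions(2 * TB))   # 2 TiB / 128 MiB = 16384 partitions
```

For a 2 TiB shuffle you would set spark.sql.shuffle.partitions to roughly 16384 rather than the default 200.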
Should I use DataFrames or Datasets in Scala?
DataFrames. Datasets give you compile-time typing but at the cost of disabling many Catalyst optimizations because typed lambdas are opaque to the optimizer. The cleanest pattern: use SQL or DataFrame DSL for the query, deserialize into a typed case class only at the very end if needed.
What is "broadcast hash join" and when does Spark pick it?
One side of the join is small enough (default ≤10 MB, controlled by spark.sql.autoBroadcastJoinThreshold) to copy in full to every executor, eliminating the shuffle. The other side is scanned once, probing the broadcast hash table. Almost always cheaper than sort-merge when the small side genuinely fits in executor memory. AQE can also switch to broadcast at runtime if a filter shrinks the build side enough.
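The mechanics in plain Python (a sketch of the build/probe pattern, not Spark's implementation):

```python
# Broadcast hash join, simplified: build a hash table from the small side
# once, then stream every partition of the large side past it locally --
# no shuffle of the large side at all.

from collections import defaultdict

def broadcast_hash_join(large, small):
    """Inner join of (key, value) tuples on the key."""
    # Build phase: the small side becomes an in-memory hash table that
    # would be shipped in full to every executor.
    table = defaultdict(list)
    for key, v in small:
        table[key].append(v)
    # Probe phase: each partition of the large side scans and probes locally.
    return [(key, u, v) for key, u in large for v in table.get(key, [])]

orders = [("cust1", "order_a"), ("cust2", "order_b"), ("cust1", "order_c")]
customers = [("cust1", "Ada"), ("cust2", "Grace")]
print(broadcast_hash_join(orders, customers))
# [('cust1', 'order_a', 'Ada'), ('cust2', 'order_b', 'Grace'), ('cust1', 'order_c', 'Ada')]
```

The broadcast threshold exists because the build table is replicated to every executor: past a few tens of MB, replication plus memory pressure costs more than the shuffle it avoids.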
How do I make Spark on K8s as fast as Spark on YARN?
Run an external shuffle service or enable shuffle tracking in dynamic allocation, use local NVMe for shuffle if available, and configure node affinity so executors land on hot data. Photon or RAPIDS Accelerator give bigger speedups than infra tuning. For the closest YARN parity, use Apache YuniKorn as the K8s scheduler — it adds locality awareness Kubernetes' default scheduler lacks.
What does Spark Connect change?
Spark Connect (3.4+) splits the driver into a thin client (your Python/Scala/Go process) and a remote server (the Spark cluster's driver), connected via gRPC. The client never holds the SparkContext — it builds and ships a logical plan. Benefits: independent client lifecycle (your notebook can disconnect/reconnect without killing the job), polyglot clients without JVM in your process, lighter resource footprint. Most modern Databricks usage is Connect under the hood.
Do I need Hadoop to run Spark?
No. Spark depends on the Hadoop client libraries for filesystem abstractions (S3A, GCS, ABFS) but not on a running Hadoop cluster. Modern deployments are typically Spark + S3-compatible object storage + a metastore (Glue, Unity Catalog, Hive metastore as a service). The only reason to run HDFS today is if you have an existing on-prem Hadoop estate.