Spark Internals

Apache Spark started in 2009 as Matei Zaharia's RDD prototype at Berkeley. Sixteen years later it is the de facto distributed compute engine for petabyte-scale analytics, ML training, and lakehouse workloads. The conceptual model is simple — a driver dispatches tasks to executors over a logical-then-physical plan — but the implementation hides a Catalyst optimizer that rivals commercial systems, a Tungsten runtime that generates Java bytecode per query, an adaptive execution layer that rewrites plans mid-flight, and a shuffle subsystem that has been rewritten three times. This hub unpacks each of those pieces.

Grounded in Spark 3.5 / 4.x source (Scala), the original RDD/Spark SQL papers, and Databricks' Tungsten posts.

Cluster Architecture

[Architecture diagram] The driver process (SparkContext/SparkSession, Catalyst for parse-to-optimize, DAGScheduler for stages, TaskScheduler for locality) negotiates resources with a cluster manager (YARN / K8s / Standalone) and dispatches tasks to executors (one JVM each, N task slots per JVM). Everything reads and writes distributed storage (S3 / HDFS / GCS / Delta / Iceberg / Hudi): shuffle data, broadcast variables, checkpoint state.

Key Numbers

Default shuffle partitions: 200
Default broadcast threshold: 10 MB
Unified memory (storage + execution): 60% of heap
Catalyst phases: 4
Whole-stage codegen: since 2.0
Off-heap (Tungsten): via Java Unsafe
AQE: on by default since 3.2

Why Spark Exists

The Gap
Hadoop MapReduce wrote intermediate data to disk between every stage. Iterative algorithms (ML, graph, interactive analytics) hit the disk dozens of times. The model also forced you to write low-level map/reduce code — no SQL, no streaming, no generic lineage.
The Insight
If you keep intermediate data in memory and represent it as immutable, lineage-tracked partitions (RDDs), you can recompute lost partitions on failure without checkpointing every stage. Build SQL, streaming, ML, and graph APIs as libraries on top, and the unified runtime amortises across workloads.
The Result
Spark replaced MapReduce. The DataFrame/Catalyst model became the standard interface. Databricks built a $40B company around it. Today, almost every lakehouse pipeline (Delta, Iceberg, Hudi) starts and ends with Spark jobs.

DAG Execution

From a DataFrame query to a directed acyclic graph of stages and tasks — how Spark splits, schedules, and runs distributed jobs

Coming soon

Catalyst Optimizer

Parsing, analysis, logical optimization, physical planning, and code generation — the four-phase rewriter that produces JVM bytecode at runtime

Coming soon

Tungsten Execution

Off-heap memory management, whole-stage code generation, and vectorized columnar execution — the bare-metal runtime under DataFrames

Coming soon

Shuffle Internals

Sort-based shuffle, the shuffle service, push-based shuffle (SPIP), and why shuffle is still the most expensive operation in Spark

Coming soon

Adaptive Query Execution

Dynamic partition coalescing, skew handling, and join strategy switching at runtime — what AQE actually does to your query plan

Coming soon

Structured Streaming

Micro-batch and continuous processing, watermarks, state stores, and the unified batch/streaming API on top of Catalyst

Coming soon

Spark on Kubernetes

Driver pods, executor pods, dynamic allocation, and the operational tradeoffs versus YARN and EMR

Coming soon

Spark vs Flink vs Trino

Batch-first vs streaming-first vs interactive-first — the architectural choices that produce three very different engines

Driver vs Executors

A Spark application has exactly one driver process (running your user code, the SparkContext, Catalyst, and the DAG/Task schedulers) and one or more executor processes that run tasks. Executors are sized at launch (cores + memory) and host task slots, one per core assigned to the executor. The driver requests resources from the cluster manager (YARN, Kubernetes, Standalone, or historically Mesos), serialises closures and broadcast data, hands tasks directly to executors, and collects results.
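
As a concrete sketch, executor shape is fixed at submission time through configuration; the values below are illustrative, not recommendations:

```scala
// Minimal sketch: fixing executor shape at launch (all values illustrative).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("sizing-example")
  .config("spark.executor.instances", "4")  // four executor JVMs
  .config("spark.executor.cores", "4")      // four task slots per executor
  .config("spark.executor.memory", "8g")    // heap per executor JVM
  .getOrCreate()
```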

The driver is a single point of failure for the application: it has no HA in the OSS distribution (Databricks adds notebook-level checkpointing). If the driver dies, the application dies, though the cluster manager may restart it via spark.yarn.maxAppAttempts or its equivalent. Executors are stateless across job boundaries; lost executors trigger task retries and partial recomputation up the lineage graph.

RDD, DataFrame, Dataset

The original Spark API was RDD (Resilient Distributed Dataset): a typed, partitioned, lineage-tracked collection. Operations split into transformations (lazy, return new RDD) and actions (eager, return value to driver). The lineage graph is recomputable, which is how Spark tolerates lost executors.
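
A minimal illustration of the lazy/eager split, on synthetic data:

```scala
// Sketch: transformations build lineage lazily; an action triggers a job.
val nums = spark.sparkContext.parallelize(1 to 1000000)  // RDD[Int]
val squares = nums.map(x => x.toLong * x)   // transformation: nothing runs yet
val evens = squares.filter(_ % 2 == 0)      // still just lineage
val count = evens.count()                   // action: schedules tasks, returns to driver
```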

RDDs were too low-level to optimize. DataFrame (1.3) and Dataset (1.6) added a logical query plan: operations are no longer Scala closures over partitions but tree nodes that Catalyst can rewrite. A DataFrame is untyped (an alias for Dataset[Row]); a Dataset is typed at compile time. Both compile to the same Catalyst plan and run through Tungsten.

Modern Spark code uses DataFrames or spark.sql() almost exclusively. RDDs remain for situations where Catalyst's relational model doesn't fit (graph algorithms, custom iterative ML, GroupBy with very wide keys). For everything else, use DataFrames — you pay nothing and gain Catalyst's planning.

Catalyst: Four Phases

Catalyst is the heart of Spark SQL. It compiles a query through four phases.

1. Parsing. SQL or DataFrame DSL is parsed into an unresolved logical plan — a tree of operators with no schema knowledge. Identifiers refer to whatever the user wrote.

2. Analysis. The catalog (Hive metastore, Unity Catalog, in-memory) resolves table and column references, and types are inferred and checked. Output: a resolved logical plan.

3. Logical optimization. A rule-based engine applies dozens of rewrites: predicate pushdown, projection pruning, constant folding, common subexpression elimination, join reordering (with cost-based optimization since 2.2), null handling. The output is still a logical plan, just one that scans and moves far less data.

4. Physical planning + code generation. Catalyst converts the logical plan into one or more physical plans (alternative join algorithms, alternative aggregation strategies), costs them, picks the best, and finally generates Java bytecode via whole-stage code generation: an entire pipeline of operators is fused into a single function with no virtual calls or boxing.
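
You can watch all four phases on any query with explain(true); a sketch, with a hypothetical Parquet path and the output abbreviated:

```scala
// Sketch: dumping each Catalyst phase for one query (path is hypothetical).
import spark.implicits._

val df = spark.read.parquet("/data/events")
  .filter($"country" === "DE")
  .select($"user_id", $"ts")

df.explain(true)
// == Parsed Logical Plan ==     phase 1: unresolved identifiers
// == Analyzed Logical Plan ==   phase 2: catalog-resolved, typed
// == Optimized Logical Plan ==  phase 3: filter pushed down, columns pruned
// == Physical Plan ==           phase 4: chosen operators, codegen stages marked
```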

Tungsten: Off-Heap and Codegen

Project Tungsten was Spark 1.5–2.0's effort to move from Java-object-based execution to bare-metal memory. Three major pieces.

UnsafeRow: a binary record format with fixed field offsets, a null-tracking bitset, and variable-length data stored inline. Memory is addressed through sun.misc.Unsafe, on or off heap, keeping hot data out of the garbage collector's way. Comparison and hashing operate directly on the bytes.

Whole-stage codegen: instead of pull-based Volcano-style operators, where each operator's next() is a virtual call, Catalyst emits a single Java method per pipeline. Filter + project + hash-aggregate becomes a tight while-loop with inlined branch/copy/hash logic, which the JIT optimizes like hand-written code. The gain on simple queries is 5–10×.
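
Spark can print the generated source itself; a sketch using the built-in debug helpers (output is long and version-dependent):

```scala
// Sketch: inspecting the fused Java source for a pipeline.
import org.apache.spark.sql.execution.debug._
import spark.implicits._

val q = spark.range(1000000L)
  .filter($"id" % 2 === 0)
  .selectExpr("id * 3 AS tripled")

q.debugCodegen()  // prints one generated Java method per WholeStageCodegen subtree
```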

Vectorized execution: for columnar sources (Parquet, ORC, Arrow), Spark batches values into ColumnVector and processes them in columnar form, enabling SIMD and tight CPU loops. Photon (Databricks proprietary) extends this to a fully vectorized C++ runtime.

Shuffle: The Expensive Part

A shuffle is required whenever data must cross partition boundaries over the network: group-by, non-broadcast join, repartition, distinct. Each map task sorts its output by reduce-partition id and writes one partitioned data file plus an index file (sort-based shuffle, default since 1.2). Reducers fetch their byte ranges from every map output, N×M network fetches in total.

Spark 3.2 introduced push-based shuffle (SPARK-30602, LinkedIn's Magnet design): map tasks push partitioned blocks to a small set of merger nodes; reducers fetch fewer, larger blocks. This dramatically reduces small-file pressure on the shuffle service and improves tail latency on large clusters. Production Spark on Kubernetes typically uses an external shuffle service sidecar so executors can be decommissioned without losing shuffle data.

Shuffle skew — one key with vastly more rows than others — is the single most common cause of "stuck stage" tickets. AQE addresses this dynamically (next section). Manual mitigations: salt the key, use SKEW hints, or pre-aggregate.
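
Salting is worth seeing once. A sketch: the skewed (large) side gets a random salt, and the small side is exploded across all salt values so keys still match; big, small, and N_SALT are hypothetical:

```scala
// Sketch: manual key salting for a skewed join (N_SALT tuned per workload).
import org.apache.spark.sql.functions._

val N_SALT = 16
val saltedBig = big.withColumn("salt", (rand() * N_SALT).cast("int"))
val saltedSmall = small.withColumn("salt",
  explode(array((0 until N_SALT).map(lit(_)): _*)))  // replicate each row N_SALT times
val joined = saltedBig.join(saltedSmall, Seq("key", "salt"))
```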

Adaptive Query Execution (AQE)

Catalyst optimizes once at submission time using estimated statistics. AQE (default on since 3.2) re-optimizes between shuffle stages using actual partition sizes from the previous stage. Three main rewrites.

Coalesce shuffle partitions: the default 200 shuffle partitions are often too many for small data. AQE merges adjacent small post-shuffle partitions into single larger ones, reducing scheduling overhead.

Switch join strategies: if the build side of a join turns out smaller than the broadcast threshold after filtering, AQE replans a sort-merge join as a broadcast join, replacing a shuffle of both sides with a broadcast plus a streaming probe.

Skew join handling: AQE flags partitions at least 5× the median and at least 256 MiB in size, splits each skewed partition into sub-partitions on the read side, and duplicates the matching partition from the other side for each split. This removes the skewed-task tail without code changes.
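
The thresholds above map to configuration; a sketch of the relevant knobs with their documented defaults (as of 3.2+):

```scala
// Sketch: AQE knobs behind the rewrites (values shown are the defaults).
spark.conf.set("spark.sql.adaptive.enabled", "true")                      // on by default since 3.2
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")   // rewrite 1
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")             // rewrite 3
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")  // 5x the median
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
// Rewrite 2 (join switching) has no single flag; it is governed by the
// broadcast threshold applied at replanning time.
```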

Memory Management

Each executor's heap is partitioned. Storage memory caches RDDs/DataFrames and broadcast variables. Execution memory holds shuffle, join, aggregation, and sort buffers. The unified memory manager (default since 1.6) lets these regions borrow from each other dynamically; only a guaranteed minimum is reserved for storage.

Exhausting execution memory triggers spill-to-disk (e.g., an external sort). Storage memory is evicted LRU: cached blocks are simply dropped and recomputed from lineage if needed. Off-heap Tungsten memory is bounded by spark.memory.offHeap.size and managed by Spark's own allocator, outside the JVM heap.
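
A worked example under the documented defaults (300 MB reserved, spark.memory.fraction = 0.6, spark.memory.storageFraction = 0.5) for an 8 GiB heap:

```scala
// Sketch: unified memory math for an 8 GiB executor heap (Spark 3.x defaults).
val heap = 8L * 1024 * 1024 * 1024              // executor JVM heap
val reserved = 300L * 1024 * 1024               // fixed reserve for Spark internals
val unified = ((heap - reserved) * 0.6).toLong  // spark.memory.fraction
val storageFloor = (unified * 0.5).toLong       // spark.memory.storageFraction
// unified ~ 4.6 GiB shared by storage + execution; ~2.3 GiB guaranteed to storage
```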

Structured Streaming

Structured Streaming reuses the entire DataFrame/Catalyst stack. A streaming query is a DataFrame query against a logically infinite input table; the engine runs it incrementally on micro-batches (default; the simplest model) or in a continuous-processing mode (millisecond latency, fewer features).

Watermarks bound the lateness Spark will tolerate before closing event-time windows. State stores (RocksDB-backed since 3.2, in-memory + HDFS-checkpointed before) hold keyed state (aggregations, deduplication, joins). Checkpoints to HDFS/S3 enable exactly-once delivery to idempotent sinks.
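
A minimal sketch of those pieces together; the Kafka broker, topic, and sink paths are hypothetical, and the Kafka and Delta connectors must be on the classpath:

```scala
// Sketch: windowed count with a watermark, checkpointed for exactly-once sinks.
import org.apache.spark.sql.functions._
import spark.implicits._

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(value AS STRING) AS body", "timestamp")

val counts = events
  .withWatermark("timestamp", "10 minutes")  // bound tolerated lateness
  .groupBy(window($"timestamp", "5 minutes"), $"body")
  .count()

counts.writeStream
  .format("delta")
  .option("checkpointLocation", "/chk/event_counts")  // offsets + state bookkeeping
  .outputMode("append")  // append works because the watermark closes windows
  .start("/tables/event_counts")
```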

Compared to Flink: Spark is batch-first and treats streaming as small batches. Flink is streaming-first and treats batch as bounded streams. For micro-batch latencies (1+ seconds) and lakehouse integration, Spark wins on operational simplicity and ecosystem. For millisecond-latency event processing with rich windowing, Flink is the better choice.

Spark on Kubernetes vs YARN

Historically Spark ran on YARN (Hadoop) or Mesos. Today, Spark on Kubernetes is the dominant choice for new deployments. The driver and executors run as pods; the driver acts as a Kubernetes client, requesting executor pods through the K8s API. Dynamic allocation works through executor pod templates plus shuffle tracking, since there is no YARN-style external shuffle service out of the box.

The tradeoffs versus YARN: K8s gives you uniform infrastructure (one cluster runs Spark, services, and ML), but Spark on K8s has weaker locality awareness, worse shuffle performance without an external shuffle service, and a less mature ecosystem of operators. YARN remains entrenched on existing Hadoop shops; new deployments lean K8s.

Tradeoffs and When Not To Use Spark

Spark's startup cost is real: tens of seconds to minutes for a fresh cluster, hundreds of milliseconds for a warm session. Don't use Spark for sub-second analytical queries — use DuckDB (single-node) or Trino/Presto (interactive, no cluster startup per query). Don't use it for true real-time event processing — use Flink. Don't use it for OLTP — use a real database.

Use Spark for: ETL pipelines (the canonical workload), large-scale ML training (MLlib, plus PyTorch/TF via Spark Connect), batch + micro-batch streaming jobs that share lakehouse table storage, and any workload where data > single-node RAM and you want a single API for data engineering and data science.

Spark vs Other Distributed Engines

|  | Spark | Flink | Trino / Presto | Dask |
|---|---|---|---|---|
| Primary use | Batch + micro-batch + ML | Stream-first, batch-second | Interactive SQL on lakehouse | Python-native distributed compute |
| Language | Scala, Java, Python, R, SQL | Java, Scala, Python, SQL | SQL only | Python only |
| Execution model | DAG of stages, micro-batch streaming | True streaming dataflow | MPP query engine | Task graph, similar to Spark RDD |
| Lowest stable latency | ~1 s (micro-batch) | ~10 ms | Seconds (interactive) | ~100 ms |
| Optimizer | Catalyst (rule + cost) | Calcite-based | Cost-based, hash-distributed | None (user controls graph) |
| State management | RocksDB / HDFS checkpoints | RocksDB + savepoints | Stateless | None |
| Best fit | Lakehouse ETL + ML | Real-time analytics | Ad-hoc queries | Pandas users on bigger data |

FAQ

Why is my Spark job stuck on one task?

Almost always shuffle skew: one key has many more rows than others, so its reducer task is doing the work of dozens. Enable AQE skew handling (default in 3.2+), or pre-salt the key. The Spark UI's stage page will show the offending task as an outlier in the duration distribution.

What's the right number of shuffle partitions?

Aim for 100–200 MB per partition post-shuffle. The default of 200 partitions is wrong at both extremes: too many for small data (wasted scheduling overhead) and too few for large data (each partition outgrows its task's execution memory and spills). AQE coalesces the small ones automatically. For very large jobs, set spark.sql.shuffle.partitions to roughly total shuffle bytes / 128 MB.
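
A sketch of that arithmetic for a hypothetical 1 TiB shuffle:

```scala
// Sketch: deriving shuffle partition count from estimated shuffle volume.
val shuffleBytes = 1024L * 1024 * 1024 * 1024  // ~1 TiB, e.g. from a prior run's UI
val targetPerPartition = 128L * 1024 * 1024    // ~128 MiB per partition
val partitions = (shuffleBytes / targetPerPartition).toInt  // = 8192
spark.conf.set("spark.sql.shuffle.partitions", partitions.toString)
```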

Should I use DataFrames or Datasets in Scala?

DataFrames. Datasets give you compile-time typing but at the cost of disabling many Catalyst optimizations because typed lambdas are opaque to the optimizer. The cleanest pattern: use SQL or DataFrame DSL for the query, deserialize into a typed case class only at the very end if needed.
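
A sketch of why, with a hypothetical path and schema:

```scala
// Sketch: a typed lambda hides the predicate; a Column expression does not.
import spark.implicits._
case class Event(userId: Long, country: String)

val ds = spark.read.parquet("/data/events").as[Event]

ds.filter(e => e.country == "DE")  // opaque: deserializes every row, no pushdown
ds.filter($"country" === "DE")     // Catalyst expression: pushed into the Parquet scan
```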

What is "broadcast hash join" and when does Spark pick it?

One side of the join is small enough (default ≤10 MB, controlled by spark.sql.autoBroadcastJoinThreshold) to copy in full to every executor, eliminating the shuffle. The other side is scanned once, probing the broadcast hash table. Always cheaper than sort-merge when applicable. AQE can also switch to broadcast at runtime if a filter shrinks the build side enough.
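
You can also force the strategy regardless of size estimates with the broadcast hint; facts and dims here are hypothetical DataFrames:

```scala
// Sketch: forcing a broadcast hash join with an explicit hint.
import org.apache.spark.sql.functions.broadcast

val joined = facts.join(broadcast(dims), Seq("dim_id"))
```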

How do I make Spark on K8s as fast as Spark on YARN?

Run an external shuffle service or enable shuffle tracking in dynamic allocation, use local NVMe for shuffle if available, and configure node affinity so executors land on hot data. Photon or RAPIDS Accelerator give bigger speedups than infra tuning. For the closest YARN parity, use Apache YuniKorn as the K8s scheduler — it adds locality awareness Kubernetes' default scheduler lacks.

What does Spark Connect change?

Spark Connect (3.4+) splits the driver into a thin client (your Python/Scala/Go process) and a remote server (the Spark cluster's driver), connected via gRPC. The client never holds the SparkContext — it builds and ships a logical plan. Benefits: independent client lifecycle (your notebook can disconnect/reconnect without killing the job), polyglot clients without JVM in your process, lighter resource footprint. Most modern Databricks usage is Connect under the hood.
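
A sketch of the client side, assuming the Spark Connect client artifact is on the classpath and a server is listening (host and port illustrative):

```scala
// Sketch: thin-client session against a remote Spark Connect server (3.4+).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .remote("sc://connect-server:15002")  // gRPC endpoint
  .getOrCreate()

spark.range(10).count()  // plan is built client-side, executed on the server
```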

Do I need Hadoop to run Spark?

No. Spark depends on the Hadoop client libraries for filesystem abstractions (S3A, GCS, ABFS) but not on a running Hadoop cluster. Modern deployments are typically Spark + S3-compatible object storage + a metastore (Glue, Unity Catalog, Hive metastore as a service). The only reason to run HDFS today is if you have an existing on-prem Hadoop estate.