Tungsten

Off-Heap Binary Format and Whole-Stage Codegen

Tungsten is the execution engine that turned Spark SQL from "Java program that processes Java objects" into "system that pretends to be a database engine written in C." It introduced three critical changes: an off-heap binary row format (UnsafeRow), whole-stage code generation that fuses operators into a single Java method, and vectorized columnar readers for Parquet and ORC.

The result is execution that approaches the performance of native database engines while still running in the JVM. A query that took 30 seconds in Spark 1.4 routinely runs in 3 seconds on Spark 3.5+, with the same data, the same hardware, and the same cluster. Most of that gap is Tungsten.

Architecture

Tungsten pipeline:
  Catalyst plan -> SparkPlan tree
  -> CollapseCodegenStages finds fuseable spans
  -> CodeGenerator emits Java source
  -> Janino compiles source to bytecode
  -> generated method: the processNext() inner loop

Runtime data path:
  Vectorized reader -> ColumnVector batch
  -> whole-stage method (scan-filter-project-agg)
  -> UnsafeRow output (off-heap binary)
  -> shuffle / next stage

Key Numbers

10x        Typical speedup vs. the legacy Volcano iterator model
64 KB      JVM method bytecode limit (codegen fallback trigger)
1-64 MB    Tungsten memory page size (computed from executor memory and cores)
8 bytes    UnsafeRow word size; fields and the null bitset are 8-byte aligned
4096       Default vectorized batch size (spark.sql.parquet.columnarReaderBatchSize)
5-10x      Vectorized Parquet reader vs. the row-based reader
Janino     In-process Java compiler used at runtime

UnsafeRow: The Binary Row Format

# Layout for UnsafeRow with N fields:
[ null bitset (one 8-byte word per 64 fields) ]
[ field 0 fixed-size value or pointer (8 bytes) ]
[ field 1 fixed-size value or pointer (8 bytes) ]
...
[ field N-1 ... ]
[ variable-length data (UTF8 strings, byte arrays) ]

# Field types:
# - long, int, double, etc: stored inline in 8 bytes
# - String: 8 bytes = (offset:32 | length:32) pointing into varlen area
# - Decimal: inline if precision <= 18, otherwise pointer

# Reading a field (sketch of UnsafeRow.getLong; the literal 8 is the
# null-bitset width for rows with at most 64 fields):
public long getLong(int ordinal) {
  return Platform.getLong(baseObject, baseOffset + 8 + ordinal * 8);
}
# One memory load: no object dereference, no bounds check once the JIT
# sees a constant ordinal

# Why this is fast:
# - No Java object headers (16 bytes saved per row)
# - No pointer chasing (everything is offset arithmetic)
# - GC ignores off-heap memory entirely
# - Same format on disk, in network, in memory — no marshalling at boundaries
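
To make the layout concrete, here is a spark-shell sketch (UnsafeProjection and InternalRow are internal Catalyst APIs, so this assumes a Spark 3.x classpath) that builds an UnsafeRow and checks its size against the layout above:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

// Two fields: one fixed-width long, one variable-length string.
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))

val toUnsafe = UnsafeProjection.create(schema)
val row = toUnsafe(InternalRow(42L, UTF8String.fromString("tungsten")))

row.getLong(0)        // 42: one offset read, no allocation
row.getSizeInBytes    // 32 = 8 (null bitset) + 16 (two fixed slots)
                      //      + 8 ("tungsten" is exactly one word)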

Whole-Stage Code Generation

// Logical plan: SELECT a + 1 FROM t WHERE b > 10
// Without codegen (Volcano iterator model):
//   Project { Filter { Scan } }.next()
//   -> calls Project.next()
//      -> calls Filter.next()
//         -> calls Scan.next()
//            returns row
//         evaluates b > 10, may iterate
//      computes a + 1
//   3 virtual method calls per row, each crossing operator boundaries

// With codegen, all three operators fuse:
public class GeneratedIteratorForCodegenStage1 {
  protected void processNext() throws java.io.IOException {
    while (input.hasNext()) {
      InternalRow scanRow = (InternalRow) input.next();
      // Inlined: Scan
      long b = scanRow.getLong(1);
      // Inlined: Filter b > 10
      if (b > 10L) {
        long a = scanRow.getLong(0);
        // Inlined: Project a + 1
        long result = a + 1L;
        // Write to output buffer
        unsafeRowWriter.write(0, result);
        append(unsafeRowWriter.getRow());
      }
    }
  }
}

// One method, one tight loop, JIT inlines aggressively.
// Visible in EXPLAIN as "* WholeStageCodegen (1)" — the * marks codegened ops.
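
A quick way to feel the difference is to flip whole-stage codegen off and time the same query. A spark-shell sketch (the query and timing helper are mine; exact numbers vary by machine):

def time[A](body: => A): A = {
  val t0 = System.nanoTime()
  val result = body
  println(f"${(System.nanoTime() - t0) / 1e6}%.1f ms")
  result
}

// Fused scan-filter-project loop (the default):
spark.conf.set("spark.sql.codegen.wholeStage", "true")
time { spark.range(100000000L).filter("id % 7 = 0").selectExpr("id + 1").count() }

// Volcano path: one virtual next() chain per row
spark.conf.set("spark.sql.codegen.wholeStage", "false")
time { spark.range(100000000L).filter("id % 7 = 0").selectExpr("id + 1").count() }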

Vectorized Parquet Reader

// ColumnarBatch: a batch of rows stored one column per vector (sketch)
class ColumnarBatch {
  int numRows;
  ColumnVector[] columns;   // one per column in the schema
}

class OnHeapColumnVector extends ColumnVector {   // real class extends WritableColumnVector
  byte[] nulls;     // 1 byte per row (could be a bitmap, but byte[] is faster to write)
  int[]  intData;   // for int columns
  long[] longData;  // for long columns
  byte[] varData;   // varchar/binary backing
  int[]  varOffsets;
}

// Read a Parquet column chunk:
// 1. Decompress page (LZ4/Snappy)
// 2. Decode encoded values (RLE, delta, dictionary) directly into intData[]
// 3. Apply filters: count nulls, evaluate predicate, mark surviving rows
// 4. Pass batch to next operator

// Why this is so much faster than row-based:
// - One bounds check per batch instead of per row
// - SIMD-friendly tight loops over int[] (auto-vectorized by JIT)
// - Branchless predicate evaluation
// - Cache lines reused across many rows of the same column

spark.sql.parquet.enableVectorizedReader = true   # default
spark.sql.parquet.columnarReaderBatchSize = 4096
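
The shape of the inner loop is the whole story. A minimal Scala sketch of branchless selection over one batch (illustrative only, not Spark source; the names are made up):

// Evaluate `value > threshold` for a whole batch: one tight loop,
// no per-row virtual calls, no branch in the hot path.
// `selected` must be at least values.length long.
def filterBatch(values: Array[Long], threshold: Long, selected: Array[Int]): Int = {
  var n = 0
  var i = 0
  while (i < values.length) {
    selected(n) = i                              // write the index unconditionally
    n += (if (values(i) > threshold) 1 else 0)   // advance only on a hit
    i += 1
  }
  n   // surviving row count; selected(0 until n) holds their positions
}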

Cache-Aware Operators

# Tungsten hash aggregation (UnsafeFixedWidthAggregationMap)
# Key insight: a hash table sized to L2 cache (~256 KB) is much faster
# than a multi-MB hash table that spills to L3 or main memory.

# Two-level structure:
# - In-memory map of UnsafeRow keys -> aggregation buffers (off-heap pages)
# - When map size reaches threshold, sort and spill to disk

# Sort operator (UnsafeExternalSorter)
# Sorts pointers, not full rows:
# long[] pointers : 8 bytes each = (partition_id:24 | record address:40)
# byte[][] pages  : actual UnsafeRow bytes

# Sorting long[] is cache-friendly:
# - Array fits more entries per cache line
# - Comparison is integer compare on the pointer high bits (when possible)
# - No object dereferencing during sort
# - Branch predictor warms up quickly on this kind of code

# Result: TPC-DS sort benchmark improved 5-10x with this approach
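
A sketch of the pointer packing (field widths follow the layout above; the helper names are mine, not Spark's):

// Pack a 24-bit partition id above a 40-bit record address.
def pack(partitionId: Int, recordAddress: Long): Long =
  (partitionId.toLong << 40) | (recordAddress & ((1L << 40) - 1))

val pointers = Array(pack(7, 1024L), pack(2, 4096L), pack(7, 0L))

// Sorting by partition never touches the row bytes: the id lives in the
// high bits, so a plain numeric sort of the long[] is enough (signed
// sort is safe while partition ids stay below 2^23).
java.util.Arrays.sort(pointers)
pointers.foreach { p =>
  println(s"partition=${p >>> 40} address=${p & ((1L << 40) - 1)}")
}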

EXPLAIN: Reading Codegen Output

scala> spark.sql("SELECT a + 1 FROM t WHERE b > 10").explain()

== Physical Plan ==
*(1) Project [a#10 + 1 AS (a + 1)#15]
+- *(1) Filter (b#11 > 10)
   +- *(1) ColumnarToRow
      +- FileScan parquet [a#10,b#11] ...

# The *(1) prefix means "part of WholeStageCodegen stage 1"
# Three operators are fused into one generated method.

# To see the generated code:
scala> spark.sql("...").queryExecution.debug.codegen()

/* 030 */ protected void processNext() throws java.io.IOException {
/* 031 */   while (...) {
/* 032 */     long scan_value_0 = scan_row.getLong(0);
/* 033 */     long scan_value_1 = scan_row.getLong(1);
/* 034 */     boolean filter_value_0 = scan_value_1 > 10L;
/* 035 */     if (filter_value_0) {
/* 036 */       long project_value_0 = scan_value_0 + 1L;
/* 037 */       ...
/* 038 */     }
/* 039 */   }
/* 040 */ }
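
Since Spark 3.0, explain() also takes a mode, so the same dump is one call away:

scala> spark.sql("SELECT a + 1 FROM t WHERE b > 10").explain("codegen")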

When Tungsten Falls Back

# Codegen falls back to interpreted (Volcano) execution when:
spark.sql.codegen.maxFields = 100        # whole-stage codegen deactivates above this many fields
spark.sql.codegen.fallback = true        # interpret instead of failing if codegen throws
spark.sql.codegen.methodSplitThreshold   # split generated functions larger than this (default 1024)

# Common causes:
# - Generated method exceeds the 64 KB JVM bytecode limit (very wide schemas)
# - Catalyst produces an expression codegen doesn't yet support
# - Python UDFs run in a separate operator; Scala UDFs are opaque calls codegen can't inline
# - Some user-defined types are not yet supported

# To verify: look for operators in EXPLAIN without the *
== Physical Plan ==
HashAggregate                  <-- not codegened (no asterisk)
+- *(1) Filter ...
   +- *(1) Scan ...

# In Spark 3.4+ many more operators codegen reliably; the fallback rate is low.
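
When chasing a slow stage, one useful (development-only) move is to make fallbacks loud instead of silent:

spark.conf.set("spark.sql.codegen.fallback", "false")
// Re-run the suspect query: codegen errors now throw, and the stack
// trace names the operator that was quietly being interpreted.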

Tradeoffs and Where Native Engines Go Further

Tungsten gives Spark

  • Native-database-like performance from JVM code
  • Automatic optimization — no per-query tuning
  • Vectorized Parquet/ORC at scale
  • Off-heap binary data the GC never scans

Where it stops

  • Still bound by JVM method size and JIT quirks
  • UDFs in Python defeat codegen
  • Batch-at-a-time loops rely on JIT auto-vectorization, not explicit SIMD
  • Photon, Comet, and Velox-based engines push further with native code

Frequently Asked Questions

What is whole-stage code generation?

Tungsten generates a single Java method per pipeline of operators, fusing them together so each row is processed by direct method calls and field accesses instead of going through Spark's iterator interface. A SCAN -> FILTER -> PROJECT -> AGGREGATE pipeline that would normally be four separate iterators becomes one tight inner loop. Janino compiles the generated source at runtime. The benefit is removing virtual call overhead, enabling JIT inlining, and exposing the data flow to the JIT for better register allocation.

Why is UnsafeRow off-heap?

UnsafeRow stores its bytes in flat buffers addressed through sun.misc.Unsafe; with spark.memory.offHeap.enabled=true those buffers live outside the Java heap (by default Tungsten allocates them as on-heap byte arrays, which the format treats identically). Three benefits: 1) Off-heap bytes are never scanned by the garbage collector, eliminating GC pressure from the data itself. 2) The format is a fixed binary layout, so reading a field is pure offset arithmetic into the buffer, with no Java object headers or pointer chasing. 3) The in-memory bytes are also the wire format, so a row crosses stage boundaries without a separate serialization step. The downside is manual memory management; Tungsten uses a memory manager with explicit page allocation.
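
By default Tungsten's pages are on-heap byte arrays; moving them truly off-heap takes two static configs, set before the SparkSession starts (the 4g figure is illustrative):

spark.memory.offHeap.enabled = true
spark.memory.offHeap.size    = 4g     # required when enabled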

How does the vectorized Parquet reader work?

Instead of reading one row at a time and constructing UnsafeRows individually, the vectorized reader decodes a column chunk into a ColumnVector (a compact array of values plus per-row null indicators) and processes it in bulk. JIT-friendly operations like 'count nulls' or 'apply a filter to an integer column' run as tight loops over primitive arrays. The vectorized reader is enabled by spark.sql.parquet.enableVectorizedReader=true (the default) and provides a 5-10x speedup over the legacy row-based reader for typical analytical queries.

Does codegen always help?

Almost always for analytical SQL. It struggles when generated methods exceed the JVM's 64 KB method size limit, which can happen with very wide projection lists or deeply nested subexpressions; Tungsten falls back to interpreted execution in those cases. UDFs that wrap arbitrary Python or Scala code also defeat fusion, since Tungsten cannot inline through them. Both issues are visible in EXPLAIN output: operators without the asterisk are not codegened.

What does 'cache-aware' mean here?

Cache-aware operators arrange their data accesses to fit in the CPU's L1/L2/L3 caches. Tungsten's hash aggregation, for example, uses a small in-memory hash table sized to fit in L2 (typically 256KB-1MB), spilling to a larger on-disk structure when needed. The sort operator uses a cache-conscious in-place sort on long[] pointer arrays rather than sorting heavyweight Java objects. These choices can give 2-5x speedup over the same algorithm without cache awareness.