Structured Streaming

DataFrames Over Unbounded Tables, With Watermarks

Structured Streaming reframes streaming as a DataFrame query against an unbounded input table. You write the same SQL or DataFrame code you would use for batch, pick a trigger, and Spark incrementally computes the result. Under the hood it runs a series of small Catalyst-optimized batch jobs, each operating on the rows that arrived since the last micro-batch.

The big architectural choice is event-time semantics with watermarks. Stateful aggregations remember per-key state in a checkpointed state store; the watermark moves forward as event-time progresses, and old state is garbage-collected. This makes long-running streaming queries operationally tractable.

The Unbounded Table Model

Streaming = appending rows to an infinite table. Each trigger, new rows land in the unbounded Input Table: batch 1 brings 100 rows, batch 2 brings 80, batch 3 brings 110, batch N has not yet arrived. The streaming query — SELECT user_id, window(event_time, '5 min'), SUM(amount) FROM input GROUP BY user_id, window — reads the new rows, updates running aggregations per (user_id, window) in the state store, and advances the watermark (event_time - 10 min). The Result Table then holds the current totals, e.g. user A 09:00 = 230, user B 09:00 = 80, user A 09:05 = 320.
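The payoff of this model is that transformation code is shared between batch and streaming. A minimal sketch (using the column names from the query above) of a function that works identically on both:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, sum, window}

// The same function works whether `input` came from spark.read (batch)
// or spark.readStream (streaming); only the source and sink calls differ.
def totalsPerWindow(input: DataFrame): DataFrame =
  input
    .groupBy(window(col("event_time"), "5 minutes"), col("user_id"))
    .agg(sum(col("amount")).as("total"))
```

Catalyst plans the streaming version as an incremental query over each micro-batch; the function itself never needs to know which mode it runs in.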

Key Numbers

2.0         Spark version where Structured Streaming launched
100ms+      Typical micro-batch latency
~1ms        Continuous trigger latency (experimental)
3.2         Spark version with RocksDB state store backend
3           Output modes: append, update, complete
2 min       Common watermark delay threshold
unbounded   Conceptual size of input table

Basic Streaming Query

// Read from Kafka
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-1:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) as json", "timestamp")
  .select(from_json($"json", schema).as("data"), $"timestamp")
  .select("data.*", "timestamp")

// Windowed aggregation with event-time watermark
val perMinute = events
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "1 minute"),
    $"user_id"
  )
  .agg(
    sum($"amount").as("total"),
    count("*").as("events")
  )

// Write to Kafka with exactly-once
val query = perMinute.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-1:9092")
  .option("topic", "user-totals")
  .option("checkpointLocation", "s3://chk/user-totals/")
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()
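The start() call returns a StreamingQuery handle. A short sketch of the lifecycle and monitoring calls typically used with the query above:

```scala
// Block the driver until the query terminates or fails:
// query.awaitTermination()

// Non-blocking introspection:
println(query.status)                 // isDataAvailable, isTriggerActive, message
println(query.lastProgress)           // JSON: input rate, batch duration, state rows
println(query.recentProgress.length)  // ring buffer of recent progress reports

// Stop the query:
query.stop()
```

In production, lastProgress (or a StreamingQueryListener) is the usual feed for dashboards and lag alerts.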

Triggers

// Default (no trigger set): run micro-batches back-to-back, as fast as possible
.trigger(Trigger.ProcessingTime(0))   // equivalent to the default

// Fixed processing-time interval
.trigger(Trigger.ProcessingTime("30 seconds"))

// Once: process available data, then stop (for batch-style backfill)
.trigger(Trigger.Once())

// AvailableNow: process all currently available data in micro-batches, then stop
.trigger(Trigger.AvailableNow())   // Spark 3.3+, replacement for Once

// Continuous: per-record processing (experimental, limited operators)
.trigger(Trigger.Continuous("100 milliseconds"))

Trigger.AvailableNow() is the right choice for "incremental batch" use cases — running a streaming job on a schedule that processes everything new since last run. It uses streaming semantics (checkpoints, exactly-once) but the job exits when there is no more data, making it ergonomic to run under Airflow or Kubernetes CronJob.
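A minimal sketch of that pattern (paths, file format, and the eventSchema value are illustrative assumptions):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("incremental-backfill").getOrCreate()

// Process everything that arrived since the last checkpointed offset, then
// exit — safe to run under cron, Airflow, or a Kubernetes CronJob.
val job = spark.readStream
  .format("parquet")
  .schema(eventSchema)                 // hypothetical schema for the source files
  .load("s3://raw/events/")
  .writeStream
  .format("parquet")
  .option("path", "s3://tables/events_clean/")
  .option("checkpointLocation", "s3://chk/events-clean/")
  .trigger(Trigger.AvailableNow())
  .start()

job.awaitTermination()   // returns once all available data has been processed
```

The checkpoint directory carries the offsets between runs, so each scheduled invocation picks up exactly where the previous one stopped.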

Output Modes

Mode       What is emitted                          Used with
append     Only newly added rows since last batch   Stateless transforms, watermark-bound aggregations
update     Only rows whose value changed            Aggregations where you want current state per key
complete   The entire result table                  Aggregations with no watermark; small result sets

Aggregations without a watermark cannot use append mode — Spark cannot know when a window is "done" and so cannot emit a final row. Add .withWatermark(...) and append becomes available; rows are emitted only when the watermark passes their window's end time.

Watermarks and Late Data

// Mark event_time as the column tracking event-time
val withWatermark = events.withWatermark("event_time", "10 minutes")

// Watermark = max(event_time observed across all partitions) - 10 minutes
// State for windows ending before the watermark can be flushed.

// Late data behavior (current watermark = 09:25, delay threshold = 10 minutes):
// - Event arrives with event_time = 09:32:00 (ahead of the watermark)
//   -> ON TIME, included in its window [09:32, 09:33)
// - Event arrives with event_time = 09:28:00 (behind the max observed
//   event_time, but still >= watermark)
//   -> LATE but within the delay threshold, included in window [09:28, 09:29)
// - Event arrives with event_time = 09:14:00 (older than the watermark)
//   -> TOO LATE: its window's state may already be evicted, event is DROPPED

// Watermark progression is per-batch:
//   batch 1: max event_time = 09:00 -> watermark = 08:50
//   batch 2: max event_time = 09:05 -> watermark = 08:55
//   batch 3: max event_time = 09:30 -> watermark = 09:20
// Late events from batch 3 with event_time < 09:20 are dropped.
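The progression above can be reproduced in a few lines of plain Scala (a sketch with a hypothetical advanceWatermark helper; times are wall-clock-free LocalTime values, delay = 10 minutes):

```scala
import java.time.LocalTime

// Watermark after each batch = max event time seen so far, minus the delay.
// It is monotonic: it never moves backwards even if a batch's max is lower.
def advanceWatermark(current: LocalTime, batchMax: LocalTime, delayMin: Long): LocalTime = {
  val candidate = batchMax.minusMinutes(delayMin)
  if (candidate.isAfter(current)) candidate else current
}

val wm1 = advanceWatermark(LocalTime.MIN, LocalTime.of(9, 0), 10)   // 08:50
val wm2 = advanceWatermark(wm1, LocalTime.of(9, 5), 10)             // 08:55
val wm3 = advanceWatermark(wm2, LocalTime.of(9, 30), 10)            // 09:20
```

The monotonicity matters: a batch whose maximum event time is lower than an earlier batch's (out-of-order arrival at the source level) never rewinds the watermark.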

State Stores

// Default: HDFSBackedStateStore — keeps all state in JVM heap,
// snapshots to checkpoint directory each batch
spark.sql.streaming.stateStore.providerClass =
  org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider

// Better for large state: RocksDB (3.2+)
spark.sql.streaming.stateStore.providerClass =
  org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
// Changelog checkpointing (3.4+): upload per-batch changelogs instead of full snapshots
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled = true

// State checkpoint directory layout
s3://chk/user-totals/
  state/
    0/                # operator id (e.g. 0 for first stateful op)
      0/              # partition 0
        1.delta       # batch 1 changes
        2.delta
        3.snapshot    # full snapshot
      1/              # partition 1
      ...
  offsets/            # source progress per batch
  commits/            # batch commit markers

// Recovery: reload latest snapshot per partition + replay deltas
// With RocksDB: also incremental, far faster recovery for large state

Streaming Joins

// Stream-static join (most common)
val transactions = spark.readStream.format("kafka")...
val accounts = spark.read.parquet("s3://accounts/")    // batch DataFrame
transactions.join(accounts, "account_id")
// On every micro-batch, the static side is broadcast or sort-merged

// Stream-stream join (requires watermarks on both sides)
val orders = spark.readStream.format("kafka")...
  .withWatermark("order_time", "10 minutes")
val payments = spark.readStream.format("kafka")...
  .withWatermark("pay_time", "20 minutes")

orders.join(payments,
  expr("""
    order_id = payment_order_id AND
    pay_time >= order_time AND
    pay_time <= order_time + interval 1 hour
  """))
// Spark keeps both sides' state until watermark passes the time-window upper bound
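As a rough rule of thumb (a back-of-envelope approximation for intuition, not Spark's exact internal formula), each side's join state must survive for the join's time range plus the other side's watermark lag. With the values above:

```scala
import scala.concurrent.duration._

val joinWindow = 1.hour       // pay_time <= order_time + interval 1 hour
val payDelay   = 20.minutes   // payments watermark delay
val orderDelay = 10.minutes   // orders watermark delay

// An order can still match a payment arriving up to joinWindow later in
// event time, and the payments watermark itself lags by payDelay:
val ordersStateRetention = joinWindow + payDelay      // ~1h 20m

// A payment only matches orders with order_time <= pay_time, so its state
// is droppable once the orders watermark passes pay_time:
val paymentsStateRetention = orderDelay               // ~10m
```

The practical takeaway: wide time-range conditions and generous watermark delays multiply state size, so tighten both as far as the data allows.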

Tradeoffs and When Not to Use

Structured Streaming wins when

  • You already use Spark for batch — same code, same cluster
  • You can tolerate ~seconds latency
  • You need strong end-to-end exactly-once
  • State fits within partitioned RocksDB on executors

Consider Flink when

  • Sub-100ms latency required (Spark continuous is too limited)
  • Complex event-time semantics (custom session windows, side outputs)
  • State is multi-TB and grows continuously
  • Iterative algorithms or graph streaming

Frequently Asked Questions

What is the difference between Structured Streaming and the legacy DStream API?

DStreams (Spark 1.x) treated streams as a sequence of RDDs, requiring you to think in micro-batches and write imperative code per batch. Structured Streaming (Spark 2.0+) treats a stream as an unbounded DataFrame: you write the same DataFrame/SQL code as for batch, and Spark handles the incremental execution. Structured Streaming has stronger end-to-end fault tolerance, native support for event time and watermarks, and benefits from Catalyst optimization. The DStream API is deprecated; new code should use Structured Streaming.

Micro-batch or continuous trigger — which should I use?

Micro-batch (the default) processes records in batches at fixed intervals or as fast as possible; end-to-end latency is typically 100 ms to a few seconds. Continuous mode (experimental since 2.3) processes records one at a time for ~1ms latency, but supports a much smaller subset of operations and offers only at-least-once guarantees. In production, almost everyone uses micro-batch, often with Trigger.AvailableNow for incremental batch jobs or Trigger.ProcessingTime("100 milliseconds") for low-latency streams.

What does a watermark actually do?

A watermark is a moving threshold that says 'I am no longer expecting events with timestamp earlier than X'. It is computed as max(event_time observed) minus a delay threshold you choose. State for windows ending before the watermark can be safely emitted and discarded. Without a watermark, Spark would have to keep all aggregation state forever in case a very old event arrived. A 10-minute delay threshold means events up to 10 minutes behind the newest observed event time are guaranteed to be included in their proper window; events older than the watermark are liable to be dropped (in append mode this is guaranteed, in update mode an old event may occasionally still be processed if its state has not yet been evicted).

Where is streaming state kept?

By default, in HDFS-backed state stores: each stateful operator's state lives in a checkpoint directory, with one file per partition per micro-batch. Spark 3.2+ added a RocksDB state store backend, which is far better for large state — it spills to local disk and loads only what is needed, while HDFS-backed stores hold all state in the JVM heap. For multi-TB state, RocksDB is the right choice.

How does exactly-once work in Structured Streaming?

Structured Streaming provides end-to-end exactly-once when both source and sink are idempotent or transactional. Sources track progress with checkpoint offsets; the same offsets are reused on restart. Sinks like Kafka use transactions, file sinks use atomic-rename commits, and Delta Lake uses optimistic concurrency. The query engine writes a metadata log per micro-batch describing what input was processed and what output was produced, so restart from any point is consistent.
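For sinks with no native transactional integration, the usual pattern is foreachBatch, which hands each micro-batch to ordinary batch-write code together with a replay-stable batch ID. A sketch (the output path is an assumption, and true idempotence still depends on the downstream reader or sink deduplicating on batch_id):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

perMinute.writeStream
  .option("checkpointLocation", "s3://chk/user-totals-fb/")
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // On restart, a replayed micro-batch arrives with the SAME batchId,
    // so the sink can detect and skip (or overwrite) duplicate writes.
    batch.withColumn("batch_id", lit(batchId))
      .write
      .mode("append")
      .parquet("s3://tables/user-totals/")
  }
  .start()
```

This is also the standard escape hatch for writing one stream to multiple destinations or to JDBC sinks that have no streaming connector.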