Structured Streaming
DataFrames Over Unbounded Tables, With Watermarks
Structured Streaming reframes streaming as a DataFrame query against an unbounded input table. You write the same SQL or DataFrame code you would use for batch, set a trigger interval, and Spark incrementally computes the result. Under the hood it is a series of small Catalyst-optimized batch jobs, each operating on the new rows since the last micro-batch.
The big architectural choice is event-time semantics with watermarks. Stateful aggregations remember per-key state in a checkpointed state store; the watermark moves forward as event-time progresses, and old state is garbage-collected. This makes long-running streaming queries operationally tractable.
The Unbounded Table Model
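The model can be sketched in plain Scala, with no Spark at all (all names here are hypothetical): the input is an ever-growing table, and each trigger folds only the rows appended since the last trigger into per-key running state, exactly the way a micro-batch incrementally updates a streaming aggregation.

```scala
// Toy model of incremental execution: each "trigger" sees only the rows
// appended since the last one and folds them into running per-key state.
case class Row(userId: String, amount: Long)

final class IncrementalSum {
  private var state = Map.empty[String, Long] // per-key running totals
  private var lastOffset = 0                  // rows consumed so far

  // One trigger: process only the new rows past lastOffset, update state.
  def trigger(unboundedTable: Seq[Row]): Map[String, Long] = {
    val newRows = unboundedTable.drop(lastOffset)
    lastOffset = unboundedTable.length
    state = newRows.foldLeft(state) { (s, r) =>
      s.updated(r.userId, s.getOrElse(r.userId, 0L) + r.amount)
    }
    state
  }
}
```

Calling `trigger` twice on a table that grew from one row to two reprocesses only the second row; the result is always what a batch query over the whole table would have produced.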
Basic Streaming Query
// Read from Kafka
import org.apache.spark.sql.functions._
import spark.implicits._

// `schema` is the expected schema of the JSON payload, defined elsewhere
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-1:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) as json", "timestamp")
  .select(from_json($"json", schema).as("data"), $"timestamp")
  .select("data.*", "timestamp")
// Windowed aggregation with event-time watermark
val perMinute = events
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "1 minute"),
    $"user_id"
  )
  .agg(
    sum($"amount").as("total"),
    count("*").as("events")
  )
// Write to Kafka (note: the Kafka sink is at-least-once; end-to-end
// exactly-once requires an idempotent or transactional sink)
val query = perMinute.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-1:9092")
  .option("topic", "user-totals")
  .option("checkpointLocation", "s3://chk/user-totals/")
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

Triggers
// Default: continuous micro-batches as fast as possible
.trigger(Trigger.ProcessingTime(0)) // ASAP
// Fixed processing-time interval
.trigger(Trigger.ProcessingTime("30 seconds"))
// Once: process available data, then stop (for batch-style backfill)
.trigger(Trigger.Once())
// AvailableNow: process all currently available data in micro-batches, then stop
.trigger(Trigger.AvailableNow()) // Spark 3.3+, replacement for Once
// Continuous: per-record processing (experimental, limited operators)
.trigger(Trigger.Continuous("100 milliseconds"))

Trigger.AvailableNow() is the right choice for "incremental batch" use cases: running a
streaming job on a schedule that processes everything new since last run. It uses streaming semantics
(checkpoints, exactly-once) but the job exits when there is no more data, making it ergonomic to run
under Airflow or Kubernetes CronJob.
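A minimal sketch of such a scheduled incremental job (topic, paths, and bucket names are illustrative):

```scala
// Runs under Airflow or a CronJob: drains everything new since the last
// checkpoint in micro-batches, then exits. The next run resumes from the
// checkpointed offsets.
val job = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-1:9092")
  .option("subscribe", "events")
  .load()
  .writeStream
  .format("parquet")
  .option("path", "s3://lake/events/")
  .option("checkpointLocation", "s3://chk/events-backfill/")
  .trigger(Trigger.AvailableNow())
  .start()

job.awaitTermination() // returns once all currently available data is processed
```

Because the checkpoint carries the offsets, the job is safe to rerun at any cadence: a missed run simply means the next one processes a larger backlog.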
Output Modes
| Mode | What is emitted | Used with |
|---|---|---|
| append | Only newly added rows since last batch | Stateless transforms, watermark-bound aggregations |
| update | Only rows whose value changed | Aggregations where you want current state per key |
| complete | The entire result table | Aggregations with no watermark; small result sets |
Aggregations without a watermark cannot use append mode — Spark cannot know when a window is "done"
and so cannot emit a final row. Add .withWatermark(...) and append becomes available; rows
are emitted only when the watermark passes their window's end time.
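As a sketch (reusing the `events` stream from above), declaring the watermark is the only change needed to make the aggregation append-eligible:

```scala
// Without .withWatermark this fails with an AnalysisException: streaming
// aggregations without a watermark do not support append output mode.
val appended = events
  .withWatermark("event_time", "10 minutes")
  .groupBy(window($"event_time", "1 minute"), $"user_id")
  .agg(sum($"amount").as("total"))
  .writeStream
  .outputMode("append") // a row is emitted once the watermark passes its window end
  .format("parquet")
  .option("path", "s3://lake/user-totals/")       // illustrative path
  .option("checkpointLocation", "s3://chk/user-totals-append/")
  .start()
```

Append mode is what file sinks require, since files cannot be updated in place; each emitted row is final.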
Watermarks and Late Data
// Mark event_time as the column tracking event-time
val withWatermark = events.withWatermark("event_time", "10 minutes")
// Watermark = max(event_time observed across all partitions) - 10 minutes
// State for windows ending before the watermark can be flushed.
// Late data behavior (1-minute windows, current watermark = 09:25:00,
// so the max event_time seen so far is 09:35:00):
// - Event arrives with event_time = 09:32:00
//   -> ON TIME, included in window [09:32, 09:33)
// - Event arrives with event_time = 09:26:00 (9 minutes behind the max)
//   -> LATE, but its window [09:26, 09:27) ends after the watermark, INCLUDED
// - Event arrives with event_time = 08:50:00
//   -> TOO LATE: window [08:50, 08:51) ended before the watermark,
//      state already flushed, DROPPED
// Watermark progression is per-batch:
// batch 1: max event_time = 09:00 -> watermark = 08:50
// batch 2: max event_time = 09:05 -> watermark = 08:55
// batch 3: max event_time = 09:30 -> watermark = 09:20
// In batch 4, events whose window ends before 09:20 are dropped.

State Stores
// Default: HDFSBackedStateStore — keeps all state in JVM heap,
// snapshots to checkpoint directory each batch
spark.sql.streaming.stateStore.providerClass =
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider
// Better for large state: RocksDB (3.2+)
spark.sql.streaming.stateStore.providerClass =
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled = true
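These properties can also be set when building the session; a sketch (app name illustrative, and note that changelog checkpointing shipped later than the RocksDB provider itself):

```scala
val spark = SparkSession.builder()
  .appName("streaming-with-rocksdb")
  // RocksDB state store provider (Spark 3.2+)
  .config("spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
  // Changelog checkpointing (Spark 3.4+): upload only per-batch changes to
  // the checkpoint location instead of full RocksDB snapshots
  .config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
  .getOrCreate()
```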
// State checkpoint directory layout
s3://chk/user-totals/
state/
0/ # operator id (e.g. 0 for first stateful op)
0/ # partition 0
1.delta # batch 1 changes
2.delta
3.snapshot # full snapshot
1/ # partition 1
...
offsets/ # source progress per batch
commits/ # batch commit markers
// Recovery: reload latest snapshot per partition + replay deltas
// With RocksDB: also incremental, far faster recovery for large state

Streaming Joins
// Stream-static join (most common)
val transactions = spark.readStream.format("kafka")...
val accounts = spark.read.parquet("s3://accounts/") // batch DataFrame
transactions.join(accounts, "account_id")
// On every micro-batch, the static side is broadcast or sort-merged
// Stream-stream join (requires watermarks on both sides)
val orders = spark.readStream.format("kafka")...
.withWatermark("order_time", "10 minutes")
val payments = spark.readStream.format("kafka")...
.withWatermark("pay_time", "20 minutes")
orders.join(payments,
expr("""
order_id = payment_order_id AND
pay_time >= order_time AND
pay_time <= order_time + interval 1 hour
"""))
// Spark keeps both sides' state until watermark passes the time-window upper bound

Tradeoffs and When Not to Use
Structured Streaming wins when
- You already use Spark for batch — same code, same cluster
- You can tolerate ~seconds latency
- You need strong end-to-end exactly-once
- State fits within partitioned RocksDB on executors
Consider Flink when
- Sub-100ms latency required (Spark continuous is too limited)
- Complex event-time semantics (custom session windows, side outputs)
- State is multi-TB and grows continuously
- Iterative algorithms or graph streaming
Frequently Asked Questions
What is the difference between Structured Streaming and the legacy DStream API?
DStreams (Spark 1.x) treated streams as a sequence of RDDs, requiring you to think in micro-batches and write imperative code per batch. Structured Streaming (Spark 2.0+) treats a stream as an unbounded DataFrame — you write the same DataFrame/SQL code as for batch, and Spark handles the incremental execution. Structured Streaming has stronger end-to-end fault tolerance, native support for event-time and watermarks, and integrates with Catalyst optimization. DStreams is deprecated; new code should use Structured Streaming.
Micro-batch or continuous trigger — which should I use?
Micro-batch (the default) processes records in batches at fixed intervals or as fast as possible. End-to-end latency is typically 100 ms to a few seconds. Continuous mode (experimental in 2.3+) processes records one at a time for ~1 ms latency, but supports a much smaller subset of operations and has weaker fault tolerance. In production, almost everyone uses micro-batch, often with Trigger.AvailableNow for incremental batch jobs or Trigger.ProcessingTime("100 milliseconds") for low-latency streams.
What does a watermark actually do?
A watermark is a moving threshold that says 'I am no longer expecting events with timestamp earlier than X'. It is computed as max(event_time observed) minus the delay threshold (the duration passed to withWatermark). State for windows ending before the watermark can be safely emitted and discarded. Without a watermark, Spark would have to keep all aggregation state forever in case a very old event arrives. A 10-minute delay threshold means events up to 10 minutes behind the fastest event time seen so far are still included in their proper window; older events are dropped.
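The arithmetic can be sketched in plain Scala (class and method names hypothetical):

```scala
// Watermark bookkeeping: watermark = max event time seen - delay threshold.
// A window's state can be dropped (and, in append mode, its result emitted)
// once the watermark passes the window's end.
final class Watermark(delayMillis: Long) {
  private var maxEventTime = Long.MinValue

  def observe(eventTimeMillis: Long): Unit =
    maxEventTime = math.max(maxEventTime, eventTimeMillis)

  def current: Long =
    if (maxEventTime == Long.MinValue) Long.MinValue
    else maxEventTime - delayMillis

  // An event is kept only if its window has not yet been finalized
  def accepts(windowEndMillis: Long): Boolean = windowEndMillis > current
}
```

With a 10-minute delay, after observing an event at 09:35 the watermark sits at 09:25: an event whose window ends at 09:27 is still accepted, while one whose window ended at 09:15 is dropped.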
Where is streaming state kept?
By default, in HDFS-backed state stores: each stateful operator's state lives in a checkpoint directory, with one file per partition per micro-batch. Spark 3.2+ added a RocksDB state store backend, which is far better for large state — it spills to local disk and loads only what is needed, while HDFS-backed stores hold all state in the JVM heap. For multi-TB state, RocksDB is the right choice.
How does exactly-once work in Structured Streaming?
Structured Streaming provides end-to-end exactly-once when the source is replayable and the sink is idempotent or transactional. Sources track progress with checkpointed offsets; the same offsets are reused on restart. File sinks use an atomic commit log, Delta Lake uses transactional commits, and foreachBatch lets you key writes by batchId; the built-in Kafka sink is at-least-once, so consumers may see duplicates after a failure. The query engine writes a metadata log per micro-batch describing what input was processed and what output was produced, so restart from any point is consistent.
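A common pattern when the target system has no transactional Spark sink is foreachBatch with batchId as an idempotence key. A sketch (the JDBC URL and table are hypothetical; the deduplication step is the part you must implement against your target):

```scala
// foreachBatch hands each micro-batch to arbitrary batch-write code along
// with a monotonically increasing batchId. After a failure, the same batchId
// is replayed from the checkpoint, so a sink that records the last committed
// batchId can skip replays instead of writing them twice.
val query = perMinute.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // Idempotence: inside the target's transaction, check/record batchId
    // and skip the write if this batch was already committed.
    batch.write
      .format("jdbc")
      .option("url", "jdbc:postgresql://db:5432/analytics") // hypothetical target
      .option("dbtable", "user_totals")
      .mode("append")
      .save()
  }
  .option("checkpointLocation", "s3://chk/user-totals-jdbc/")
  .start()
```

This trades the sink's built-in guarantees for flexibility: foreachBatch itself only delivers each batch at-least-once, and the batchId check is what upgrades that to effectively-once.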