PINNED · QUESTION · Real-Time Stream Processing

Best practices for handling late-arriving data in streaming pipelines?

dataeng_sarah · Senior Data Engineer
Feb 3, 2026 · 456 views

I'm working on a real-time analytics pipeline processing ~2M events/hour from Kafka, and I keep running into issues with late-arriving data.

Current Setup

  • Source: Kafka topic with customer events
  • Processing: Spark Structured Streaming
  • Sink: Delta Lake on S3

The Problem

About 3-5% of events arrive more than 10 minutes late due to mobile client buffering. Our current watermark of 5 minutes drops these events entirely.

```python
from pyspark.sql.functions import window, count

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "customer_events") \
    .load()
# (value deserialization / event_time extraction omitted for brevity)

# Current watermark - too aggressive: events more than 5 minutes behind
# the max observed event_time are dropped from the aggregation
windowed = df \
    .withWatermark("event_time", "5 minutes") \
    .groupBy(window("event_time", "1 hour")) \
    .agg(count("*").alias("event_count"))
```

What I've Tried

  1. Increasing the watermark to 30 minutes -- this works but adds too much latency
  2. A separate batch job to reconcile late data -- works but feels hacky
  3. Writing late events to a dead letter queue for reprocessing

Has anyone implemented a hybrid approach that balances latency with completeness? I've seen some posts about using Delta Lake's MERGE for late-arriving data but haven't found a clean pattern.

Any help is appreciated! Here's the relevant Spark docs: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking

Also found this useful article: https://databricks.com/blog/watermarking-in-structured-streaming

Tags: streaming · kafka · watermarks · spark · delta-lake


3 Replies

ACCEPTED ANSWER
stream_king · Staff Engineer @ DataCo
Feb 3, 2026

Great question! We solved this with a two-layer approach that's been running in production for 8 months:

Layer 1: Streaming with generous watermark

We use a 15-minute watermark for the "hot" path. This captures ~97% of events with acceptable latency.

```python
# Hot path - streaming aggregation with a generous watermark
hot_df = df \
    .withWatermark("event_time", "15 minutes") \
    .groupBy(window("event_time", "1 hour")) \
    .agg(count("*").alias("event_count"))

hot_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://data/checkpoints/hot_metrics/") \
    .start("s3://data/hot_metrics/")
```

Layer 2: Batch reconciliation with Delta MERGE

A scheduled job runs every 4 hours and merges late-arriving data:

```python
from delta.tables import DeltaTable
from pyspark.sql.functions import window, count, col

# Cold path - batch reconciliation over the last four hours of the topic
late_events = spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "customer_events") \
    .option("startingOffsets", four_hours_ago) \
    .load()
# (filtering down to only the events the hot path dropped is omitted here)

# Re-aggregate late events into the same hourly windows as the hot path
late_counts = late_events \
    .groupBy(window("event_time", "1 hour").alias("w")) \
    .agg(count("*").alias("late_count")) \
    .withColumn("window_start", col("w.start"))

delta_table = DeltaTable.forPath(spark, "s3://data/metrics/")
delta_table.alias("target").merge(
    late_counts.alias("source"),
    "target.window_start = source.window_start"
).whenMatchedUpdate(set={
    "event_count": "target.event_count + source.late_count"
}).whenNotMatchedInsertAll().execute()
```

The key insight is that your hot path doesn't need to be perfect - it just needs to be fast. The batch reconciliation handles correctness.

More on this pattern: https://delta.io/blog/merge-late-arriving-data

dataeng_sarah · Senior Data Engineer
Feb 3, 2026

This is exactly what I was looking for! The two-layer approach makes total sense. Quick question - how do you handle exactly-once semantics in the merge step? Do you use idempotent writes?

stream_king · Staff Engineer @ DataCo
Feb 3, 2026

Yes! We use the merge condition as our idempotency key. Since Delta Lake's MERGE is atomic, you get exactly-once semantics for free as long as your merge condition is deterministic.

The trick is using a composite key: window_start + source_partition
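Spelled out, that composite-key merge condition might look like the following sketch (column names are assumed to match the earlier snippet, with a `source_partition` column carried through the aggregation):

```python
# Composite idempotency key: re-running the merge for the same
# (window_start, source_partition) pair matches the same target rows,
# so the atomic MERGE cannot double-count a replayed batch.
MERGE_CONDITION = (
    "target.window_start = source.window_start "
    "AND target.source_partition = source.source_partition"
)

# delta_table.alias("target").merge(
#     late_counts.alias("source"), MERGE_CONDITION
# ).whenMatchedUpdate(...).whenNotMatchedInsertAll().execute()
```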

pipeline_ninja · Data Platform Lead
Feb 3, 2026

Another approach worth considering: Flink's event-time processing handles this more natively than Spark.

In Flink you can configure allowed lateness separately from the watermark:

```java
DataStream<Event> stream = env
    .addSource(kafkaSource)
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofMinutes(5))
            .withIdleness(Duration.ofSeconds(30))
    )
    .keyBy(Event::getCustomerId)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .allowedLateness(Time.minutes(30))   // Accept late data up to 30 min
    .sideOutputLateData(lateOutputTag)   // Capture anything beyond that
    .aggregate(new EventAggregator());
```

The allowedLateness parameter keeps the window state around so late events can still update results, while the side output catches anything that's really late.

Not saying you should switch to Flink, but it's worth knowing the tradeoffs!

data_wizard
Feb 3, 2026

We use a simpler approach for our use case:

  • Write all events (including late ones) to a raw Delta table with no aggregation
  • Run materialized views on top for the aggregated metrics
  • Delta Lake's time travel lets us recompute any window

This works well if your query patterns are flexible. Not ideal if you need sub-second query latency though.
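Recomputing one window from the raw table can be sketched as plain SQL; the table path, column names, window bounds, and version number below are illustrative:

```python
# Recompute one hourly window directly from the raw, unaggregated table.
# Late events appended after the fact are included automatically.
RECOMPUTE_SQL = """
SELECT window(event_time, '1 hour') AS w, count(*) AS event_count
FROM delta.`s3://data/raw_events/`
WHERE event_time >= '2026-02-03 12:00:00'
  AND event_time <  '2026-02-03 13:00:00'
GROUP BY window(event_time, '1 hour')
"""

# spark.sql(RECOMPUTE_SQL)  # run against a live SparkSession
# Time travel variant, pinning an earlier table snapshot:
#   FROM delta.`s3://data/raw_events/` VERSION AS OF 42
```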