Great question! We solved this with a two-layer approach that's been running in production for 8 months:
Layer 1: Streaming with generous watermark
We use a 15-minute watermark for the "hot" path. This captures ~97% of events with acceptable latency.
# Hot path - streaming aggregation
from pyspark.sql.functions import window, count

hot_df = df \
    .withWatermark("event_time", "15 minutes") \
    .groupBy(window("event_time", "1 hour")) \
    .agg(count("*").alias("event_count"))

hot_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://data/hot_metrics/_checkpoints") \
    .start("s3://data/hot_metrics/")
(A checkpointLocation is required for streaming sinks; the path here is illustrative.)
Layer 2: Batch reconciliation with Delta MERGE
A scheduled job runs every 4 hours and merges late-arriving data:
# Cold path - batch reconciliation
from delta.tables import DeltaTable

# Batch-read the last four hours from Kafka; connection options and the
# parse/aggregate step that produces window_start and late_count are omitted
late_events = spark.read.format("kafka") \
    .option("startingOffsets", four_hours_ago) \
    .load()

delta_table = DeltaTable.forPath(spark, "s3://data/metrics/")
delta_table.alias("target").merge(
    late_events.alias("source"),
    "target.window_start = source.window_start"
).whenMatchedUpdate(set={
    "event_count": "target.event_count + source.late_count"
}).whenNotMatchedInsertAll().execute()
The key insight is that your hot path doesn't need to be perfect - it just needs to be fast. The batch reconciliation handles correctness.
More on this pattern: https://delta.io/blog/merge-late-arriving-data
This is exactly what I was looking for! The two-layer approach makes total sense. Quick question - how do you handle exactly-once semantics in the merge step? Do you use idempotent writes?
Yes! We use the merge condition as our idempotency key. Since Delta Lake's MERGE is atomic, you get exactly-once semantics for free as long as your merge condition is deterministic.
The trick is using a composite key: window_start + source_partition
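For illustration, here's a minimal sketch of a composite-key merge along those lines. The column names, the pre-aggregated late_agg DataFrame, and the table path are my assumptions, not details from the post above:

```python
# Hypothetical sketch of a composite-key reconciliation MERGE.
# Assumes late events were pre-aggregated into a DataFrame `late_agg`
# with columns window_start, source_partition, and late_count.
MERGE_CONDITION = (
    "target.window_start = source.window_start "
    "AND target.source_partition = source.source_partition"
)

def reconcile(spark, late_agg, table_path):
    # Imported here so the condition above can be inspected without Delta installed
    from delta.tables import DeltaTable

    target = DeltaTable.forPath(spark, table_path)
    (target.alias("target")
           .merge(late_agg.alias("source"), MERGE_CONDITION)
           # Delta commits the MERGE atomically, so a failed job can be
           # retried without leaving partially applied updates behind
           .whenMatchedUpdate(set={"event_count": "target.event_count + source.late_count"})
           .whenNotMatchedInsertAll()
           .execute())
```

The determinism caveat matters here: the additive update is only safe if each batch of late data is merged exactly once, which is why the merge condition doubles as the idempotency key.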
Another approach worth considering: Flink's event-time processing handles this more natively than Spark.
In Flink you can configure allowed lateness separately from the watermark:
// lateOutputTag (an OutputTag<Event>) and EventAggregator are defined elsewhere
DataStream<Event> stream = env
    .addSource(kafkaSource)
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofMinutes(5))
            .withIdleness(Duration.ofSeconds(30))
    )
    .keyBy(Event::getCustomerId)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .allowedLateness(Time.minutes(30)) // Accept late data up to 30 min
    .sideOutputLateData(lateOutputTag) // Capture anything beyond that
    .aggregate(new EventAggregator());
The allowedLateness parameter keeps the window state around so late events can still update results, while the side output catches anything that's really late.
Not saying you should switch to Flink, but it's worth knowing the tradeoffs!
We use a simpler approach for our use case:
- Write all events (including late ones) to a raw Delta table with no aggregation
- Run materialized views on top for the aggregated metrics
- Delta Lake's time travel lets us recompute any window
This works well if your query patterns are flexible. Not ideal if you need sub-second query latency though.
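If it helps, here's a rough sketch of that aggregate-on-read pattern. The paths, view name, and column names are illustrative assumptions, not details from the reply above:

```python
# Sketch: periodically recompute hourly metrics from the raw Delta table.
# Because late events land in the raw table like any other row, a recompute
# picks them up automatically - no MERGE or reconciliation step needed.
HOURLY_METRICS_SQL = """
SELECT window(event_time, '1 hour').start AS window_start,
       count(*) AS event_count
FROM raw_events
GROUP BY window(event_time, '1 hour')
"""

def refresh_hourly_metrics(spark, raw_path, out_path):
    # Expose the raw table to SQL, then overwrite the derived metrics table
    spark.read.format("delta").load(raw_path).createOrReplaceTempView("raw_events")
    spark.sql(HOURLY_METRICS_SQL).write.format("delta").mode("overwrite").save(out_path)
```

The tradeoff is exactly as stated: recomputing from raw data is simple and always correct, but query or refresh latency grows with the raw table unless you scope the recompute to recent windows.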