xorbix_rshiva
Databricks MVP

Does your merge_stream function contain any stateful operations, such as aggregation or deduplication logic? If so, your streaming job may be accumulating state in memory over time, which will eventually result in an OOM error. If this is the case, you will see memory usage increase over time until it reaches the amount allocated to your cluster (you can view memory utilization in the "Metrics" tab for your cluster).
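If you want to check state size from code as well as from the cluster metrics, here is a rough sketch. It assumes you have the StreamingQuery handle returned by .start() (called query here, a name I made up) and that query.lastProgress gives you a dict with a "stateOperators" list, which is what PySpark reports for stateful queries. The helper name and the sample values are hypothetical.

```python
def summarize_state(progress):
    """Return (operator name, rows in state, memory bytes) per stateful operator.

    `progress` is assumed to be shaped like `query.lastProgress`:
    a dict with an optional "stateOperators" list.
    """
    if not progress:
        # lastProgress is None until the first micro-batch has completed
        return []
    return [
        (
            op.get("operatorName", "unknown"),
            op.get("numRowsTotal", 0),
            op.get("memoryUsedBytes", 0),
        )
        for op in progress.get("stateOperators", [])
    ]


# Hypothetical sample, trimmed to the fields used above.
sample_progress = {
    "batchId": 42,
    "stateOperators": [
        {
            "operatorName": "dedupe",
            "numRowsTotal": 1250000,
            "memoryUsedBytes": 734003200,
        }
    ],
}

for name, rows, mem in summarize_state(sample_progress):
    print(f"{name}: {rows} rows in state, {mem / 1024**2:.0f} MiB")
```

On a live job you would call summarize_state(query.lastProgress) every few batches. If the row count keeps climbing, state is accumulating. If the list is empty, the streaming query itself is probably not holding state, and the memory growth is more likely coming from somewhere else (for example, from work done inside merge_stream).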

To resolve the issue, you could try adding a watermark to your upsert streaming query:

from pyspark.sql import functions as F

(
    spark
    .readStream
    .format("delta")
    .table(source_table)
    # Parse the JSON-encoded key and value columns
    .withColumn("key", F.from_json(F.col("key"), key_schema))
    .withColumn("value", F.from_json(F.col("value"), value_schema))
    .withColumn("landing_time", F.col("_ingested_time"))
    # The watermark column must be a timestamp column that exists in the stream
    .withWatermark("event_time", "1 hour")
    .writeStream
    .foreachBatch(merge_stream)
    .option("checkpointLocation", checkpoint_location)
    .start()
)

Applying a watermark limits the amount of memory used by your streaming job, and it can also improve job performance by keeping your stream state to a manageable size. If you apply a watermark, you should set the lateness threshold according to your requirements. For example, if your stream contains deduplication logic and you expect duplicate records to arrive no more than 10 minutes apart, you could set your watermark threshold to 10 minutes.
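To make the deduplication example concrete, here is a minimal sketch of a helper that applies the watermark and then deduplicates inside the stream. It only uses DataFrame.withWatermark and DataFrame.dropDuplicates. The helper name, the "id" key column, and the "event_time" column are assumptions on my part, so substitute whatever your table actually uses.

```python
def dedupe_with_watermark(stream_df, key_cols,
                          event_col="event_time", threshold="10 minutes"):
    """Deduplicate a streaming DataFrame while bounding its state.

    Including the event-time column in the dedup key lets Spark drop
    state for records older than the watermark. Note that this treats
    two rows as duplicates only if they share both the key columns and
    the same event time.
    """
    return (
        stream_df
        .withWatermark(event_col, threshold)
        .dropDuplicates(list(key_cols) + [event_col])
    )


# Usage on a cluster (assumes `spark` and `source_table` from your notebook,
# and a hypothetical key column named "id"):
#
# deduped = dedupe_with_watermark(
#     spark.readStream.table(source_table), ["id"]
# )
```

If your duplicates can carry different event times, Spark 3.5 and later also provide dropDuplicatesWithinWatermark, which deduplicates on the key columns alone within the watermark delay. Check that your runtime supports it before relying on it.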

Check out this documentation for more info on watermarks: https://docs.databricks.com/en/structured-streaming/watermarks.html
