07-28-2024 09:59 PM
Hi everyone,
I have a streaming job with 29 notebooks that runs continuously. Initially, I allocated 28 GB of memory to the driver, but the job failed with a "Driver Out of Memory" error after 4 hours of execution.
To address this, I increased the driver's memory to 56 GB, but the job still failed after running for 60 hours.
Here's the code I’m using to ingest data from Kafka:
(
raw_kafka_events
.writeStream
.format('delta')
.option('key.converter', 'org.apache.kafka.connect.json.ByteArrayConverter')
.option('value.converter', 'org.apache.kafka.connect.json.ByteArrayConverter')
.option('checkpointLocation', checkpoint_location)
.outputMode('append')
.toTable(table_path)
)
And here's the code for upserting data from Kafka to the target table:
(
spark
.readStream
.format("delta")
.table(source_table)
.withColumn("key", F.from_json(F.col("key"), key_schema))
.withColumn("value", F.from_json(F.col("value"), value_schema))
.withColumn("landing_time", F.col("_ingested_time"))
.writeStream
.foreachBatch(merge_stream)
.option("checkpointLocation", checkpoint_location)
.start()
)
Despite these configurations, the driver continues to run out of memory. Any suggestions on how to further address this issue?
07-29-2024 11:46 AM
Does your merge_stream function contain any stateful operations, such as aggregation or deduplication logic? If so, your streaming job may be accumulating state in memory over time, which will eventually result in OOM error. If this is the case, you will see memory usage increase over time, until it hits the amount allocated to your cluster (View memory utilization in the "Metrics" tab for your cluster).
To resolve the issue, you could try adding a watermark to your upsert streaming query:
(
spark
.readStream
.format("delta")
.table(source_table)
.withColumn("key", F.from_json(F.col("key"), key_schema))
.withColumn("value", F.from_json(F.col("value"), value_schema))
.withColumn("landing_time", F.col("_ingested_time"))
.withWatermark("event_time", "1 hour")
.writeStream
.foreachBatch(merge_stream)
.option("checkpointLocation", checkpoint_location)
.start()
)
Applying a watermark will control the amount of memory used by your streaming job, and it can also improve job performance by keeping your stream state to a manageable size. If you apply a watermark, you should set the lateness threshold according to your requirements. For example, if your stream contains deduplication logic and you expect no duplicate records to arrive within more than 10 minutes of each other, you could set your watermark timestamp to 10 minutes.
Check out this documentation for more info on watermarks: https://docs.databricks.com/en/structured-streaming/watermarks.html
07-29-2024 11:46 AM
Does your merge_stream function contain any stateful operations, such as aggregation or deduplication logic? If so, your streaming job may be accumulating state in memory over time, which will eventually result in OOM error. If this is the case, you will see memory usage increase over time, until it hits the amount allocated to your cluster (View memory utilization in the "Metrics" tab for your cluster).
To resolve the issue, you could try adding a watermark to your upsert streaming query:
(
spark
.readStream
.format("delta")
.table(source_table)
.withColumn("key", F.from_json(F.col("key"), key_schema))
.withColumn("value", F.from_json(F.col("value"), value_schema))
.withColumn("landing_time", F.col("_ingested_time"))
.withWatermark("event_time", "1 hour")
.writeStream
.foreachBatch(merge_stream)
.option("checkpointLocation", checkpoint_location)
.start()
)
Applying a watermark will control the amount of memory used by your streaming job, and it can also improve job performance by keeping your stream state to a manageable size. If you apply a watermark, you should set the lateness threshold according to your requirements. For example, if your stream contains deduplication logic and you expect no duplicate records to arrive within more than 10 minutes of each other, you could set your watermark timestamp to 10 minutes.
Check out this documentation for more info on watermarks: https://docs.databricks.com/en/structured-streaming/watermarks.html
07-29-2024 05:33 PM
Thanks @xorbix_rshiva for your reply,
There was deduplication logic in my code. Could I use _source_cdc_time for the watermark in this case?
07-30-2024 07:07 AM
It looks like _source_cdc_time is the timestamp for when the CDC transaction occurred in your source system. This would be a good choice for a timestamp column for your watermark, since you would be deduping values according to the time the transactions actually occurred, not the timestamp when they are ingested and processed in Databricks.
07-30-2024 10:30 PM
Hi @tramtran, Thank you for reaching out to our community! We're here to help you.
To ensure we provide you with the best support, could you please take a moment to review the response and choose the one that best answers your question? Your feedback not only helps us assist you better but also benefits other community members who may have similar questions in the future.
If you found the answer helpful, consider giving it a kudo. If the response fully addresses your question, please mark it as the accepted solution. This will help us close the thread and ensure your question is resolved.
We appreciate your participation and are here to assist you further if you need it!
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group