Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Driver: Out of Memory

tramtran
Contributor

Hi everyone,

I have a streaming job with 29 notebooks that runs continuously. Initially, I allocated 28 GB of memory to the driver, but the job failed with a "Driver Out of Memory" error after 4 hours of execution.

To address this, I increased the driver's memory to 56 GB, but the job still failed after running for 60 hours.

Here's the code I’m using to ingest data from Kafka:

(
    raw_kafka_events
    .writeStream
    .format('delta')
    .option('key.converter', 'org.apache.kafka.connect.json.ByteArrayConverter')
    .option('value.converter', 'org.apache.kafka.connect.json.ByteArrayConverter')
    .option('checkpointLocation', checkpoint_location)
    .outputMode('append')
    .toTable(table_path)
)
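
For reference, raw_kafka_events is not defined in the snippet above; a minimal sketch of how a Kafka source like this is typically set up (the broker address and topic name below are placeholders, not values from the actual job):

# Sketch only: hypothetical definition of raw_kafka_events.
# 'broker:9092' and 'customer_cdc' are placeholder values.
raw_kafka_events = (
    spark
    .readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'broker:9092')
    .option('subscribe', 'customer_cdc')
    .option('startingOffsets', 'earliest')
    .load()
)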

And here's the code for upserting data from Kafka to the target table:

(
    spark
    .readStream
    .format("delta")
    .table(source_table)
    .withColumn("key", F.from_json(F.col("key"), key_schema))
    .withColumn("value", F.from_json(F.col("value"), value_schema))
    .withColumn("landing_time", F.col("_ingested_time"))
    .writeStream
    .foreachBatch(merge_stream)
    .option("checkpointLocation", checkpoint_location)
    .start()
)

 

Despite these configurations, the driver continues to run out of memory. Any suggestions on how to further address this issue?


4 REPLIES

xorbix_rshiva
Contributor

Does your merge_stream function contain any stateful operations, such as aggregation or deduplication logic? If so, your streaming job may be accumulating state in memory over time, which will eventually result in an OOM error. If this is the case, you will see memory usage increase steadily until it reaches the amount allocated to your cluster (you can view memory utilization in the "Metrics" tab for your cluster).
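
One way to confirm this is to watch the state-store metrics that Structured Streaming reports in the query progress. A minimal sketch, assuming query is the StreamingQuery handle returned by .start() (the variable name is just an example):

# Sketch: inspect state-store metrics from the most recent micro-batch.
# `query` is assumed to be the StreamingQuery returned by .start().
progress = query.lastProgress
if progress:
    for op in progress.get("stateOperators", []):
        # numRowsTotal / memoryUsedBytes growing without bound points to unbounded state
        print(op.get("numRowsTotal"), op.get("memoryUsedBytes"))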

To resolve the issue, you could try adding a watermark to your upsert streaming query:

(
    spark
    .readStream
    .format("delta")
    .table(source_table)
    .withColumn("key", F.from_json(F.col("key"), key_schema))
    .withColumn("value", F.from_json(F.col("value"), value_schema))
    .withColumn("landing_time", F.col("_ingested_time"))
    .withWatermark("event_time", "1 hour")
    .writeStream
    .foreachBatch(merge_stream)
    .option("checkpointLocation", checkpoint_location)
    .start()
)

Applying a watermark bounds the amount of state (and therefore memory) used by your streaming job, and it can also improve performance by keeping the stream state at a manageable size. If you apply a watermark, set the lateness threshold according to your requirements. For example, if your stream contains deduplication logic and you expect duplicate records to arrive no more than 10 minutes apart, you could set the watermark delay to 10 minutes.
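
For illustration, here is a minimal sketch of deduplication whose state is bounded by a watermark; the event_time column and the 10-minute delay are example values, not taken from your pipeline:

# Illustrative only: bounded-state deduplication using a watermark.
# "event_time" and the 10-minute delay are example values.
deduped = (
    spark
    .readStream
    .format("delta")
    .table(source_table)
    .withWatermark("event_time", "10 minutes")
    .dropDuplicates(["key", "event_time"])  # rows older than the watermark are evicted from state
)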

Check out this documentation for more info on watermarks: https://docs.databricks.com/en/structured-streaming/watermarks.html

tramtran
Contributor

Thanks @xorbix_rshiva for your reply.

There is deduplication logic in my code. Could I use _source_cdc_time for the watermark in this case?

def merge_stream(microBatchDF, i):
    microBatchDF.createOrReplaceTempView("vw_delta")
    microBatchDF._jdf.sparkSession().sql("""
        MERGE INTO customer as tg
        USING (
            SELECT * FROM (
                SELECT
                    key.customerid,
                    key.customerkey,
                    value.after.customername,
                    -- metadata
                    value.op,
                    current_timestamp() as _ingested_time,
                    to_timestamp(value.source.ts_ms/1000) as _source_cdc_time,
                    to_timestamp(value.ts_ms/1000) as _kafka_created_time,
                    landing_time as _landing_time,
                    row_number() over(PARTITION BY key.customerid, key.customerkey ORDER BY value.source.ts_ms desc) as rank
                FROM vw_delta
                WHERE value.op IS NOT NULL
            ) as t
            WHERE rank = 1
        ) as src
        ON tg.customerid = src.customerid
        AND tg.customerkey = src.customerkey
        WHEN MATCHED AND src.op = 'd' THEN DELETE
        WHEN MATCHED AND src.op != 'd' THEN UPDATE SET *
        WHEN NOT MATCHED AND src.op != 'd' THEN INSERT *
    """)

xorbix_rshiva
Contributor

It looks like _source_cdc_time is the timestamp of when the CDC transaction occurred in your source system. That would be a good choice of timestamp column for your watermark, since you would be deduplicating records according to the time the transactions actually occurred, not the time they were ingested and processed in Databricks.
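
For example, here is a rough sketch of how the upsert stream could materialize _source_cdc_time before the writeStream so the watermark can reference it; the expression mirrors the one in the MERGE above, and the 10-minute delay is just an example:

# Sketch: derive _source_cdc_time from the parsed value and use it for the watermark.
# The column path mirrors the MERGE statement above; adjust to the actual schema if needed.
(
    spark
    .readStream
    .format("delta")
    .table(source_table)
    .withColumn("key", F.from_json(F.col("key"), key_schema))
    .withColumn("value", F.from_json(F.col("value"), value_schema))
    .withColumn("landing_time", F.col("_ingested_time"))
    .withColumn("_source_cdc_time", F.to_timestamp(F.col("value.source.ts_ms") / 1000))
    .withWatermark("_source_cdc_time", "10 minutes")
    .writeStream
    .foreachBatch(merge_stream)
    .option("checkpointLocation", checkpoint_location)
    .start()
)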

Kaniz_Fatma
Community Manager

Hi @tramtran, Thank you for reaching out to our community! We're here to help you.

To ensure we provide you with the best support, could you please take a moment to review the responses and choose the one that best answers your question? Your feedback not only helps us assist you better but also benefits other community members who may have similar questions in the future.

If you found the answer helpful, consider giving it a kudo. If the response fully addresses your question, please mark it as the accepted solution. This will help us close the thread and ensure your question is resolved.

We appreciate your participation and are here to assist you further if you need it!
