07-28-2024 09:59 PM
Hi everyone,
I have a continuously running streaming job made up of 29 notebooks. I initially allocated 28 GB of memory to the driver, but the job failed with a "Driver Out of Memory" error after 4 hours.
To address this, I increased the driver's memory to 56 GB, but the job still failed after running for 60 hours.
Here's the code I’m using to ingest data from Kafka:
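```python
# raw_kafka_events: streaming DataFrame read from Kafka (defined elsewhere)
(
    raw_kafka_events
    .writeStream
    .format('delta')
    # Kafka Connect converter settings: the Delta sink does not use these options
    .option('key.converter', 'org.apache.kafka.connect.json.ByteArrayConverter')
    .option('value.converter', 'org.apache.kafka.connect.json.ByteArrayConverter')
    .option('checkpointLocation', checkpoint_location)
    .outputMode('append')
    .toTable(table_path)  # toTable() expects a table name, not a storage path
)
```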
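The snippet above starts from `raw_kafka_events`, whose definition isn't shown. For reference, here is a minimal sketch of how such a stream is typically read with Spark's built-in Kafka source. The function name `build_raw_kafka_events`, the `max_offsets` default, and deriving `_ingested_time` from `current_timestamp()` are hypothetical; broker and topic are placeholders. `maxOffsetsPerTrigger` caps how many Kafka offsets each micro-batch reads.

```python
def build_raw_kafka_events(spark, bootstrap_servers, topic, max_offsets=100_000):
    """Hypothetical sketch: read a Kafka topic as a streaming DataFrame."""
    from pyspark.sql import functions as F

    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        # Cap the number of offsets read per micro-batch (assumed value)
        .option("maxOffsetsPerTrigger", max_offsets)
        .load()
        # The Kafka source returns key/value as binary; cast them to string
        # so from_json can parse them downstream.
        .select(
            F.col("key").cast("string").alias("key"),
            F.col("value").cast("string").alias("value"),
            "topic", "partition", "offset", "timestamp",
            # Hypothetical: stamp the ingestion time as _ingested_time
            F.current_timestamp().alias("_ingested_time"),
        )
    )
```

The returned DataFrame would then be passed to the `writeStream` chain above as `raw_kafka_events`.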
And here's the code for upserting the raw Kafka data from that Delta table into the target table:
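```python
from pyspark.sql import functions as F

# Stream new rows from the raw Delta table, parse the JSON key/value,
# and upsert each micro-batch via merge_stream (defined elsewhere).
(
    spark
    .readStream
    .format("delta")
    .table(source_table)
    .withColumn("key", F.from_json(F.col("key"), key_schema))
    .withColumn("value", F.from_json(F.col("value"), value_schema))
    .withColumn("landing_time", F.col("_ingested_time"))
    .writeStream
    .foreachBatch(merge_stream)  # called as merge_stream(batch_df, batch_id)
    .option("checkpointLocation", checkpoint_location)
    .start()
)
```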
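`merge_stream` isn't shown either. Below is a minimal sketch of a `foreachBatch` upsert, assuming the target is a Delta table named by the hypothetical `TARGET_TABLE`, that `key_schema` contains a hypothetical `id` field used as the merge key, and that the flattened key and value fields have distinct names. It keeps only the latest row per key within each batch (Delta's MERGE fails when several source rows match the same target row) and avoids `collect()` / `toPandas()`, because the batch function itself runs on the driver and those calls would pull the batch into driver memory. The `DataFrame.sparkSession` property requires PySpark 3.3 or later.

```python
TARGET_TABLE = "catalog.schema.target_table"  # hypothetical target table name


def merge_stream(batch_df, batch_id):
    """Hypothetical foreachBatch handler: upsert one micro-batch into TARGET_TABLE."""
    from delta.tables import DeltaTable
    from pyspark.sql import Window
    from pyspark.sql import functions as F

    # Flatten the parsed structs; assumes key_schema has an "id" field.
    rows = batch_df.select("key.*", "value.*", "landing_time")

    # Keep only the latest row per key within this micro-batch.
    latest = Window.partitionBy("id").orderBy(F.col("landing_time").desc())
    deduped = (
        rows.withColumn("_rn", F.row_number().over(latest))
        .filter("_rn = 1")
        .drop("_rn")
    )

    # Upsert: update rows whose id already exists, insert the rest.
    (
        DeltaTable.forName(batch_df.sparkSession, TARGET_TABLE)
        .alias("t")
        .merge(deduped.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
```

It would be wired in exactly as in the snippet above, via `.foreachBatch(merge_stream)`.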
Even with 56 GB, the driver still runs out of memory. Any suggestions on what else I can do to address this?