Hey guys,
I'm encountering an issue with a project that uses Auto Loader for data ingestion. The production cluster keeps shutting down with the error: "The Driver restarted - possibly due to an OutOfMemoryError - and this stream has been stopped."
I’ve traced the issue to a spike in cache and buffer utilization in the production environment. Interestingly, this doesn’t happen on the development cluster, even though its utilization is higher, and it never fails there.
prd cluster: [memory utilization screenshot]
dev cluster: [memory utilization screenshot]
Since multiple projects are running on the same production cluster (but not in dev), I need to find a way to reduce resource consumption for my project without affecting overall cluster performance.
Any advice or suggestions on how to manage this would be greatly appreciated. The current pipeline is below, along with a couple of ideas I'm considering.
from pyspark.sql.functions import col, current_timestamp, from_json, from_utc_timestamp

dfBronze = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(json_schema_bronze)
    .load("s3://bucket")
    # capture the source file path and ingestion timestamp for lineage
    .withColumn("filename", col("_metadata.file_path"))
    .withColumn("ingestion_time_utc", from_utc_timestamp(current_timestamp(), "UTC"))
    # parse the raw JSON "data" string into the struct expected by the silver schema
    .withColumn("data_parsed", from_json(col("data"), json_schema_silver["data"].dataType))
)
dfBronze.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_dir_path_bronze) \
    .outputMode("append") \
    .trigger(processingTime="120 seconds") \
    .toTable(bronze_table)
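For the bronze stream, one idea I'm considering is rate-limiting the Auto Loader source so each micro-batch stays small. If I'm reading the cloudFiles documentation correctly, something along these lines should cap the per-batch input (the values are placeholders I'd still need to tune):

# sketch only: throttle how much Auto Loader picks up per micro-batch
dfBronze = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.maxFilesPerTrigger", 100)     # placeholder: max files per batch
    .option("cloudFiles.maxBytesPerTrigger", "512m")  # placeholder: max bytes per batch
    .schema(json_schema_bronze)
    .load("s3://bucket")
    # ... same withColumn transformations as above
)

Would capping the micro-batch size like this actually relieve the driver, or does it mainly affect the executors?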
dfSilver = (spark.readStream
    .format("delta")
    .table(bronze_table)
)

# (the filtering step that produces dfSilver_filtered is not shown here)
dfSilver = dfSilver_filtered.select(
    col("site"),
    col("device_time_utc"),
    col("data.energy.value").alias("energy_cumulative_active_value").cast("decimal(30,14)"),
    col("data.energy.unit").alias("energy_cumulative_active_unit"),
    # ... more transformations
)
(dfSilver.writeStream
    .option("checkpointLocation", checkpoint_dir_path_silver)
    .outputMode("append")
    .trigger(processingTime="120 seconds")
    .toTable(silver_table))
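Similarly, for the silver stream that reads from the bronze Delta table, I believe the Delta streaming source supports maxFilesPerTrigger / maxBytesPerTrigger, so I could throttle that side as well (again, placeholder values):

# sketch only: throttle the Delta source that feeds the silver stream
dfSilver = (spark.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 50)      # placeholder: max files per batch
    .option("maxBytesPerTrigger", "256m")  # placeholder: max bytes per batch
    .table(bronze_table)
)

Does limiting the micro-batch size on both streams sound like the right direction, or is the driver memory pressure more likely to come from somewhere else (e.g. the Auto Loader file listing or checkpoint state)?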