by SakuraDev1
Hey guys, I'm running into an issue with a project that uses Auto Loader for data ingestion. The production cluster keeps shutting down with this error:

The Driver restarted - possibly due to an OutOfMemoryError - and this stream has been stopped.

I've traced the issue to a spike in cache and buffer utilization in the production environment. Interestingly, this doesn't happen on the development cluster: utilization there is even higher, yet the stream never fails.

[Screenshots: prd cluster and dev cluster utilization metrics]

Several projects run on the same production cluster (which isn't the case in dev), so I need a way to reduce my project's resource consumption without affecting overall cluster performance. Any advice or suggestions on how to manage this would be greatly appreciated.
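```python
from pyspark.sql.functions import col, current_timestamp, from_json, from_utc_timestamp
# Bronze: Auto Loader ingestion of raw JSON files
dfBronze = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load(source_dir_path)  # hypothetical name: the source path isn't shown in the post
    .withColumn("filename", col("_metadata.file_path"))
    .withColumn("ingestion_time_utc", from_utc_timestamp(current_timestamp(), "UTC"))
    .withColumn("data_parsed", from_json(col("data"), json_schema_silver["data"].dataType)))

(dfBronze.writeStream.format("delta")
    .option("checkpointLocation", checkpoint_dir_path_bronze)
    .outputMode("append")
    .trigger(processingTime="120 second")
    .toTable(bronze_table))

dfSilver = (spark.readStream.format("delta").table(bronze_table).select(
    col("site"),
    # cast before alias so the output column keeps the intended name
    col("").cast("decimal(30,14)").alias("energy_cumulative_active_value"),
    col("").alias("energy_cumulative_active_unit"),
    # .... more transformations
))

(dfSilver.writeStream.format("delta")
    .option("checkpointLocation", checkpoint_dir_path_silver)
    .outputMode("append")
    .trigger(processingTime="120 second")
    .toTable(silver_table))  # hypothetical name: the silver target isn't shown in the post
```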
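For reference, Auto Loader has rate-limit options that cap how much new data each micro-batch admits: `cloudFiles.maxFilesPerTrigger` (a file count) and `cloudFiles.maxBytesPerTrigger` (a soft byte cap written as a byte string such as `"10g"`). When both are set, the stream stops at whichever limit is reached first. The snippet below is a minimal sketch that assumes smaller micro-batches would ease memory pressure in this pipeline. The helper `autoloader_rate_limit_options` and the example limits are hypothetical and would need tuning against real data volumes. It only builds the options dict; applying it requires a Databricks `spark` session, so that step is shown as a comment.

```python
from typing import Dict, Optional


def autoloader_rate_limit_options(
    max_files_per_trigger: Optional[int] = None,
    max_bytes_per_trigger: Optional[str] = None,
) -> Dict[str, str]:
    """Build Auto Loader reader options that bound micro-batch size.

    Hypothetical helper: cloudFiles.maxFilesPerTrigger caps new files per
    micro-batch; cloudFiles.maxBytesPerTrigger is a soft cap on new bytes
    per micro-batch (byte string such as "1g"). Values are strings because
    Spark reader options are passed as strings.
    """
    opts = {"cloudFiles.format": "json"}
    if max_files_per_trigger is not None:
        if max_files_per_trigger <= 0:
            raise ValueError("max_files_per_trigger must be positive")
        opts["cloudFiles.maxFilesPerTrigger"] = str(max_files_per_trigger)
    if max_bytes_per_trigger is not None:
        opts["cloudFiles.maxBytesPerTrigger"] = max_bytes_per_trigger
    return opts


# Usage on Databricks (not runnable without a Spark session; limits are placeholders):
# opts = autoloader_rate_limit_options(max_files_per_trigger=200, max_bytes_per_trigger="1g")
# dfBronze = spark.readStream.format("cloudFiles").options(**opts).load(source_dir_path)
```

Passing the dict through `DataStreamReader.options(**opts)` keeps the limits in one place, so they can be adjusted per environment (for example, tighter on the shared prd cluster than on dev).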