Link to post: (autoloader cache and buffer utilization error)
by SakuraDev1
https://community.databricks.com/t5/data-engineering/autoloader-cache-and-buffer-utilization-error/m...
Hey guys, I'm running into an issue with a project that uses Auto Loader for data ingestion. The production cluster keeps shutting down with the error: "The Driver restarted - possibly due to an OutOfMemoryError - and this stream has been stopped." I've traced the issue to a spike in cache and buffer utilization in the production environment. Interestingly, the development cluster never fails, even though its utilization is higher.

[screenshots: prd cluster and dev cluster memory utilization]

Since multiple projects run on the same production cluster (but not in dev), I need a way to reduce my project's resource consumption without affecting overall cluster performance. Any advice or suggestions on how to manage this would be greatly appreciated. My pipeline is below, followed by a rate-limiting sketch.
from pyspark.sql.functions import col, current_timestamp, from_json, from_utc_timestamp

dfBronze = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(json_schema_bronze)
    .load("s3://bucket")
    # keep the source file path from the file-metadata column
    .withColumn("filename", col("_metadata.file_path"))
    # record the ingestion time (UTC)
    .withColumn("ingestion_time_utc", from_utc_timestamp(current_timestamp(), "UTC"))
    # parse the raw JSON string in "data" using the "data" struct type from the silver schema
    .withColumn("data_parsed", from_json(col("data"), json_schema_silver["data"].dataType))
)
(dfBronze.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_dir_path_bronze)
    .outputMode("append")
    .trigger(processingTime="120 seconds")
    .table(bronze_table))
dfSilver = spark.readStream.format("delta").table(bronze_table)
dfSilver_filtered = dfSilver  # (filtering logic elided from the post)
dfSilver = dfSilver_filtered.select(
    col("site"),
    col("device_time_utc"),
    col("data.energy.value").cast("decimal(30,14)").alias("energy_cumulative_active_value"),
    col("data.energy.unit").alias("energy_cumulative_active_unit"),
    # ... more transformations
)
(dfSilver.writeStream
    .option("checkpointLocation", checkpoint_dir_path_silver)
    .outputMode("append")
    .trigger(processingTime="120 seconds")
    .toTable(silver_table))
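One possible mitigation, as a minimal sketch rather than a definitive fix: both sources support rate limiting, which caps how much data each 120-second micro-batch pulls in and should flatten the memory profile on the shared prd cluster. The option names below are standard Auto Loader / Delta streaming source options; the values (100 files, "1g") are hypothetical starting points to tune against the actual file sizes, not recommendations.

# Rate-limiting sketch: smaller micro-batches -> lower peak cache/buffer usage,
# at the cost of a little extra latency. Values are placeholders to tune.

# Bronze: cap how many new files / bytes Auto Loader picks up per trigger
dfBronze = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.maxFilesPerTrigger", 100)   # default is 1000 files per micro-batch
    .option("cloudFiles.maxBytesPerTrigger", "1g")  # soft byte cap per micro-batch
    .schema(json_schema_bronze)
    .load("s3://bucket")
)

# Silver: the Delta streaming source has an equivalent knob
dfSilver = (
    spark.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 100)  # default is 1000
    .table(bronze_table)
)

If rate limiting alone isn't enough, moving this pipeline to its own job cluster would isolate its memory use from the other projects entirely.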