
autoloader cache and buffer utilization error

SakuraDev1
New Contributor II

Hey guys,

I'm encountering an issue with a project that uses Auto Loader for data ingestion. The production cluster keeps shutting down with the error: "The Driver restarted - possibly due to an OutOfMemoryError - and this stream has been stopped."

I’ve identified that the issue is related to a spike in cache and buffer utilization in the production environment. Interestingly, the development cluster never fails, even though its cache and buffer utilization is higher.

prd cluster:

[screenshot: SakuraDev1_0-1729271704783.png]

dev cluster:

[screenshot: SakuraDev1_0-1729271834424.png]

Since multiple projects are running on the same production cluster (but not in dev), I need to find a way to reduce resource consumption for my project without affecting overall cluster performance.

Any advice or suggestions on how to manage this would be greatly appreciated.

from pyspark.sql.functions import col, current_timestamp, from_json, from_utc_timestamp

dfBronze = (spark.readStream
    .format("cloudFiles")  # Auto Loader source
    .option("cloudFiles.format", "json")
    .schema(json_schema_bronze)
    .load("s3://bucket")
    .withColumn("filename", col("_metadata.file_path"))
    .withColumn("ingestion_time_utc", from_utc_timestamp(current_timestamp(), "UTC"))
    .withColumn("data_parsed", from_json(col("data"), json_schema_silver["data"].dataType))
)

dfBronze.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_dir_path_bronze) \
    .outputMode("append") \
    .trigger(processingTime="120 second") \
    .table(bronze_table)

dfSilver = (spark.readStream
    .format("delta")
    .table(bronze_table)
)

# dfSilver_filtered = dfSilver with filters applied (filtering step omitted here)
dfSilver = dfSilver_filtered.select(
    col("site"),
    col("device_time_utc"),
    col("data.energy.value").cast("decimal(30,14)").alias("energy_cumulative_active_value"),
    col("data.energy.unit").alias("energy_cumulative_active_unit"),
    # ... more transformations
)

(dfSilver.writeStream
    .option("checkpointLocation", checkpoint_dir_path_silver)
    .outputMode("append")
    .trigger(processingTime="120 second") 
    .toTable(silver_table))
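
One thing I'm considering (untested here; option names are from the Auto Loader docs) is capping how much each micro-batch reads, so a backlog of files can't inflate a single batch:

dfBronze = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    # Rate limiting: at most this many files / roughly this many bytes per
    # micro-batch. The values are placeholders to tune, not recommendations.
    .option("cloudFiles.maxFilesPerTrigger", 500)
    .option("cloudFiles.maxBytesPerTrigger", "1g")
    .schema(json_schema_bronze)
    .load("s3://bucket")
)

As I read the docs, when both options are set the batch stops at whichever limit is hit first. Would this also tame the cache/buffer spike?
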
1 REPLY

VZLA
Databricks Employee

That part of the error message is somewhat generic: "possibly due to an OutOfMemoryError" does not guarantee an actual OOM was the root cause.

There is indeed memory pressure, but try to correlate those graph metrics with the driver's STDOUT log and check whether GC/Full GC cycles are running properly and reclaiming memory. Also check for other side effects, such as CPU contention or anything else that could cause the driver to become unresponsive. If the Chauffeur service cannot complete a heartbeat check with the driver (for example because the heartbeat thread is blocked), that also leads to the same restart. I'm sharing these just as additional examples.
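
As a concrete starting point, and only a sketch (the flags assume a Java 8 JVM on the driver; on Java 11+ runtimes the unified -Xlog:gc* flag replaces them), GC logging can be enabled in the cluster's Spark config so Full GC activity shows up in the driver log:

spark.driver.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps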

Is the driver unresponsiveness triggered by your application? Check at which point in your application (which transformation or action) the driver becomes unresponsive; a streaming progress listener, sketched below, can help tie the timing to a specific micro-batch.
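
A minimal sketch, assuming your DBR ships PySpark 3.4+ (where StreamingQueryListener is available from Python):

from pyspark.sql.streaming import StreamingQueryListener

class ProgressLogger(StreamingQueryListener):
    """Print per-micro-batch stats to the driver's STDOUT so a memory
    spike can be matched to the batch that was running at the time."""

    def onQueryStarted(self, event):
        print(f"stream started: name={event.name} id={event.id}")

    def onQueryProgress(self, event):
        p = event.progress
        # batchId, input rows and per-phase durations locate the heavy step.
        print(f"batch={p.batchId} rows={p.numInputRows} durationMs={p.durationMs}")

    def onQueryTerminated(self, event):
        print(f"stream terminated: id={event.id} exception={event.exception}")

spark.streams.addListener(ProgressLogger())

Registering the listener before starting the streams means every query in that SparkSession reports through it, so you can compare the bronze and silver writers directly.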
