Data Engineering

autoloader cache and buffer utilization error

SakuraDev1
New Contributor

Hey guys,

I'm encountering an issue with a project that uses Auto Loader for data ingestion. The production cluster keeps shutting down with the error: "The Driver restarted - possibly due to an OutOfMemoryError - and this stream has been stopped."

I've traced the issue to a spike in cache and buffer utilization in the production environment. Interestingly, the development cluster never fails, even though its cache and buffer utilization is higher.

Production cluster: [screenshot: memory utilization with cache/buffer spike]

Development cluster: [screenshot: memory utilization, higher but stable]

Since multiple projects run on the same production cluster (dev is dedicated to this project), I need a way to reduce my project's resource consumption without degrading overall cluster performance.

Any advice or suggestions on how to manage this would be greatly appreciated.
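For reference, here is the current pipeline: Bronze ingestion first, then the Silver transformation.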

from pyspark.sql.functions import col, current_timestamp, from_json, from_utc_timestamp

# Bronze: ingest raw JSON from S3 with Auto Loader
dfBronze = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(json_schema_bronze)
    .load("s3://bucket")
    .withColumn("filename", col("_metadata.file_path"))
    .withColumn("ingestion_time_utc", from_utc_timestamp(current_timestamp(), "UTC"))
    .withColumn("data_parsed", from_json(col("data"), json_schema_silver["data"].dataType))
)

# Bronze: append to the Delta table on a 120-second trigger
(dfBronze.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_dir_path_bronze)
    .outputMode("append")
    .trigger(processingTime="120 seconds")
    .table(bronze_table)
)
# Silver: stream from the Bronze Delta table
dfSilver = (spark.readStream
    .format("delta")
    .table(bronze_table)
)

# NOTE: the filtering step that produces dfSilver_filtered is omitted here
dfSilver = dfSilver_filtered.select(
    col("site"),
    col("device_time_utc"),
    col("data.energy.value").cast("decimal(30,14)").alias("energy_cumulative_active_value"),
    col("data.energy.unit").alias("energy_cumulative_active_unit"),
    # ... more transformations
)

(dfSilver.writeStream
    .option("checkpointLocation", checkpoint_dir_path_silver)
    .outputMode("append")
    .trigger(processingTime="120 seconds")
    .toTable(silver_table)
)
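One lever I'm considering is capping how much data each micro-batch pulls in, so the driver has less to track per trigger. Below is a minimal sketch using Auto Loader's documented rate-limiting options (cloudFiles.maxFilesPerTrigger, cloudFiles.maxBytesPerTrigger) and the Delta source's maxFilesPerTrigger; the limit values are placeholder guesses, not tuned numbers:

# Sketch: throttled Bronze read. The option names are documented Auto Loader
# rate limits; the values (500 files / 1g per trigger) are untuned guesses.
dfBronze = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.maxFilesPerTrigger", 500)   # cap files per micro-batch
    .option("cloudFiles.maxBytesPerTrigger", "1g")  # soft cap on bytes per micro-batch
    .schema(json_schema_bronze)
    .load("s3://bucket")
)

# The Delta source feeding Silver takes a similar cap:
dfSilver = (spark.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 100)  # cap Delta files read per micro-batch
    .table(bronze_table)
)

I'm not certain this addresses the driver OOM directly (the spike shows up as cache/buffer utilization, which may be OS-level), but it should at least bound per-batch memory. Is this the right lever, or is there a better way to isolate one project's footprint on a shared cluster?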
