Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

SakuraDev1
New Contributor II
Board: data-engineering

autoloader cache and buffer utilization error
https://community.databricks.com/t5/data-engineering/autoloader-cache-and-buffer-utilization-error/m...

Hey guys, I'm encountering an issue with a project that uses Auto Loader for data ingestion. The production cluster keeps shutting down with the error: "The Driver restarted - possibly due to an OutOfMemoryError - and this stream has been stopped." I've identified that the issue is related to a spike in cache and buffer utilization in the production environment. Interestingly, this doesn't happen in the development cluster, which never fails despite showing even higher utilization. [Cluster utilization screenshots for the prd and dev clusters omitted.] Since multiple projects run on the same production cluster (but not in dev), I need to find a way to reduce my project's resource consumption without affecting overall cluster performance. Any advice or suggestions on how to manage this would be greatly appreciated.

from pyspark.sql.functions import col, current_timestamp, from_json, from_utc_timestamp

dfBronze = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(json_schema_bronze)
    .load("s3://bucket")
    .withColumn("filename", col("_metadata.file_path"))
    .withColumn("ingestion_time_utc", from_utc_timestamp(current_timestamp(), "UTC"))
    .withColumn("data_parsed", from_json(col("data"), json_schema_silver["data"].dataType))
)

dfBronze.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_dir_path_bronze) \
    .outputMode("append") \
    .trigger(processingTime="120 second") \
    .table(bronze_table)

dfSilver = (spark.readStream
    .format("delta")
    .table(bronze_table)
)

# (filtering step that produces dfSilver_filtered was omitted from the post)
dfSilver = dfSilver_filtered.select(
    col("site"),
    col("device_time_utc"),
    col("data.energy.value").alias("energy_cumulative_active_value").cast("decimal(30,14)"),
    col("data.energy.unit").alias("energy_cumulative_active_unit"),
    # ... more transformations
)

(dfSilver.writeStream
    .option("checkpointLocation", checkpoint_dir_path_silver)
    .outputMode("append")
    .trigger(processingTime="120 second")
    .toTable(silver_table))

1 REPLY

VZLA
Databricks Employee

To address the resource scheduling and code-specific optimizations for your Auto Loader data ingestion pipeline, consider the following suggestions:

Resource Scheduling

  1. Dynamic Allocation:

    • Enable dynamic allocation in your cluster configuration. This allows Spark to dynamically adjust the number of executors based on the workload, which can help manage resource utilization more efficiently (a quick check of these settings is sketched after this list).
  2. Cluster Policies:

    • Implement cluster policies to control resource allocation. For example, you can set limits on the maximum number of concurrent jobs or the amount of memory and CPU each job can use. This helps prevent any single job from consuming excessive resources.
  3. Job Scheduling:

    • Use Databricks Jobs Scheduler to schedule your streaming jobs during off-peak hours if possible. This can help balance the load on the production cluster.
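
As a rough illustration of point 1: dynamic allocation is driven by the standard Spark properties spark.dynamicAllocation.enabled, spark.dynamicAllocation.minExecutors, and spark.dynamicAllocation.maxExecutors, although on Databricks the usual way to get the same behaviour is cluster autoscaling (min/max workers). The snippet below only inspects what the running cluster has configured; it is an illustrative check, not something that reconfigures the cluster:

# Inspect the dynamic-allocation settings of the attached cluster.
# These properties are set in the cluster configuration (or implied by
# Databricks autoscaling), not from notebook code.
for key in [
    "spark.dynamicAllocation.enabled",
    "spark.dynamicAllocation.minExecutors",
    "spark.dynamicAllocation.maxExecutors",
]:
    print(key, spark.conf.get(key, "<not set>"))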

Code Optimization

  1. Optimize Memory Usage:

    • Reduce Batch Size: Lower the batch size for your streaming queries to reduce memory usage. You can do this by adjusting the trigger interval or using the maxFilesPerTrigger option in Auto Loader.
    • Use Efficient Data Types: Ensure that you are using the most efficient data types for your columns. For example, use DecimalType instead of DoubleType for precise numeric values.
  2. Optimize Transformations:

    • Filter Early: Apply filters as early as possible in your data processing pipeline to reduce the amount of data being processed. For example, filter the data in dfBronze before writing it to the Delta table.
    • Avoid Unnecessary Transformations: Review your transformations to ensure they are necessary and efficient. For example, avoid using withColumn multiple times if you can achieve the same result with a single transformation.
  3. Cache Management (a minimal sketch follows this list):

    • Delta Cache: Enable Delta cache to accelerate data reads by creating copies of remote files in nodes’ local storage. This can be done by setting spark.databricks.io.cache.enabled = true.

    • Spark Cache: Use cache() or persist() methods to cache intermediate DataFrame computations if they are reused in subsequent actions. This can help reduce redundant computations and improve performance.

    • Unpersisting Data: Ensure that you unpersist cached data that is no longer needed using unpersist() to free up memory.

    • Efficient Storage Levels: Configure the cache settings to use a more efficient storage level, such as MEMORY_AND_DISK, to balance memory usage and performance.
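
For point 3, a minimal sketch of the cache settings: spark.databricks.io.cache.enabled is the documented disk-cache flag, while the table name and the choice of what to persist are illustrative assumptions (persist()/unpersist() apply to batch DataFrames that are reused across actions, not to the streaming readers above):

from pyspark import StorageLevel

# Enable the Databricks disk (Delta) cache so remote Parquet/Delta reads are
# copied onto the workers' local storage.
spark.conf.set("spark.databricks.io.cache.enabled", "true")

# Persist an intermediate batch DataFrame only if it is reused by several
# actions; MEMORY_AND_DISK spills to local disk instead of recomputing when
# memory runs low. "reference_lookup" is a hypothetical static table.
df_lookup = spark.table("reference_lookup")
df_lookup.persist(StorageLevel.MEMORY_AND_DISK)

# ... reuse df_lookup in joins / enrichment ...

# Release the cached data as soon as it is no longer needed.
df_lookup.unpersist()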

Example Code Adjustments

Here are some specific adjustments you can make to your code:

 

# Optimize memory usage by reducing batch size
dfBronze = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("maxFilesPerTrigger", 10)  # Adjust this value based on your workload
    .schema(json_schema_bronze)
    .load("s3://bucket")
    .withColumn("filename", col("_metadata.file_path"))
    .withColumn("ingestion_time_utc", from_utc_timestamp(current_timestamp(), "UTC"))
    .withColumn('data_parsed', from_json(col("data"), json_schema_silver["data"].dataType))
)

# Apply early filtering
dfBronze_filtered = dfBronze.filter(col("some_column") == "some_value")

dfBronze_filtered.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_dir_path_bronze) \
    .outputMode("append") \
    .trigger(processingTime="120 second") \
    .table(bronze_table)

dfSilver = (spark.readStream
    .format("delta")
    .table(bronze_table)
)

# Apply transformations efficiently
dfSilver_transformed = dfSilver.select(
    col("site"),
    col("device_time_utc"),
    col("data.energy.value").alias("energy_cumulative_active_value").cast("decimal(30,14)"),
    col("data.energy.unit").alias("energy_cumulative_active_unit"),
    # ... more transformations
)

dfSilver_transformed.writeStream \
    .option("checkpointLocation", checkpoint_dir_path_silver) \
    .outputMode("append") \
    .trigger(processingTime="120 second") \
    .toTable(silver_table)

 
