<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: SakuraDev1 / Board: data-engineering (39000) in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/sakuradev1-board-data-engineering-39000/m-p/97073#M39414</link>
    <description>Resource scheduling and code-level optimizations for an Auto Loader ingestion pipeline hitting a driver OutOfMemoryError on a shared production cluster.</description>
    <pubDate>Thu, 31 Oct 2024 17:33:49 GMT</pubDate>
    <dc:creator>VZLA</dc:creator>
    <dc:date>2024-10-31T17:33:49Z</dc:date>
    <item>
      <title>SakuraDev1 / Board: data-engineering (39000)</title>
      <link>https://community.databricks.com/t5/data-engineering/sakuradev1-board-data-engineering-39000/m-p/94942#M39017</link>
      <description>&lt;P&gt;Link to post: &lt;A href="https://community.databricks.com/t5/data-engineering/autoloader-cache-and-buffer-utilization-error/m-p/94927#M39000" target="_blank" rel="noopener"&gt;(autoloader cache and buffer utilization error)&lt;/A&gt;&lt;BR /&gt;by &lt;A href="https://community.databricks.com/t5/user/viewprofilepage/user-id/127421" target="_blank" rel="noopener"&gt;SakuraDev1&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;&lt;BR /&gt;Hey guys, I'm encountering an issue with a project that uses Auto Loader for data ingestion. The production cluster is shutting down with the error: "The Driver restarted - possibly due to an OutOfMemoryError - and this stream has been stopped." I’ve identified that the issue is related to a spike in cache and buffer utilization in the production environment. Interestingly, this doesn’t happen in the development cluster, which never fails despite higher utilization there. (Screenshots of the prd and dev cluster metrics accompanied the original post.) Since multiple projects run on the same production cluster (but not in dev), I need to find a way to reduce my project's resource consumption without affecting overall cluster performance. Any advice or suggestions on how to manage this would be greatly appreciated.&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;dfBronze = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(json_schema_bronze)
    .load("s3://bucket")
    .withColumn("filename", col("_metadata.file_path"))
    .withColumn("ingestion_time_utc", from_utc_timestamp(current_timestamp(), "UTC"))
    .withColumn('data_parsed', from_json(col("data"), json_schema_silver["data"].dataType))
)

dfBronze.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_dir_path_bronze) \
    .outputMode("append") \
    .trigger(processingTime="120 second") \
    .table(bronze_table)

dfSilver = (spark.readStream
    .format("delta")
    .table(bronze_table)
)
dfSilver = dfSilver_filtered.select(
    col("site"),
    col("device_time_utc"),
    col("data.energy.value").alias("energy_cumulative_active_value").cast("decimal(30,14)"),
    col("data.energy.unit").alias("energy_cumulative_active_unit"),
    # .... more transformations
)

(dfSilver.writeStream
    .option("checkpointLocation", checkpoint_dir_path_silver)
    .outputMode("append")
    .trigger(processingTime="120 second")
    .toTable(silver_table))&lt;/LI-CODE&gt;</description>
      <pubDate>Sat, 19 Oct 2024 00:44:36 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/sakuradev1-board-data-engineering-39000/m-p/94942#M39017</guid>
      <dc:creator>SakuraDev1</dc:creator>
      <dc:date>2024-10-19T00:44:36Z</dc:date>
    </item>
    <item>
      <title>Re: SakuraDev1 / Board: data-engineering (39000)</title>
      <link>https://community.databricks.com/t5/data-engineering/sakuradev1-board-data-engineering-39000/m-p/97073#M39414</link>
      <description>&lt;P class="_1t7bu9h1 paragraph"&gt;To address the resource scheduling and code-specific optimizations for your Auto Loader data ingestion pipeline, consider the following suggestions:&lt;/P&gt;
&lt;H3 class="_1jeaq5e0 _1t7bu9h9 heading3"&gt;Resource Scheduling&lt;/H3&gt;
&lt;OL&gt;
&lt;LI&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;&lt;STRONG&gt;Dynamic Allocation&lt;/STRONG&gt;:&lt;/P&gt;
&lt;UL class="_1t7bu9h7 _1t7bu9h2"&gt;
&lt;LI&gt;Enable dynamic allocation in your cluster configuration so that Spark scales the number of executors up and down with the workload instead of holding a fixed allocation (see the configuration sketch after this list).&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;&lt;STRONG&gt;Cluster Policies&lt;/STRONG&gt;:&lt;/P&gt;
&lt;UL class="_1t7bu9h7 _1t7bu9h2"&gt;
&lt;LI&gt;Implement cluster policies to control resource allocation, for example by capping autoscaling or pinning the memory and CPU settings of clusters created under the policy. This prevents any single project from monopolizing shared resources (a minimal policy appears in the sketch after this list).&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;&lt;STRONG&gt;Job Scheduling&lt;/STRONG&gt;:&lt;/P&gt;
&lt;UL class="_1t7bu9h7 _1t7bu9h2"&gt;
&lt;LI&gt;If the data can tolerate some latency, use the Databricks Jobs scheduler to run the pipeline during off-peak hours as a triggered job rather than a continuous stream. This helps balance the load on the shared production cluster (see the trigger sketch after this list).&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;/OL&gt;
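&lt;P class="_1t7bu9h1 paragraph"&gt;As a rough sketch of the first two points, the settings below cap what this pipeline's cluster can consume. The keys are standard Spark and Databricks policy settings, but every numeric value here is an assumption to tune for your workload:&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;# Sketch: entries for the cluster's "Spark config" box (or the spark_conf
# field of the Clusters API). Executor counts are illustrative only.
spark_conf = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "1",
    "spark.dynamicAllocation.maxExecutors": "4",  # hard cap for this pipeline
}

# Sketch: a minimal cluster policy definition (Cluster Policies API) that
# caps autoscaling for any cluster created under it; 4 workers is an assumption.
policy_definition = {
    "autoscale.max_workers": {"type": "range", "maxValue": 4},
}&lt;/LI-CODE&gt;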
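&lt;P class="_1t7bu9h1 paragraph"&gt;For the scheduling point, here is a minimal sketch, assuming your latency requirements allow batch-style runs: switch to an &lt;CODE&gt;availableNow&lt;/CODE&gt; trigger (DBR 10.4+) and let a scheduled Job start the stream off-peak, so it holds cluster resources only while it runs:&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;# Sketch: run the bronze ingestion as a scheduled batch instead of a 24/7
# stream; pair this with a Databricks Job on a cron schedule (e.g. nightly).
(dfBronze.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_dir_path_bronze)
    .outputMode("append")
    .trigger(availableNow=True)  # process all new files, then stop
    .table(bronze_table))&lt;/LI-CODE&gt;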
&lt;H3 class="_1jeaq5e0 _1t7bu9h9 heading3"&gt;Code Optimization&lt;/H3&gt;
&lt;OL&gt;
&lt;LI&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;&lt;STRONG&gt;Optimize Memory Usage&lt;/STRONG&gt;:&lt;/P&gt;
&lt;UL class="_1t7bu9h7 _1t7bu9h2"&gt;
&lt;LI&gt;&lt;STRONG&gt;Reduce Batch Size&lt;/STRONG&gt;: Lower the amount of data processed per micro-batch to reduce peak memory usage. You can do this by adjusting the &lt;CODE&gt;trigger&lt;/CODE&gt; interval or by setting Auto Loader's &lt;CODE&gt;cloudFiles.maxFilesPerTrigger&lt;/CODE&gt; option (see the example code below).&lt;/LI&gt;
&lt;LI&gt;&lt;STRONG&gt;Use Efficient Data Types&lt;/STRONG&gt;: Use the narrowest data types that meet your precision requirements. &lt;CODE&gt;DecimalType&lt;/CODE&gt; guarantees exact values where &lt;CODE&gt;DoubleType&lt;/CODE&gt; would lose precision, but a wide decimal such as &lt;CODE&gt;decimal(30,14)&lt;/CODE&gt; also costs more memory to process, so size it to your data.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;&lt;STRONG&gt;Optimize Transformations&lt;/STRONG&gt;:&lt;/P&gt;
&lt;UL class="_1t7bu9h7 _1t7bu9h2"&gt;
&lt;LI&gt;&lt;STRONG&gt;Filter Early&lt;/STRONG&gt;: Apply filters as early as possible in your data processing pipeline to reduce the amount of data being processed. For example, filter the data in &lt;CODE&gt;dfBronze&lt;/CODE&gt; before writing it to the Delta table.&lt;/LI&gt;
&lt;LI&gt;&lt;STRONG&gt;Avoid Unnecessary Transformations&lt;/STRONG&gt;: Review your transformations to ensure they are necessary and efficient. For example, avoid using &lt;CODE&gt;withColumn&lt;/CODE&gt; multiple times if you can achieve the same result with a single transformation.&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;&lt;STRONG&gt;Cache Management&lt;/STRONG&gt;:&lt;/P&gt;
&lt;UL class="_1t7bu9h7 _1t7bu9h2"&gt;
&lt;LI&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;&lt;SPAN&gt;&lt;STRONG&gt;Delta Cache&lt;/STRONG&gt;: Enable Delta cache to accelerate data reads by creating copies of remote files in nodes’ local storage. This can be done by setting &lt;CODE&gt;spark.databricks.io.cache.enabled = true&lt;/CODE&gt;.&lt;/SPAN&gt;&lt;/P&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;&lt;SPAN&gt;&lt;STRONG&gt;Spark Cache&lt;/STRONG&gt;: Use &lt;CODE&gt;cache()&lt;/CODE&gt; or &lt;CODE&gt;persist()&lt;/CODE&gt; methods to cache intermediate DataFrame computations if they are reused in subsequent actions. This can help reduce redundant computations and improve performance.&lt;/SPAN&gt;&lt;/P&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;&lt;STRONG&gt;Unpersisting Data&lt;/STRONG&gt;: Ensure that you unpersist cached data that is no longer needed using &lt;CODE&gt;unpersist()&lt;/CODE&gt; to free up memory.&lt;/P&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;&lt;STRONG&gt;Efficient Storage Levels&lt;/STRONG&gt;: Configure the cache settings to use a more efficient storage level, such as &lt;CODE&gt;MEMORY_AND_DISK&lt;/CODE&gt;, to balance memory usage and performance.&lt;/P&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;/LI&gt;
&lt;/OL&gt;
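&lt;P class="_1t7bu9h1 paragraph"&gt;Here is a minimal sketch of the cache-management points, assuming the silver step reuses each micro-batch in more than one action (otherwise caching only adds memory pressure). The &lt;CODE&gt;write_silver&lt;/CODE&gt; function is illustrative, not part of your existing pipeline:&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;from pyspark import StorageLevel

# The disk (Delta) cache is a cluster-wide setting; shown here for completeness.
spark.conf.set("spark.databricks.io.cache.enabled", "true")

def write_silver(batch_df, batch_id):
    # MEMORY_AND_DISK lets cached partitions spill to local disk instead of
    # failing when executor memory runs short.
    batch_df.persist(StorageLevel.MEMORY_AND_DISK)
    try:
        batch_df.write.format("delta").mode("append").saveAsTable(silver_table)
        # ...any second action that reuses batch_df avoids recomputation here...
    finally:
        batch_df.unpersist()  # free the cache as soon as the batch is done

(dfSilver.writeStream
    .option("checkpointLocation", checkpoint_dir_path_silver)
    .trigger(processingTime="120 seconds")
    .foreachBatch(write_silver)
    .start())&lt;/LI-CODE&gt;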
&lt;H3 class="_1jeaq5e0 _1t7bu9h9 heading3"&gt;Example Code Adjustments&lt;/H3&gt;
&lt;P class="_1t7bu9h1 paragraph"&gt;Here are some specific adjustments you can make to your code:&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;# Optimize memory usage by reducing batch size
dfBronze = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("maxFilesPerTrigger", 10)  # Adjust this value based on your workload
    .schema(json_schema_bronze)
    .load("s3://bucket")
    .withColumn("filename", col("_metadata.file_path"))
    .withColumn("ingestion_time_utc", from_utc_timestamp(current_timestamp(), "UTC"))
    .withColumn('data_parsed', from_json(col("data"), json_schema_silver["data"].dataType))
)

# Apply early filtering (placeholder predicate; substitute your real condition)
dfBronze_filtered = dfBronze.filter(col("some_column") == "some_value")

dfBronze_filtered.writeStream \
    .format("delta") \
    .option("checkpointLocation", checkpoint_dir_path_bronze) \
    .outputMode("append") \
    .trigger(processingTime="120 second") \
    .table(bronze_table)

dfSilver = (spark.readStream
    .format("delta")
    .table(bronze_table)
)

# Apply transformations efficiently
dfSilver_transformed = dfSilver.select(
    col("site"),
    col("device_time_utc"),
    col("data.energy.value").alias("energy_cumulative_active_value").cast("decimal(30,14)"),
    col("data.energy.unit").alias("energy_cumulative_active_unit"),
    # ... more transformations
)

dfSilver_transformed.writeStream \
    .option("checkpointLocation", checkpoint_dir_path_silver) \
    .outputMode("append") \
    .trigger(processingTime="120 second") \
    .toTable(silver_table)&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 31 Oct 2024 17:33:49 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/sakuradev1-board-data-engineering-39000/m-p/97073#M39414</guid>
      <dc:creator>VZLA</dc:creator>
      <dc:date>2024-10-31T17:33:49Z</dc:date>
    </item>
  </channel>
</rss>

