Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Dataframe is getting empty during execution of daily job with random pattern

M_S
New Contributor II

Hello, I have a daily ETL job that adds new records to a table for the previous day. However, from time to time, it does not produce any output.

After investigating, I discovered that one table is sometimes loaded as empty during execution. As a result, no new records are ingested.

It appears that this dataset is being read from the cache; I found evidence of this in the Spark UI. This is interesting because we are not explicitly using .persist() or .cache() on any dataset, so the caching is likely happening automatically.

To me, it seems that Spark attempts to load these records from Parquet but instead retrieves them from the cache, which returns an empty dataset.
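For reference, here is roughly the kind of check I mean, sketched with a placeholder table name rather than our real one:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # already available in a Databricks notebook

# Placeholder table name, not the real one from our job
table_name = "catalog.schema.source_table"
df = spark.table(table_name)

# True if this table currently has an entry in the Dataset cache
print(spark.catalog.isCached(table_name))

# A cached read shows an InMemoryTableScan node instead of a plain file scan
df.explain(mode="formatted")
```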

[Screenshot: Spark UI query plan showing the InMemoryTableScan node]

 

1) Is there a chance that an automatic caching mechanism is interfering with my dataset?
2) Is there a chance that the cache contains version X of the Delta table, while storage already has version X+1?
3) Why is Spark reading an empty dataset from the cache? I thought that when a given DataFrame does not exist in the cache, it should be reloaded. I never expected an empty relation from `InMemoryTableScan`.

 

 

 

2 REPLIES

BigRoux
Databricks Employee

Here are some considerations/ideas you might be interested in:

  1. Automatic Caching Mechanism Interference: Yes, there is a possibility that an automatic caching mechanism interferes with your dataset. Spark employs several caching mechanisms:
    • The DataFrame cache in Spark SQL APIs stores DataFrame/Dataset data in memory when .cache() or .persist() is explicitly called.
    • The Disk Cache or Delta Cache (DBIO Cache) automatically caches Parquet or Delta files on the local storage of executors for improved performance. This feature is enabled by default on certain clusters and can be controlled via the spark.databricks.io.cache.enabled Spark configuration parameter.
    If this automatic caching mechanism fails to update or invalidate correctly, stale data (empty or older versions) might be retrieved during jobs.
  2. Cache Containing Version X While Storage is at X+1: Yes, this can happen when the underlying cache is not cleared or invalidated after an update. For Delta tables, the disk cache relies mainly on file timestamps and metadata to determine when to invalidate entries. If updates to Delta tables are not reflected correctly in the cache, such scenarios can arise. It is advised to explicitly run REFRESH TABLE or spark.catalog.uncacheTable("table_name") after updates to ensure the cache is invalidated and synced with the latest consistent state (see the sketch after this list).
  3. Empty Dataset from Cache Unexpectedly: Spark reads an empty dataset from the cache when the cache holds no valid data for the requested operation, possibly due to:
    • Stale or corrupted cached data linked to previous operations.
    • Inconsistent cache state due to automatic caching mechanisms failing to handle updates to Delta tables.
    • Misalignment between cached metadata and subsequent queries.
    The InMemoryTableScan you observe indicates that Spark is scanning cached in-memory data. If a DataFrame was empty when it was previously cached, Spark would return the empty dataset on subsequent accesses. Without explicit invalidation or clearing (e.g., using .unpersist(true)), Spark might not reload it from disk/storage even when newer data exists.
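
For points 2 and 3, a minimal sketch of the invalidation step, using a hypothetical table name and the SparkSession available in a Databricks notebook or job:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical table name; run this right after the table has been updated
table_name = "catalog.schema.source_table"

# Reload the table's metadata and invalidate cached file listings
spark.sql(f"REFRESH TABLE {table_name}")

# Drop any Dataset-cache entry so the next read rebuilds it from storage
if spark.catalog.isCached(table_name):
    spark.catalog.uncacheTable(table_name)
```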
Recommendations:
  - Forcing cache invalidation: Add a REFRESH TABLE command in your ETL logic after updates to Delta tables. This ensures Spark reloads the table metadata and reflects the latest table state.
  - Disabling the automatic disk cache: If you suspect the automatic disk (DBIO) cache is causing issues, try disabling it with spark.databricks.io.cache.enabled = false in your cluster's Spark configuration. Alternatively, control its behavior with properties like spark.databricks.io.cache.maxDiskUsage to limit caching.
  - Clearing stale cache: Use spark.catalog.clearCache() or .unpersist() in your code to explicitly clear stale cached datasets before querying further.
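
To make the last two concrete, here is a minimal sketch (session-level only; the other spark.databricks.io.cache.* properties, such as maxDiskUsage, are normally set in the cluster's Spark config rather than at runtime):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Disable the Databricks disk (DBIO) cache for this session; reads then go
# back to cloud storage. Note: data already cached locally is not evicted.
spark.conf.set("spark.databricks.io.cache.enabled", "false")

# Drop every Dataset-cache entry held by this session
spark.catalog.clearCache()
```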
 
 
Are you using Delta tables here? If so, are they managed or external?
 
Cheers, Louis.

M_S
New Contributor II

Thank you very much, @BigRoux , for such a detailed and insightful answer!

All tables used in this processing are managed Delta tables loaded through Unity Catalog.

I will try running it with spark.databricks.io.cache.enabled set to false just to see if the execution plan looks different. I believe we previously tried using REFRESH, but I will attempt to use it again directly after writing changes to the problematic table.
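
Roughly what I plan to try, sketched with placeholder table names and a placeholder filter:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Placeholder names for the source and target of the daily job
source_table = "catalog.schema.source_table"
target_table = "catalog.schema.daily_target"

# 1) Disable the automatic disk cache for this run to compare execution plans
spark.conf.set("spark.databricks.io.cache.enabled", "false")

# 2) Build yesterday's records (placeholder filter for illustration)
new_records_df = (
    spark.table(source_table)
    .where(F.col("event_date") == F.date_sub(F.current_date(), 1))
)

# 3) Append to the managed Delta table
new_records_df.write.format("delta").mode("append").saveAsTable(target_table)

# 4) Refresh immediately after the write so later reads in the same job
#    do not serve a stale cached snapshot of the table
spark.sql(f"REFRESH TABLE {target_table}")
```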
