Observable API and Delta Table merge

Malthe
Valued Contributor II

Using the Observable API on the source DataFrame of a Delta table merge seems to hang indefinitely.

Steps to reproduce:

  1. Create one or more pyspark.sql.Observation objects.
  2. Use DataFrame.observe on the merge source.
  3. Run the merge.
  4. Accessing Observation.get blocks indefinitely.

The source DataFrame here is a batch DataFrame, processed inside foreachBatch on a streaming data source.
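Because step 4 blocks with no timeout, a reproduction inside foreachBatch stalls the streaming query instead of failing it. One way to turn the hang into an error while investigating is to read the observation on a background thread with a deadline. This is a minimal sketch: `get_with_timeout` is a hypothetical helper, not a PySpark API, and it assumes only that `.get` is a blocking property returning a dict, as it is on `pyspark.sql.Observation`.

```python
import threading
from typing import Any, Dict, List


def get_with_timeout(observation: Any, timeout_s: float) -> Dict[str, Any]:
    """Read ``observation.get`` on a daemon thread and give up after ``timeout_s``.

    Hypothetical diagnostic helper, not part of PySpark. It assumes only that
    ``observation.get`` is a blocking property returning a dict, as on
    pyspark.sql.Observation, which itself offers no timeout.
    """
    result: Dict[str, Any] = {}
    errors: List[Exception] = []

    def _read() -> None:
        try:
            result.update(observation.get)
        except Exception as exc:  # re-raised on the caller's thread below
            errors.append(exc)

    reader = threading.Thread(target=_read, daemon=True)
    reader.start()
    reader.join(timeout_s)
    if reader.is_alive():
        # The reader stays blocked, but as a daemon thread it won't keep the process alive.
        raise TimeoutError(f"Observation not populated after {timeout_s} s")
    if errors:
        raise errors[0]
    return result
```

Inside the foreachBatch function you would call it right after the merge builder's `.execute()`, for example `metrics = get_with_timeout(obs, 120)`, so a missing metric fails the micro-batch with a `TimeoutError` rather than stalling silently. The stuck reader thread is abandoned, not cancelled.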

Is the Observable API not compatible with Delta Table merges?

1 reply

AnthonyAnand
Databricks Partner
Hi @Malthe,
 
You have hit a very specific, known behavioral gap in how Apache Spark and Delta Lake interact.

To answer your question directly: yes, the Observable API is effectively incompatible with Delta table merges when it is attached directly to the merge source.

Why It Hangs Indefinitely
The indefinite wait you are seeing comes down to how Delta Lake plans its queries versus how Spark collects observed metrics:
  1. How Observation works: pyspark.sql.Observation relies on a query execution listener. When an action on the observed DataFrame (such as .collect(), .count(), or a .write()) completes, the listener reads the observed metrics from that query's execution and populates your Observation object.
  2. How Delta MERGE works: a Delta MERGE runs as a command rather than as an ordinary Spark action. Internally, Delta rewrites the plan, runs its own internal queries over the source to find matched and unmatched rows, and writes the resulting files itself.
  3. The Clash: during this rewriting, the CollectMetrics node that .observe() adds to the source plan is either dropped or ends up inside an internal query that never triggers the listener event your Observation is waiting for.
  4. The Hang: because the listener never sees the observed metrics, Observation.get, which blocks until the metrics arrive and has no timeout, waits forever.
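The four points above can be illustrated with a toy, pure-Python model. It does not use Spark: `ToyObservation`, `ToyEngine` and `demo` are hypothetical names, and the model simply assumes the second possibility from point 3, namely that the observed metrics end up in an internal query that listeners never hear about. A real `Observation.get` has no timeout; the toy version accepts one only so the demo terminates.

```python
import threading
from typing import Dict, List, Optional


class ToyObservation:
    """Toy stand-in for pyspark.sql.Observation: get() blocks until metrics arrive."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._ready = threading.Event()
        self._metrics: Dict[str, int] = {}

    def on_query_end(self, observed: Dict[str, Dict[str, int]]) -> None:
        # Listener callback: called once per query the engine reports.
        if self.name in observed:
            self._metrics = observed[self.name]
            self._ready.set()

    def get(self, timeout: Optional[float] = None) -> Dict[str, int]:
        # timeout=None waits forever, like the real property; the demo passes a timeout.
        if not self._ready.wait(timeout):
            raise TimeoutError(f"observation {self.name!r} was never populated")
        return self._metrics


class ToyEngine:
    """Toy engine that notifies listeners only about top-level queries."""

    def __init__(self) -> None:
        self.listeners: List[ToyObservation] = []

    def _notify(self, observed: Dict[str, Dict[str, int]]) -> None:
        for listener in self.listeners:
            listener.on_query_end(observed)

    def run_action(self, source_metrics: Dict[str, Dict[str, int]]) -> None:
        # Plain action (collect/count/write): the observed node is in the reported plan.
        self._notify(source_metrics)

    def run_merge(self, source_metrics: Dict[str, Dict[str, int]]) -> None:
        # MERGE-like command: the source is consumed by internal queries whose
        # metrics are not attached to the command that listeners hear about.
        self._notify({})


def demo(use_merge: bool) -> str:
    engine = ToyEngine()
    obs = ToyObservation("source_stats")
    engine.listeners.append(obs)
    metrics = {"source_stats": {"row_count": 3}}
    if use_merge:
        engine.run_merge(metrics)
    else:
        engine.run_action(metrics)
    try:
        return f"populated: {obs.get(timeout=0.2)}"
    except TimeoutError as exc:
        return f"timed out: {exc}"


print(demo(use_merge=False))  # populated: {'row_count': 3}
print(demo(use_merge=True))   # timed out: observation 'source_stats' was never populated
```

The only difference between the two runs is which query the listener is told about, which is the point of the model: the source rows are processed in both cases, but only the plain action delivers the metrics.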
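The micro-batch DataFrame you get in foreachBatch is an ordinary batch DataFrame, so streaming is not the root cause; the core issue remains the Delta execution plan itself. What foreachBatch adds is that a blocked Observation.get also stalls the micro-batch, and with it the whole streaming query.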

Feel free to add more details if I have misunderstood your issue, or ask if you would like a workaround.