cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Concurrency issue with append only writed

EDDatabricks
Contributor

Dear all,

We have a pyspark streaming job (DBR: 14.3) that continuously writes new data on a Delta Table (TableA).

On this table, there is a pyspark batch job (DBR: 14.3) that operates every 15 minuted and in some cases it may delete some records from TableA using a merge statement (WhenMatchedDelete construct with a business related condition). This condition identifies specific records that are already present in TableA and it is impossible to match with records that are written during query execution.

Periodically, we get exceptions due to concurrency issues with the following trace:

 

{
  "timestamp":1711718436569,
  "userId":"xxx",
  "userName":"xxx",
  "operation":"STREAMING UPDATE",
  "operationParameters":{
    "outputMode":"Append",
    "queryId":xxx,
    "epochId":xxx,
    "statsOnLoad":false
  },
  "job":{
  "jobId":"xxx",
  "jobName":"xxx",
  "jobRunId":"xxx",
  "runId":"xxx",
  "jobOwnerId":"xxx",
  "triggerType":"manual"
  },
  "notebook":{
    "notebookId":"xxx"
  },
  "clusterId":"xxx",
  "readVersion":21835,
  "isolationLevel":"WriteSerializable",
  "isBlindAppend":true,
  "operationMetrics":{
    "numRemovedFiles":"0",
    "numOutputRows":"50",
    "numOutputBytes":"25199",
    "numAddedFiles":"1"
  },
  "tags":{
    "restoresDeletedRows":"false",
    "delta.rowTracking.preserved":"true"
  },
  "engineInfo":"Databricks-Runtime/14.3.x-scala2.12",
  "txnId":"1eb445f1-2abd-4b6c-a8f0-3ccc1f2474f2"
}

 

 

As mentioned in this page: https://learn.microsoft.com/en-us/azure/databricks/optimizations/isolation-level, there should be no concurrency issue. Even after checking the limitations page (https://learn.microsoft.com/en-us/azure/databricks/optimizations/isolation-level#rlc-limitations) we follow all conditions
- No complex conditional clauses
- Explicit predicates that uniquely identify each record to be deleted

We suspect that this may have to do with this option: "delta.rowTracking.preserved":"true" but we were not able to locate any documentation about what this by default True configuration does. (only some github requests)

Please propose any mitigation actions for solving this concurrency issues as there may be some data inconsistencies to our ETL pipelines due to the aforementioned failures. Furthermore, this issue is not highlighted in the documentation per our understanding.

Thanks a-priori for your help.

Kind regards,
the European Dynamics team

2 REPLIES 2

Kaniz
Community Manager
Community Manager

Hi @EDDatabricks

Thank you for providing the details about your PySpark streaming and batch jobs operating on a Delta Table. 

The concurrency issue you’re encountering seems to be related to the deletion of records from your Delta Table (TableA) during the batch job execution. Specifically, the merge statement with the WhenMatchedDelete construct is causing exceptions.

  • To address the concurrency issue, consider the following steps:
    • Review Deletion Logic: Double-check the merge statement’s deletion logic. Ensure that it correctly identifies the records to be deleted.
    • Row Tracking: Since delta.rowTracking.preserved is set to “true,” it’s worth understanding its impact. Unfortunately, detailed documentation is lacking.
    • Change Data Feed (CDF): Explore the Change Data Feed (CDF) feature. CDF allows Delta tables to track row-level changes between versions, which might help mitigate concurrency issues.

Thanks for the fast reply @Kaniz .

What we are unable to understand is that in the documentation it is specifically mentioned that
writes can not have issues in write serializable.

EDDatabricks_0-1712060583490.png

(https://learn.microsoft.com/en-us/azure/databricks/optimizations/isolation-level)

Kind regards,

The European Dynamics team