Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Upsert from Databricks to CosmosDB

William_Scardua
Valued Contributor

Hi guys,

I'm adjusting a data upsert process from Databricks to CosmosDB using the .jar connector. Since the load is very large, do you know if it's possible to update only the fields that have been modified?

best regards

1 REPLY

BigRoux
Databricks Employee

Yes, you can update only the modified fields in your Cosmos DB documents from Databricks using the Partial Document Update feature (also known as Patch API). This is particularly useful for large documents where sending the entire document for updates is inefficient.

Using Partial Document Update from Databricks

The standard Databricks-to-Cosmos DB connector with `mode("overwrite")` and `"Upsert": "true"` replaces the entire document on every write, which is what makes your large loads expensive. Instead, you can:

1. Use the Cosmos DB SDK directly within Databricks to perform partial updates:

```python
from azure.cosmos import CosmosClient

# Connect to your Cosmos DB account
client = CosmosClient(
    "https://your-account.documents.azure.com:443/",
    credential="your-key",
)
database = client.get_database_client("mydatabase")
container = database.get_container_client("mycontainer")

# Define patch operations (JSON Patch-style)
operations = [
    {"op": "replace", "path": "/field1", "value": "new_value"},
    {"op": "add", "path": "/newField", "value": 123},
]

# Apply the patch to a single document
container.patch_item(
    item="document_id",
    partition_key="partition_key_value",
    patch_operations=operations,
)
```

2. For batch processing, you can use Databricks to identify changed fields and then construct patch operations:

```python
# Process row by row; `changed_fields` must list the columns that differ
# between the source data and the existing documents.
def patch_document(row):
    doc_id = row["id"]
    partition_key = row["partitionKey"]

    # Build one replace operation per changed field
    operations = [
        {"op": "replace", "path": f"/{field}", "value": row[field]}
        for field in changed_fields
    ]

    # Apply the patch
    container.patch_item(
        item=doc_id,
        partition_key=partition_key,
        patch_operations=operations,
    )

# Apply to your DataFrame (note: each executor needs its own Cosmos
# client, since the client object cannot be serialized to the workers)
df.foreach(patch_document)
```
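If you don't already know which fields changed, a small helper can diff the new row against the previously loaded version of the document and emit only the necessary operations. This is a sketch with plain dicts; the `build_patch_operations` name is an assumption of this example, not part of the Cosmos SDK:

```python
def build_patch_operations(old_doc, new_doc):
    """Compare two document versions and return JSON Patch-style
    operations covering only the fields that differ."""
    operations = []
    for key, value in new_doc.items():
        if key not in old_doc:
            # Field is new: add it
            operations.append({"op": "add", "path": f"/{key}", "value": value})
        elif old_doc[key] != value:
            # Field changed: replace it
            operations.append({"op": "replace", "path": f"/{key}", "value": value})
    return operations

# Example: only /city changed and /score is new, so two operations result
old = {"id": "1", "name": "Ana", "city": "SP"}
new = {"id": "1", "name": "Ana", "city": "RJ", "score": 10}
ops = build_patch_operations(old, new)
```

Keep in mind that Cosmos DB limits how many operations a single patch request may carry (10 at the time of writing), so very wide diffs may need to be split across multiple `patch_item` calls.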

## Benefits of Partial Updates

1. Reduced network traffic - only sending changed fields
2. Lower RU consumption - partial updates typically cost less than full replacements
3. Avoids concurrency issues - no need to read-modify-write the entire document
4. Preserves existing fields - other properties remain untouched

This approach is much more efficient than the current connector-based approach where the entire document is replaced, causing fields to be deleted when they're not included in the new document.

For very large datasets, you might need to implement batching and error handling to manage the process efficiently.
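One common pattern for that batching and error handling is to chunk the rows and retry transient failures (such as 429 throttling) with exponential backoff. A stdlib-only sketch follows; `apply_patch` stands in for the actual `container.patch_item` call and is an assumption of this example:

```python
import time

def chunked(rows, size):
    """Yield successive fixed-size batches from a list of rows."""
    for i in range(0, len(rows), size):
        yield rows[i:i + size]

def patch_with_retry(apply_patch, row, max_attempts=3, base_delay=1.0):
    """Call apply_patch(row), retrying with exponential backoff on
    failure (e.g. request-rate throttling from Cosmos DB)."""
    for attempt in range(max_attempts):
        try:
            return apply_patch(row)
        except Exception:
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))

# Usage: process rows batch by batch, retrying each patch
rows = [{"id": str(i)} for i in range(25)]
for batch in chunked(rows, 10):
    for row in batch:
        pass  # patch_with_retry(container_patch_fn, row) in real use
```

For genuinely large volumes you would run this per partition (e.g. via `foreachPartition`) so each worker creates its own client and applies its own batches.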

 
