Upsert from Databricks to CosmosDB
3 weeks ago
Hi guys,
I'm adjusting a data upsert process from Databricks to Cosmos DB using the .jar connector. Since the load is very large, do you know if it's possible to update only the fields that have been modified?
Best regards
Labels: Spark
3 weeks ago
Yes, you can update only the modified fields in your Cosmos DB documents from Databricks using the Partial Document Update feature (also known as Patch API). This is particularly useful for large documents where sending the entire document for updates is inefficient.
## Using Partial Document Update from Databricks
The standard Databricks-to-Cosmos DB connector with `mode("overwrite")` and `"Upsert": "true"` replaces the entire document, which is what causes your issue: any field not present in the new document is dropped. Instead, you can:
1. Use the Cosmos DB SDK directly within Databricks to perform partial updates:
```python
from azure.cosmos import CosmosClient

# Connect to your Cosmos DB account
client = CosmosClient("https://your-account.documents.azure.com:443/", credential="your-key")
database = client.get_database_client("mydatabase")
container = database.get_container_client("mycontainer")

# Define patch operations (JSON Patch-style paths)
operations = [
    {"op": "replace", "path": "/field1", "value": "new_value"},
    {"op": "add", "path": "/newField", "value": 123},
]

# Apply the patch operations to a single document
container.patch_item(
    item="document_id",
    partition_key="partition_key_value",
    patch_operations=operations,
)
```
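One caveat: Cosmos DB's Partial Document Update accepts at most 10 operations per patch request, so if many fields changed you have to issue the patch in chunks. A minimal helper (plain Python, independent of the SDK):

```python
def chunk_operations(operations, max_ops=10):
    """Split patch operations into Cosmos DB-sized batches.

    Cosmos DB allows at most 10 operations per patch request,
    so larger updates must be sent as several patch calls.
    """
    return [operations[i:i + max_ops] for i in range(0, len(operations), max_ops)]
```

You would then loop `for ops in chunk_operations(all_operations): container.patch_item(..., patch_operations=ops)` instead of sending everything at once.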
2. For batch processing, you can use Databricks to identify changed fields and then construct patch operations:
```python
from azure.cosmos import CosmosClient

# Process per partition: create the client inside the worker, because
# CosmosClient is not serializable and cannot be shipped from the driver
def patch_partition(rows):
    client = CosmosClient("https://your-account.documents.azure.com:443/", credential="your-key")
    container = client.get_database_client("mydatabase").get_container_client("mycontainer")
    changed_fields = ["field1", "field2"]  # columns you determined have changed
    for row in rows:
        # Build one replace operation per changed field
        operations = [
            {"op": "replace", "path": f"/{field}", "value": row[field]}
            for field in changed_fields
        ]
        # Apply the patch to this document
        container.patch_item(
            item=row["id"],
            partition_key=row["partitionKey"],
            patch_operations=operations,
        )

# Apply to your DataFrame
df.foreachPartition(patch_partition)
```
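To build that `operations` list you first need to know which fields actually changed. One way is to diff the incoming row against the current document (fetched via a point read, or joined from a snapshot DataFrame). A plain-Python sketch; the system-field names in `ignore` are Cosmos DB's standard metadata properties:

```python
def diff_fields(current_doc, new_doc,
                ignore=("id", "partitionKey", "_rid", "_self", "_etag", "_ts", "_attachments")):
    """Return patch operations for fields that differ between two documents."""
    operations = []
    for field, value in new_doc.items():
        if field in ignore:
            continue  # skip identity and Cosmos DB system fields
        if field not in current_doc:
            operations.append({"op": "add", "path": f"/{field}", "value": value})
        elif current_doc[field] != value:
            operations.append({"op": "replace", "path": f"/{field}", "value": value})
    return operations
```

Unchanged fields produce no operation at all, which is what keeps the patch payload (and RU cost) small.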
## Benefits of Partial Updates
1. Reduced network traffic - only sending changed fields
2. Lower RU consumption - partial updates typically cost less than full replacements
3. Avoids concurrency issues - no need to read-modify-write the entire document
4. Preserves existing fields - other properties remain untouched
This approach is much more efficient than the connector-based upsert, where the entire document is replaced and any fields not included in the new document are deleted.
For very large datasets, you might need to implement batching and error handling to manage the process efficiently.
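For the error handling, the main failure mode at scale is rate limiting (HTTP 429). A simple retry wrapper with exponential backoff, sketched generically here so it works with any exception carrying a `status_code` attribute (the SDK's `CosmosHttpResponseError` does):

```python
import time

def patch_with_retry(patch_fn, max_retries=5, base_delay=0.5):
    """Call patch_fn, retrying with exponential backoff on throttling (429)."""
    for attempt in range(max_retries):
        try:
            return patch_fn()
        except Exception as exc:
            # Retry only on rate limiting; re-raise everything else,
            # and give up after the final attempt
            if getattr(exc, "status_code", None) != 429 or attempt == max_retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
```

Inside `patch_partition` you would then call `patch_with_retry(lambda: container.patch_item(item=doc_id, partition_key=pk, patch_operations=ops))`.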

