Streaming data to CosmosDB

Antoine_De_A
New Contributor III

Hello everyone,

Here is the problem I am facing. I'm currently working on streaming data in Databricks. My goal is to create a data stream in a first notebook, then, in a second notebook, read this stream, add all the new rows to a DataFrame, and finally write those rows to my Cosmos DB instance as they arrive.

The first notebook works without problems: I add rows to a temporary file in DBFS (annex1).

In my second notebook, a readStream() call picks up the lines added to that file in real time and loads them into a DataFrame called stream. This part also works (annex2).

My problem is with the second step of this notebook. I can save the lines that are progressively added to my DataFrame in a temporary file, but I would like to save these new lines to a Cosmos DB instance that uses the Core (SQL) API (annex3). However, I can't get the writeStream() call to work. Do you have any idea why it fails? (annex4)

Note that in another notebook I can listen to all the operations performed on a Cosmos DB container, so I'm sure it is possible to insert data (annex5).

1 ACCEPTED SOLUTION

Antoine_De_A
New Contributor III

Problem solved!

Instead of trying to do everything directly with the .writeStream options, I used the .foreachBatch() function, which lets me call a function defined outside the .writeStream() chain for each micro-batch.

This function receives a DataFrame as a parameter (the current micro-batch of my stream DataFrame), and I save it directly to Cosmos DB with the save() function and my configuration.
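
For readers who want to see what this can look like, here is a minimal sketch, not the exact code from my notebook. It assumes the Azure Cosmos DB Spark 3 OLTP connector is installed on the cluster (format name `cosmos.oltp`). The names `cosmos_config`, `write_batch_to_cosmos`, `start_cosmos_stream`, the placeholder values, and the use of a checkpoint location are illustrative assumptions.

```python
# Hypothetical connection settings: replace every placeholder with your own values.
# The option keys are those of the Azure Cosmos DB Spark 3 OLTP connector
# (assumed here; an older connector uses different keys).
cosmos_config = {
    "spark.cosmos.accountEndpoint": "https://<your-account>.documents.azure.com:443/",
    "spark.cosmos.accountKey": "<your-account-key>",
    "spark.cosmos.database": "<your-database>",
    "spark.cosmos.container": "<your-container>",
}


def write_batch_to_cosmos(batch_df, batch_id):
    """Save one micro-batch to Cosmos DB with the regular batch writer.

    foreachBatch passes the micro-batch as a normal (non-streaming) DataFrame
    together with its batch id. Cosmos DB items need an `id` property, so the
    DataFrame is assumed to already contain an `id` column.
    """
    (
        batch_df.write
        .format("cosmos.oltp")
        .options(**cosmos_config)
        .mode("append")
        .save()
    )


def start_cosmos_stream(stream_df, checkpoint_path):
    """Start a streaming query that hands every micro-batch to the function above."""
    return (
        stream_df.writeStream
        .foreachBatch(write_batch_to_cosmos)
        # The checkpoint lets the query resume where it stopped after a restart.
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

With the stream DataFrame from the second notebook, the call would be something like `query = start_cosmos_stream(stream, "<checkpoint-path>")`. Because the write happens inside foreachBatch, any sink that has a batch writer can be used this way, even when streaming to it directly with writeStream does not work.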

2 REPLIES

Hi @Antoine De Araujo, thank you so much for your update. Would you mind marking your answer as the best? It would help the community find the solution.
