
Streaming data to CosmosDB

Antoine_De_A
New Contributor III

Hello everyone,

Here is the problem I am facing. I'm working on streaming data in Databricks. My goal is to create a data stream in a first notebook, then, in a second notebook, read this stream, add all the new rows to a DataFrame, and finally write those rows to my CosmosDB instance as they arrive.

The first notebook works without problems: it adds rows to a temporary file in DBFS (annex1).

In my second notebook, a readStream() call retrieves the new lines in real time as they are added to my file and adds them to a DataFrame called stream. This part also works (annex2).
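For readers without access to annex2, here is a minimal sketch of what such a read step might look like. It is not the code from the annex. The helper name `build_stream`, the JSON file format, the directory path, and the schema are all assumptions made for illustration.

```python
def build_stream(spark, source_dir, schema_ddl):
    """Return a streaming DataFrame that picks up new files in source_dir.

    Assumptions (not from the original post): the first notebook writes
    JSON files, and the schema is given as a DDL string such as
    "id STRING, value DOUBLE". File-based streaming sources need an
    explicit schema unless schema inference is enabled.
    """
    return (
        spark.readStream
        .format("json")      # assumed file format
        .schema(schema_ddl)  # DDL string or StructType
        .load(source_dir)    # directory to monitor for new files
    )
```

In a notebook this could be called as `stream = build_stream(spark, "/tmp/stream_input", "id STRING, value DOUBLE")`, where both arguments are placeholder values.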

My problem is with the second function of this notebook. I can save the lines that are progressively added to my DataFrame to a temporary file, but I would like to save these new lines to a CosmosDB instance that uses the Core (SQL) API (annex3). However, I can't get writeStream() to work. Do you have any idea why it doesn't work? (annex4)

Note that I can listen to all the actions performed on a CosmosDB container from another notebook, so I'm sure it's possible to insert data (annex5).

Accepted Solution

Antoine_De_A
New Contributor III

Problem solved!

Instead of trying to do everything directly with the .writeStream options, I used the .foreachBatch() function, which lets me call a function defined outside the .writeStream() chain.

This function receives a DataFrame as a parameter, which holds the current micro-batch of my stream, and I save it directly to CosmosDB with the save() function and my configuration.
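A minimal sketch of this approach, for illustration only. It assumes the Azure Cosmos DB Spark 3 OLTP connector (format `cosmos.oltp`) is installed on the cluster. The names `COSMOS_CONFIG`, `write_batch_to_cosmos`, and `start_cosmos_sink`, the placeholder connection values, and the checkpoint path are hypothetical and do not come from the original post.

```python
# Hypothetical connection settings; replace the placeholders with real values.
# The option keys assume the Cosmos DB Spark 3 OLTP connector.
COSMOS_CONFIG = {
    "spark.cosmos.accountEndpoint": "https://<account>.documents.azure.com:443/",
    "spark.cosmos.accountKey": "<key>",
    "spark.cosmos.database": "<database>",
    "spark.cosmos.container": "<container>",
}


def write_batch_to_cosmos(batch_df, batch_id):
    """Write one micro-batch to Cosmos DB.

    Spark calls this once per micro-batch. batch_df is a regular
    (non-streaming) DataFrame, so the ordinary batch writer is available.
    """
    (
        batch_df.write
        .format("cosmos.oltp")
        .options(**COSMOS_CONFIG)
        .mode("append")
        .save()
    )


def start_cosmos_sink(stream_df, checkpoint_dir):
    """Start a streaming query that hands each micro-batch to the writer above."""
    return (
        stream_df.writeStream
        .foreachBatch(write_batch_to_cosmos)
        .option("checkpointLocation", checkpoint_dir)  # lets the query resume after a restart
        .start()
    )
```

With the streaming DataFrame from the question, this could be started as `query = start_cosmos_sink(stream, "/tmp/checkpoints/cosmos")`, where the checkpoint path is a placeholder.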


