06-09-2022 06:47 AM
Hello everyone,
Here is the problem I am facing. I'm currently working on streaming data in Databricks. My goal is to create a data stream in a first notebook, then in a second notebook read this stream, add all the new rows to a DataFrame, and finally write those rows to my Cosmos DB instance as they arrive.
The first notebook works without problems: I add rows to a temporary file in DBFS (annex1).
In my second notebook, readStream() lets me pick up in real time the lines that are added to my file and append them to a DataFrame called stream. This part also works (annex2).
My problem is with the second function of this notebook. I can save the lines that are progressively added to my DataFrame to a temporary file, but I would like to save these new lines to a Cosmos DB instance that uses the Core (SQL) API (annex3). However, I can't get writeStream() to work correctly. Do you have any idea why it doesn't work? (annex4)
Note that I can listen to all the actions performed on a Cosmos DB container from another notebook, so I'm sure it's possible to insert data (annex5).
- Labels: Azure-cosmosdb-sqlapi, Dataframe, Streaming
Accepted Solutions
06-09-2022 09:14 AM
Problem solved!
Instead of trying to do everything directly with the .writeStream options, I used the .foreachBatch() function, which lets me call a function defined outside the .writeStream chain.
This function receives a DataFrame as a parameter, which holds the current micro-batch of my stream DataFrame, and I save it directly to Cosmos DB with the save() function and my configuration.
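For anyone landing here later, this is a minimal sketch of what that can look like in PySpark. It is not the exact code from my notebook. It assumes the Azure Cosmos DB Spark 3 OLTP connector is installed on the cluster (format name `cosmos.oltp`), and the endpoint, key, database, container, function names, and checkpoint path are all placeholders I made up for illustration.

```python
# Assumption: the Azure Cosmos DB Spark 3 OLTP connector is installed on the
# cluster. All values below are hypothetical placeholders, not real settings.
cosmos_config = {
    "spark.cosmos.accountEndpoint": "https://<account>.documents.azure.com:443/",
    "spark.cosmos.accountKey": "<account-key>",
    "spark.cosmos.database": "<database>",
    "spark.cosmos.container": "<container>",
}


def write_batch_to_cosmos(batch_df, batch_id):
    """Called by Spark once per micro-batch.

    batch_df is a regular (non-streaming) DataFrame, so the normal batch
    writer with save() can be used on it. batch_id is the micro-batch number.
    """
    (
        batch_df.write
        .format("cosmos.oltp")
        .options(**cosmos_config)
        .mode("append")
        .save()
    )


def start_cosmos_sink(stream_df, checkpoint_path):
    """Attach the per-batch writer to a streaming DataFrame and start it."""
    return (
        stream_df.writeStream
        .foreachBatch(write_batch_to_cosmos)
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

In the notebook this would be started with something like `query = start_cosmos_sink(stream, "/tmp/checkpoints/cosmos")`, where `stream` is the streaming DataFrame from the question and the checkpoint path is again just an example. Note that documents written to Cosmos DB need an `id` field, so the DataFrame should contain one.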