11-11-2022 02:00 AM
I would need to add a filter condition while ingesting data from a Cosmos Mongo DB using Databricks,
I am using the below query to ingest data of a Cosmos Collection:
df = spark.read \
.format('com.mongodb.spark.sql.DefaultSource') \
.option('uri', sourceCosmosConnectionString) \
.option('database', sourceCosmosDocument) \
.option('collection', sourceCosmosCollection) \
.load()
How can I add a filter here to pick only selected data? Eg: I only want to ingest data where {"type" : "student"}
I would really appreciate if anyone can help in this
I gave a try with the below query but getting error as below:
query = {"type" : "student"}
df = spark.read \
.format('com.mongodb.spark.sql.DefaultSource') \
.option('uri', sourceCosmosConnectionString) \
.option('database', sourceCosmosDocument) \
.option('collection', sourceCosmosCollection) \
.option('pipeline', json.dumps(query)) \
.load()
Error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 4 times, most recent failure: Lost task 0.3 in stage 16.0 (TID 34) (10.139.64.5 executor 0): com.mongodb.MongoCommandException: Command failed with error 40324 (40324): 'Unrecognized pipeline stage name: type' on server xxxxxxx-xxxxx.mongo.cosmos.azure.com:10255. The full response is {"ok": 0.0, "errmsg": "Unrecognized pipeline stage name: type", "code": 40324, "codeName": "40324"}
11-13-2022 02:00 PM
Hi @Swapnil Sarkar, The error message means the stage name in your aggregation pipeline request wasn't recognised. The solution will be to ensure that all aggregation pipeline names are valid in your request.
This article describes common errors and solutions for deployments using the Azure Cosmos DB for MongoDB.
Give this a try:-
query = {'$match': { 'type':'student' }}
df = spark.read \
.format('com.mongodb.spark.sql.DefaultSource') \
.option('uri', sourceCosmosConnectionString) \
.option('database', sourceCosmosDocument) \
.option('collection', sourceCosmosCollection) \
.option('pipeline', query) \
.load()
11-13-2022 02:00 PM
Hi @Swapnil Sarkar, The error message means the stage name in your aggregation pipeline request wasn't recognised. The solution will be to ensure that all aggregation pipeline names are valid in your request.
This article describes common errors and solutions for deployments using the Azure Cosmos DB for MongoDB.
Give this a try:-
query = {'$match': { 'type':'student' }}
df = spark.read \
.format('com.mongodb.spark.sql.DefaultSource') \
.option('uri', sourceCosmosConnectionString) \
.option('database', sourceCosmosDocument) \
.option('collection', sourceCosmosCollection) \
.option('pipeline', query) \
.load()
11-13-2022 11:28 PM
Hi Kaniz Fatma,
The above-mentioned query is working as expected.
Thanks a lot for the suggestion.
Regards,
Swapnil
11-13-2022 11:38 PM
Hi @Swapnil Sarkar , It would mean a lot if you could select the "Best Answer" to help others find the correct answer faster.
This makes that answer appear right after the question, so it's easier to find within a thread.
It also helps us mark the question as answered so we can have more eyes helping others with unanswered questions.
12-01-2022 08:58 AM
Done
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group