cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Ingest Cosmos Mongo DB data using Databricks by applying filters

Swapnil1998
New Contributor III

I would need to add a filter condition while ingesting data from a Cosmos Mongo DB using Databricks,

I am using the below query to ingest data of a Cosmos Collection:

df = spark.read \

.format('com.mongodb.spark.sql.DefaultSource') \

.option('uri', sourceCosmosConnectionString) \

.option('database', sourceCosmosDocument) \

.option('collection', sourceCosmosCollection) \

.load()

How can I add a filter here to pick only selected data? Eg: I only want to ingest data where {"type" : "student"}

I would really appreciate if anyone can help in this

I gave a try with the below query but getting error as below:

query = {"type" : "student"}

df = spark.read \

.format('com.mongodb.spark.sql.DefaultSource') \

.option('uri', sourceCosmosConnectionString) \

.option('database', sourceCosmosDocument) \

.option('collection', sourceCosmosCollection) \

.option('pipeline', json.dumps(query)) \

.load()

Error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 4 times, most recent failure: Lost task 0.3 in stage 16.0 (TID 34) (10.139.64.5 executor 0): com.mongodb.MongoCommandException: Command failed with error 40324 (40324): 'Unrecognized pipeline stage name: type' on server xxxxxxx-xxxxx.mongo.cosmos.azure.com:10255. The full response is {"ok": 0.0, "errmsg": "Unrecognized pipeline stage name: type", "code": 40324, "codeName": "40324"}

1 ACCEPTED SOLUTION

Accepted Solutions

Kaniz
Community Manager
Community Manager

Hi @Swapnil Sarkar​, The error message means the stage name in your aggregation pipeline request wasn't recognised. The solution will be to ensure that all aggregation pipeline names are valid in your request.

This article describes common errors and solutions for deployments using the Azure Cosmos DB for MongoDB.

Give this a try:-

query = {'$match': { 'type':'student' }}
df = spark.read \
 
.format('com.mongodb.spark.sql.DefaultSource') \
 
.option('uri', sourceCosmosConnectionString) \
 
.option('database', sourceCosmosDocument) \
 
.option('collection', sourceCosmosCollection) \
 
.option('pipeline', query) \
 
.load()

View solution in original post

4 REPLIES 4

Kaniz
Community Manager
Community Manager

Hi @Swapnil Sarkar​, The error message means the stage name in your aggregation pipeline request wasn't recognised. The solution will be to ensure that all aggregation pipeline names are valid in your request.

This article describes common errors and solutions for deployments using the Azure Cosmos DB for MongoDB.

Give this a try:-

query = {'$match': { 'type':'student' }}
df = spark.read \
 
.format('com.mongodb.spark.sql.DefaultSource') \
 
.option('uri', sourceCosmosConnectionString) \
 
.option('database', sourceCosmosDocument) \
 
.option('collection', sourceCosmosCollection) \
 
.option('pipeline', query) \
 
.load()

Swapnil1998
New Contributor III

Hi Kaniz Fatma,

The above-mentioned query is working as expected.

Thanks a lot for the suggestion.

Regards,

Swapnil

Kaniz
Community Manager
Community Manager

Hi @Swapnil Sarkar​ ​, It would mean a lot if you could select the "Best Answer" to help others find the correct answer faster.

This makes that answer appear right after the question, so it's easier to find within a thread.

It also helps us mark the question as answered so we can have more eyes helping others with unanswered questions.

Done

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.