Ingest Cosmos Mongo DB data using Databricks by applying filters

Swapnil1998
New Contributor III

I need to add a filter condition while ingesting data from a Cosmos Mongo DB using Databricks.

I am using the query below to ingest data from a Cosmos collection:

df = spark.read \
    .format('com.mongodb.spark.sql.DefaultSource') \
    .option('uri', sourceCosmosConnectionString) \
    .option('database', sourceCosmosDocument) \
    .option('collection', sourceCosmosCollection) \
    .load()

How can I add a filter here to pick only selected data? For example, I only want to ingest documents where {"type" : "student"}.

I would really appreciate it if anyone could help with this.

I tried the query below, but it fails with the following error:

query = {"type" : "student"}

df = spark.read \
    .format('com.mongodb.spark.sql.DefaultSource') \
    .option('uri', sourceCosmosConnectionString) \
    .option('database', sourceCosmosDocument) \
    .option('collection', sourceCosmosCollection) \
    .option('pipeline', json.dumps(query)) \
    .load()

Error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 4 times, most recent failure: Lost task 0.3 in stage 16.0 (TID 34) (10.139.64.5 executor 0): com.mongodb.MongoCommandException: Command failed with error 40324 (40324): 'Unrecognized pipeline stage name: type' on server xxxxxxx-xxxxx.mongo.cosmos.azure.com:10255. The full response is {"ok": 0.0, "errmsg": "Unrecognized pipeline stage name: type", "code": 40324, "codeName": "40324"}
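
A note on the error: "Unrecognized pipeline stage name: type" suggests the server treated the key "type" as an aggregation stage name. The value passed to the pipeline option is interpreted as an aggregation pipeline, so a plain filter document probably needs to be wrapped in a $match stage. Below is a minimal sketch of that idea, not necessarily the exact fix for every connector version. The helper names build_match_pipeline and read_filtered are hypothetical, and the format string and option names are copied from the attempt above.

```python
import json


def build_match_pipeline(filter_doc):
    """Wrap a plain filter document in a $match stage and serialize it.

    Hypothetical helper: the pipeline option expects aggregation stages,
    so the filter goes inside {"$match": ...} within a list of stages.
    """
    return json.dumps([{"$match": filter_doc}])


def read_filtered(spark, uri, database, collection, pipeline_json):
    """Hypothetical wrapper around the read from the post, with the pipeline added.

    Assumes the same connector format string and option names as above.
    """
    return (
        spark.read
        .format("com.mongodb.spark.sql.DefaultSource")
        .option("uri", uri)
        .option("database", database)
        .option("collection", collection)
        .option("pipeline", pipeline_json)
        .load()
    )


pipeline = build_match_pipeline({"type": "student"})
print(pipeline)  # [{"$match": {"type": "student"}}]
```

In a notebook this would be called as df = read_filtered(spark, sourceCosmosConnectionString, sourceCosmosDocument, sourceCosmosCollection, pipeline), so that the filter is applied on the server side rather than after loading the whole collection.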

Hi Kaniz Fatma,

The above-mentioned query is working as expected.

Thanks a lot for the suggestion.

Regards,

Swapnil
