Ingest Cosmos Mongo DB data using Databricks by applying filters
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-11-2022 02:00 AM
I would need to add a filter condition while ingesting data from a Cosmos Mongo DB using Databricks,
I am using the below query to ingest data of a Cosmos Collection:
df = spark.read \
.format('com.mongodb.spark.sql.DefaultSource') \
.option('uri', sourceCosmosConnectionString) \
.option('database', sourceCosmosDocument) \
.option('collection', sourceCosmosCollection) \
.load()
How can I add a filter here to pick only selected data? Eg: I only want to ingest data where {"type" : "student"}
I would really appreciate if anyone can help in this
I gave a try with the below query but getting error as below:
query = {"type" : "student"}
df = spark.read \
.format('com.mongodb.spark.sql.DefaultSource') \
.option('uri', sourceCosmosConnectionString) \
.option('database', sourceCosmosDocument) \
.option('collection', sourceCosmosCollection) \
.option('pipeline', json.dumps(query)) \
.load()
Error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 4 times, most recent failure: Lost task 0.3 in stage 16.0 (TID 34) (10.139.64.5 executor 0): com.mongodb.MongoCommandException: Command failed with error 40324 (40324): 'Unrecognized pipeline stage name: type' on server xxxxxxx-xxxxx.mongo.cosmos.azure.com:10255. The full response is {"ok": 0.0, "errmsg": "Unrecognized pipeline stage name: type", "code": 40324, "codeName": "40324"}
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-13-2022 11:28 PM
Hi Kaniz Fatma,
The above-mentioned query is working as expected.
Thanks a lot for the suggestion.
Regards,
Swapnil
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
12-01-2022 08:58 AM
Done