I need to add a filter condition while ingesting data from a Cosmos DB (MongoDB API) collection using Databricks.
I am currently using the query below to ingest the whole collection:
df = spark.read \
.format('com.mongodb.spark.sql.DefaultSource') \
.option('uri', sourceCosmosConnectionString) \
.option('database', sourceCosmosDocument) \
.option('collection', sourceCosmosCollection) \
.load()
How can I add a filter here so that only selected documents are read? For example, I only want to ingest documents matching {"type" : "student"}.
I would really appreciate any help with this.
I tried the query below, but it fails with the following error:
query = {"type" : "student"}
df = spark.read \
.format('com.mongodb.spark.sql.DefaultSource') \
.option('uri', sourceCosmosConnectionString) \
.option('database', sourceCosmosDocument) \
.option('collection', sourceCosmosCollection) \
.option('pipeline', json.dumps(query)) \
.load()
Error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 4 times, most recent failure: Lost task 0.3 in stage 16.0 (TID 34) (10.139.64.5 executor 0): com.mongodb.MongoCommandException: Command failed with error 40324 (40324): 'Unrecognized pipeline stage name: type' on server xxxxxxx-xxxxx.mongo.cosmos.azure.com:10255. The full response is {"ok": 0.0, "errmsg": "Unrecognized pipeline stage name: type", "code": 40324, "codeName": "40324"}
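The message 'Unrecognized pipeline stage name: type' suggests that the server is reading the top-level key of the document passed in 'pipeline' as an aggregation stage name. A possible fix, sketched below and not tested against Cosmos, is to wrap the filter in a $match stage and pass a list of stages instead of a bare filter document. The helper names build_match_pipeline and read_filtered are hypothetical and exist only for this illustration; the format string and option names are the ones already used above.

```python
import json


def build_match_pipeline(filter_doc):
    """Wrap a plain filter document in a $match stage and serialize it.

    Hypothetical helper: a bare filter such as {"type": "student"} is not a
    pipeline stage, so it is placed inside a $match stage here.
    """
    return json.dumps([{"$match": filter_doc}])


def read_filtered(spark, uri, database, collection, pipeline_json):
    """Hypothetical helper: same read as in the question, plus the pipeline.

    Assumes the connector forwards the 'pipeline' option to the server as an
    aggregation pipeline, which is what the error message above implies.
    """
    return (
        spark.read
        .format("com.mongodb.spark.sql.DefaultSource")
        .option("uri", uri)
        .option("database", database)
        .option("collection", collection)
        .option("pipeline", pipeline_json)
        .load()
    )


# Build the serialized pipeline for the example filter from the question.
pipeline = build_match_pipeline({"type": "student"})
```

On Databricks this would be called as read_filtered(spark, sourceCosmosConnectionString, sourceCosmosDocument, sourceCosmosCollection, pipeline). If the pipeline approach does not work in a given setup, applying df.filter(df.type == "student") after load() is another option to try, although whether that filter is pushed down to the server depends on the connector.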