
Reading MongoDB collections into an RDD

Mathias_Peters
Contributor II

Hi, 

For a Spark job that does some custom computation, I need to access data from a MongoDB collection and work with the elements as type Document. The reason is that I want to apply some custom type serialization which is already implemented and which I have to reuse.

Looking at the current MongoDB Spark connector, it already converts the documents into rows of a Dataset, which involves implicit type conversion.

Question 1: Is there a way to revert that and get the original Mongo document back out of a row?
Question 2: If 1 is not possible (which seems likely), is there a way to load a MongoDB collection directly into an RDD? I found some old code using MongoRDD, which unfortunately seems to have disappeared.

thank you

 


Louis_Frolio
Databricks Employee

Greetings @Mathias_Peters, here are some suggestions for your consideration.

Analysis

You're encountering a common challenge when migrating to newer versions of the MongoDB Spark Connector. The architecture changed significantly between versions 2.x and 10.x, removing direct RDD access in favor of DataFrame/Dataset APIs with automatic type conversion.
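
For context, a minimal 10.x-style read looks like the sketch below (the connection URI, database, and collection names are placeholders): the connector infers a schema and hands back Spark rows rather than org.bson.Document instances.

```scala
// Sketch of a 10.x connector read; URI, database, and collection are placeholders
val df = spark.read
  .format("mongodb")
  .option("connection.uri", "mongodb://localhost:27017")
  .option("database", "db")
  .option("collection", "collection")
  .load()

// At this point the BSON documents have already been mapped to rows
df.printSchema()
```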

Answers to Your Questions

Question 1: Reverting Row to Document

Short answer: No direct conversion exists, but you have workarounds:

Option A - Manual Reconstruction:
Convert the DataFrame to an RDD and manually reconstruct Document objects:

```scala
import org.bson.Document

// Read via the connector, then rebuild Document objects field by field
val df = spark.read.format("mongodb").load()
val documentsRDD = df.rdd.map { row =>
  val doc = new Document()
  row.schema.fields.foreach { field =>
    // Note: nested structs come back as Spark Rows, not nested Documents
    doc.append(field.name, row.getAs[Any](field.name))
  }
  doc
}
```

Option B - Read as JSON Strings: Store documents as JSON strings during the read, then deserialize with your custom logic:

```scala
import org.bson.Document

// Serialize each row to a JSON string, then parse it back into a Document
val df = spark.read.format("mongodb").load()
val jsonRDD = df.toJSON.rdd.map { jsonString =>
  Document.parse(jsonString)
}
```

Question 2: Loading Directly into an RDD

MongoRDD was deprecated and removed in connector version 10.x. The official MongoDB response confirms: "The support for RDD is not exposed in the v10.x of Spark Connector Drivers."

Alternative approaches:

Option A - Use Legacy Connector (2.4.x):
If feasible, continue using mongo-spark-connector 2.4.4 which still provides `MongoSpark.load()` returning `MongoRDD[Document]`:

```scala
import com.mongodb.spark.MongoSpark

val documentsRDD = MongoSpark.load(sc) // Returns MongoRDD[Document]
```
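
Note that the 2.4.x connector reads its source from Spark configuration rather than reader options. A minimal sketch, assuming placeholder coordinates and URI (match the Scala suffix to your cluster):

```scala
// Dependency (placeholder): org.mongodb.spark:mongo-spark-connector_2.12:2.4.4
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("legacy-mongo-rdd")
  .config("spark.mongodb.input.uri", "mongodb://localhost:27017/db.collection")
  .getOrCreate()

val documentsRDD = MongoSpark.load(spark.sparkContext) // MongoRDD[Document]
println(documentsRDD.count())
```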

Option B - Direct MongoDB Driver:
Bypass Spark connector entirely and use MongoDB Java Driver with custom partitioning:

```scala
import com.mongodb.client.MongoClients
import org.bson.Document
import scala.collection.JavaConverters._

val numPartitions = 8     // tune to the collection size and cluster
val batchSize = 10000     // documents fetched per partition

def fetchPartition(partitionId: Int): Iterator[Document] = {
  val client = MongoClients.create("mongodb://localhost:27017")
  val collection = client.getDatabase("db").getCollection("collection")
  // Skip/limit paging keeps the sketch simple; prefer range filters for large collections
  collection.find().skip(partitionId * batchSize).limit(batchSize).iterator().asScala
}

val documentsRDD = sc.parallelize(0 until numPartitions).flatMap(fetchPartition)
```

Option C - PyMongo with Spark:
If using PySpark, leverage PyMongo directly within RDD transformations:

```python
def read_partition(partition_range):
    from pymongo import MongoClient
    client = MongoClient("mongodb://localhost:27017")
    collection = client.db.collection
    # Each task opens its own connection; the cursor is consumed lazily by flatMap
    return collection.find({"_id": {"$gte": partition_range[0], "$lt": partition_range[1]}})

ranges = [(0, 1000), (1000, 2000), ...]  # Define partition ranges
documents_rdd = sc.parallelize(ranges).flatMap(read_partition)
```

Recommendation

For your use case with custom serialization requirements, Option B (Direct MongoDB Driver) provides the most control while maintaining Document type access. You'll need to implement partitioning logic manually, but you'll have full access to your existing serialization code without fighting the connector's type system.
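
If you go that route, here is a hedged sketch of the partitioning piece using the MongoDB Java driver. It assumes a numeric field (partitionKey is a made-up name) whose value range you can split evenly; swap in whatever your documents actually key on.

```scala
// Hedged sketch of manual range partitioning; "partitionKey" and the bounds are assumptions
import com.mongodb.client.MongoClients
import com.mongodb.client.model.Filters
import org.bson.Document
import scala.collection.JavaConverters._

def readRange(lower: Long, upper: Long): Iterator[Document] = {
  val client = MongoClients.create("mongodb://localhost:27017")
  val coll = client.getDatabase("db").getCollection("collection")
  val docs = new java.util.ArrayList[Document]()
  // Materialize the slice so the cursor is fully drained before the client is closed
  coll.find(Filters.and(Filters.gte("partitionKey", lower), Filters.lt("partitionKey", upper)))
    .into(docs)
  client.close()
  docs.asScala.iterator
}

val bounds = (0L until 10L).map(i => (i * 100000L, (i + 1) * 100000L))
val documentsRDD = sc.parallelize(bounds).flatMap { case (lo, hi) => readRange(lo, hi) }
```

Each task opens and closes its own client, so the closure captures nothing non-serializable, and your existing Document-based serialization can run inside the flatMap.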

If maintaining current connector versions isn't critical, staying on 2.4.x is the path of least resistance until you can refactor your serialization approach to work with Row types.

Hope this helps, Louis.
