Greetings @Mathias_Peters, here are some suggestions for your consideration.
Analysis
You're encountering a common challenge when migrating to newer versions of the MongoDB Spark Connector. The architecture changed significantly between versions 2.x and 10.x, removing direct RDD access in favor of DataFrame/Dataset APIs with automatic type conversion.
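For orientation, here is a minimal sketch of what a 10.x-style read looks like (the URI, database, and collection names are placeholders); the connector now hands you a DataFrame of Rows rather than a `MongoRDD[Document]`:
```scala
// Spark Connector 10.x style read: returns a DataFrame of Rows, not Documents
val df = spark.read
  .format("mongodb")
  .option("connection.uri", "mongodb://localhost:27017") // placeholder URI
  .option("database", "db")                              // placeholder names
  .option("collection", "collection")
  .load()

df.printSchema() // schema is inferred by sampling the collection
```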
Answer to Your Questions
Question 1: Reverting Row to Document
Short answer: No direct conversion exists, but you have workarounds:
**Option A - Manual Reconstruction:**
Convert the DataFrame to an RDD and manually reconstruct the Document objects:
```scala
import org.bson.Document
val df = spark.read.format("mongodb").load()
val documentsRDD = df.rdd.map { row =>
  val doc = new Document()
  row.schema.fields.foreach { field =>
    // Note: nested structs and arrays come back as Spark Row/Seq values rather than
    // BSON types, so deeply nested documents may need extra conversion here
    doc.append(field.name, row.getAs[Any](field.name))
  }
  doc
}
```
**Option B - Read as JSON Strings:**
Serialize the DataFrame rows to JSON strings, then parse them back into Documents (or plug in your own deserialization logic):
```scala
import org.bson.Document

val df = spark.read.format("mongodb").load()
// toJSON emits plain JSON, so BSON types such as dates and ObjectIds come back as strings
val documentsRDD = df.toJSON.rdd.map { jsonString =>
  Document.parse(jsonString)
}
```
Question 2: Loading Directly into RDD
MongoRDD was deprecated and removed in connector version 10.x. The official MongoDB response confirms: "The support for RDD is not exposed in the v10.x of Spark Connector Drivers."
Alternative approaches:
**Option A - Use Legacy Connector (2.4.x):**
If feasible, continue using mongo-spark-connector 2.4.4 which still provides `MongoSpark.load()` returning `MongoRDD[Document]`:
```scala
import com.mongodb.spark.MongoSpark
val documentsRDD = MongoSpark.load(sc) // Returns MongoRDD[Document]
```
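Note that the 2.x connector reads its source from Spark configuration rather than reader options; a minimal sketch, assuming a local mongod and placeholder database/collection names:
```scala
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.MongoSpark

val spark = SparkSession.builder()
  .appName("legacy-mongo-rdd")
  // The 2.x connector picks up its source from this setting (placeholder URI)
  .config("spark.mongodb.input.uri", "mongodb://localhost:27017/db.collection")
  .getOrCreate()

val documentsRDD = MongoSpark.load(spark.sparkContext) // MongoRDD[Document]
```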
**Option B - Direct MongoDB Driver:**
Bypass the Spark connector entirely and use the MongoDB Java Driver with custom partitioning:
```scala
import scala.jdk.CollectionConverters._ // on Scala 2.12, use scala.collection.JavaConverters._
import com.mongodb.client.MongoClients
import org.bson.Document

val numPartitions = 8 // example values; tune to your collection size
val batchSize = 10000
def fetchPartition(partitionId: Int): Iterator[Document] = {
  val client = MongoClients.create("mongodb://localhost:27017")
  val collection = client.getDatabase("db").getCollection("collection")
  // Simple skip/limit paging; range queries on an indexed field scale better
  collection.find().skip(partitionId * batchSize).limit(batchSize).iterator().asScala
}

val documentsRDD = sc.parallelize(0 until numPartitions, numPartitions).flatMap(fetchPartition)
```
**Option C - PyMongo with Spark:**
If using PySpark, leverage PyMongo directly within RDD transformations:
```python
def read_partition(partition_range):
    from pymongo import MongoClient
    client = MongoClient("mongodb://localhost:27017")
    collection = client.db.collection
    # The cursor is iterable, so flatMap yields the individual documents
    return collection.find({"_id": {"$gte": partition_range[0], "$lt": partition_range[1]}})

ranges = [(0, 1000), (1000, 2000)]  # ... define the remaining partition ranges
documents_rdd = sc.parallelize(ranges).flatMap(read_partition)
```
Recommendation
For your use case with custom serialization requirements, **Option B (Direct MongoDB Driver)** provides the most control while maintaining Document type access. You'll need to implement partitioning logic manually, but you'll have full access to your existing serialization code without fighting the connector's type system.
If maintaining current connector versions isn't critical, staying on 2.4.x is the path of least resistance until you can refactor your serialization approach to work with Row types.
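If you do eventually refactor, one common pattern is to map the DataFrame onto a case class via the Dataset API so your serialization logic operates on typed Scala objects rather than Documents. A minimal sketch, where the fields `name` and `value` are hypothetical stand-ins for your schema:
```scala
// Hypothetical schema; adjust the fields to match your collection
case class MyRecord(name: String, value: Double)

import spark.implicits._

val ds = spark.read.format("mongodb").load()
  .select("name", "value")
  .as[MyRecord]

// Typed records can now flow into your existing serialization code
val serialized = ds.rdd.map(record => record.toString /* replace with your serializer */)
```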
Hope this helps,
Louis