<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Reading MongoDB collections into an RDD in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/reading-mongodb-collections-into-an-rdd/m-p/137631#M50785</link>
    <description>&lt;P&gt;Greetings &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/98051"&gt;@Mathias_Peters&lt;/a&gt;, here are some suggestions for your consideration.&lt;/P&gt;
&lt;H2&gt;Analysis&lt;/H2&gt;
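&lt;P&gt;You're encountering a common challenge with newer versions of the MongoDB Spark Connector. Version 10.x is a rewrite built on Spark's DataSource V2 API. It no longer ships the RDD API of the 2.x/3.x lines (&lt;CODE&gt;MongoSpark.load&lt;/CODE&gt;, &lt;CODE&gt;MongoRDD&lt;/CODE&gt;). Reads go through the DataFrame/Dataset API instead, which converts each BSON document into a Spark &lt;CODE&gt;Row&lt;/CODE&gt; with Spark SQL types.&lt;/P&gt;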
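&lt;P&gt;For reference, a minimal 10.x read that passes the connection settings as reader options might look like the sketch below. The URI, database, and collection names are placeholders. The same keys can instead be set in the Spark config with the &lt;CODE&gt;spark.mongodb.read.&lt;/CODE&gt; prefix. Printing the schema is a quick way to see how the connector mapped your BSON fields to Spark SQL types.&lt;/P&gt;
&lt;PRE&gt;
```scala
import org.apache.spark.sql.SparkSession

// On Databricks this returns the notebook's existing session
val spark = SparkSession.builder().getOrCreate()

// Placeholder connection settings for the 10.x "mongodb" data source
val df = spark.read.format("mongodb")
  .option("connection.uri", "mongodb://localhost:27017")
  .option("database", "db")
  .option("collection", "collection")
  .load()

// Nested documents appear as structs and BSON arrays as Spark array types
df.printSchema()
```
&lt;/PRE&gt;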
&lt;H2&gt;Answer to Your Questions&lt;/H2&gt;
&lt;H3&gt;Question 1: Reverting Row to Document&lt;/H3&gt;
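&lt;P&gt;&lt;STRONG&gt;Short answer: No direct conversion exists.&lt;/STRONG&gt; Once the connector has mapped BSON to Spark SQL types, BSON-specific types such as &lt;CODE&gt;ObjectId&lt;/CODE&gt; are not restored. A rebuilt Document therefore holds Spark/Java values rather than the exact original document. With that caveat, there are two workarounds:&lt;/P&gt;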
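&lt;P&gt;&lt;STRONG&gt;Option A - Manual Reconstruction:&lt;/STRONG&gt;&lt;BR /&gt;Convert the DataFrame to an RDD and rebuild the Document objects yourself. Nested documents arrive as &lt;CODE&gt;Row&lt;/CODE&gt;s and arrays as Scala &lt;CODE&gt;Seq&lt;/CODE&gt;s, so the conversion has to recurse into them. Otherwise the resulting Document cannot be encoded back to BSON with the default codecs:&lt;/P&gt;
&lt;PRE&gt;
```scala
import org.apache.spark.sql.Row
import org.bson.Document
import scala.collection.JavaConverters._

object RowConversions {
  // Spark maps nested documents to Row and arrays to Seq; convert both back recursively
  def toBson(value: Any): Any = value match {
    case r: Row                  => rowToDocument(r)
    case s: Seq[_]               => s.map(toBson).asJava
    case t: java.sql.Timestamp   => new java.util.Date(t.getTime) // encodable by the default BSON codecs
    case other                   => other // strings, numbers and nulls pass through unchanged
  }

  def rowToDocument(row: Row): Document = {
    val doc = new Document()
    row.schema.fieldNames.foreach(name => doc.append(name, toBson(row.getAs[Any](name))))
    doc
  }
}

val df = spark.read.format("mongodb").load() // connection settings come from the Spark config
val documentsRDD = df.rdd.map(RowConversions.rowToDocument)
```
&lt;/PRE&gt;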
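&lt;P&gt;&lt;STRONG&gt;Option B - Convert via JSON String:&lt;/STRONG&gt;&lt;BR /&gt;Serialize each Row to a JSON string with &lt;CODE&gt;Dataset.toJSON&lt;/CODE&gt;, then parse it into a Document for your custom logic. This is shorter, but just as lossy. &lt;CODE&gt;toJSON&lt;/CODE&gt; writes plain JSON, so values such as dates and ObjectIds come back from &lt;CODE&gt;Document.parse&lt;/CODE&gt; as strings rather than BSON types:&lt;/P&gt;
&lt;PRE&gt;
```scala
import org.bson.Document

val df = spark.read.format("mongodb").load()
// toJSON renders each Row as a JSON string; Document.parse turns it back into a Document
val documentsRDD = df.toJSON.rdd.map(json => Document.parse(json))
```
&lt;/PRE&gt;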
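&lt;P&gt;The sketch below shows why the JSON round trip loses type information. It needs only the &lt;CODE&gt;bson&lt;/CODE&gt; library, no cluster. It parses the same hypothetical ObjectId value twice: first as plain JSON, the way &lt;CODE&gt;toJSON&lt;/CODE&gt; would emit a string-typed column, then in MongoDB Extended JSON. Only the &lt;CODE&gt;$oid&lt;/CODE&gt; form is decoded back into an &lt;CODE&gt;ObjectId&lt;/CODE&gt;.&lt;/P&gt;
&lt;PRE&gt;
```scala
import org.bson.Document
import org.bson.types.ObjectId

// Plain JSON: the value stays a java.lang.String
val plain = Document.parse("""{"_id": "5f1b2c3d4e5f6a7b8c9d0e1f"}""")
// MongoDB Extended JSON: the $oid wrapper is decoded into an ObjectId
val extended = Document.parse("""{"_id": {"$oid": "5f1b2c3d4e5f6a7b8c9d0e1f"}}""")

println(plain.get("_id").getClass.getSimpleName)    // String
println(extended.get("_id").getClass.getSimpleName) // ObjectId

// Typed access only works for the Extended JSON form
val oid: ObjectId = extended.getObjectId("_id")
println(oid.toHexString) // 5f1b2c3d4e5f6a7b8c9d0e1f
```
&lt;/PRE&gt;
&lt;P&gt;If your custom serialization depends on BSON types, the JSON route only helps when the values are still in Extended JSON form.&lt;/P&gt;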
&lt;H3&gt;Question 2: Loading Directly into an RDD&lt;/H3&gt;
&lt;P&gt;&lt;STRONG&gt;MongoRDD is not available in connector version 10.x.&lt;/STRONG&gt; The 10.x connector exposes only the DataFrame/Dataset API. An official MongoDB response confirms: "The support for RDD is not exposed in the v10.x of Spark Connector Drivers."&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;Alternative approaches:&lt;/STRONG&gt;&lt;/P&gt;
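&lt;P&gt;&lt;STRONG&gt;Option A - Use a Legacy Connector (2.x/3.x):&lt;/STRONG&gt;&lt;BR /&gt;If feasible, pin a pre-10.x connector. These still provide &lt;CODE&gt;MongoSpark.load()&lt;/CODE&gt; returning &lt;CODE&gt;MongoRDD[Document]&lt;/CODE&gt;. Pick the line that matches your Spark version: 2.4.x (for example 2.4.4) targets Spark 2.4, while 3.0.x was built for Spark 3.0. Current Databricks Runtimes run Spark 3.x or later, and a legacy connector may not be compatible with newer Spark releases, so test it on your runtime first.&lt;/P&gt;
&lt;PRE&gt;
```scala
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.rdd.MongoRDD
import org.bson.Document

// Reads spark.mongodb.input.uri (e.g. mongodb://host:27017/db.collection) from the Spark config
val documentsRDD: MongoRDD[Document] = MongoSpark.load(sc)
```
&lt;/PRE&gt;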
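&lt;P&gt;With a legacy connector pinned, you can also pass the connection settings per read and push a filter down to the server. The sketch below is written against the 2.x/3.x API (&lt;CODE&gt;ReadConfig&lt;/CODE&gt;, &lt;CODE&gt;MongoRDD.withPipeline&lt;/CODE&gt;). The connection values and the &lt;CODE&gt;status&lt;/CODE&gt; field are hypothetical. Check the exact signatures against the documentation for the version you pin.&lt;/P&gt;
&lt;PRE&gt;
```scala
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import com.mongodb.spark.rdd.MongoRDD
import org.bson.Document

// Placeholder settings; the legacy input keys are "uri", "database" and "collection"
val readConfig = ReadConfig(Map(
  "uri" -> "mongodb://localhost:27017",
  "database" -> "db",
  "collection" -> "collection"
))

// The aggregation pipeline runs on the MongoDB server, so only matching documents reach Spark
val activeDocs: MongoRDD[Document] = MongoSpark.load(sc, readConfig)
  .withPipeline(Seq(Document.parse("""{"$match": {"status": "active"}}""")))

// Each element is an org.bson.Document, ready for existing serialization code
activeDocs.take(1).foreach(doc => println(doc.toJson()))
```
&lt;/PRE&gt;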
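&lt;P&gt;&lt;STRONG&gt;Option B - Direct MongoDB Driver:&lt;/STRONG&gt;&lt;BR /&gt;Bypass the Spark connector entirely and use the MongoDB Java driver with your own partitioning. Each task opens its own client on the executor and reads one slice of the collection:&lt;/P&gt;
&lt;PRE&gt;
```scala
import com.mongodb.client.MongoClients
import com.mongodb.client.model.Sorts
import org.bson.Document
import scala.collection.JavaConverters._

val uri = "mongodb://localhost:27017"       // placeholder
val (batchSize, numPartitions) = (10000, 8)  // batchSize * numPartitions must cover the whole collection

def fetchPartition(partitionId: Int): Iterator[Document] = {
  val client = MongoClients.create(uri)
  try {
    // Sorting on _id makes the skip/limit windows deterministic and non-overlapping
    client.getDatabase("db").getCollection("collection").find()
      .sort(Sorts.ascending("_id")).skip(partitionId * batchSize).limit(batchSize)
      .into(new java.util.ArrayList[Document]()).asScala.iterator // materialize before closing the client
  } finally client.close()
}

val documentsRDD = sc.parallelize(0 until numPartitions, numPartitions).flatMap(fetchPartition)
```
&lt;/PRE&gt;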
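&lt;P&gt;Because &lt;CODE&gt;skip&lt;/CODE&gt; makes the server walk past every skipped document, later partitions get progressively slower. A range-based variant avoids that. It asks the server for roughly equal &lt;CODE&gt;_id&lt;/CODE&gt; ranges with the &lt;CODE&gt;$bucketAuto&lt;/CODE&gt; aggregation stage, then lets each task read one range with an indexed &lt;CODE&gt;_id&lt;/CODE&gt; filter. This is a sketch under stated assumptions:&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;all &lt;CODE&gt;_id&lt;/CODE&gt; values share one BSON type, since range queries only match values of the same type;&lt;/LI&gt;
&lt;LI&gt;each range fits in executor memory;&lt;/LI&gt;
&lt;LI&gt;the URI and names are placeholders.&lt;/LI&gt;
&lt;/UL&gt;
&lt;PRE&gt;
```scala
import com.mongodb.client.MongoClients
import com.mongodb.client.model.{Aggregates, Filters}
import org.bson.Document
import scala.collection.JavaConverters._

val uri = "mongodb://localhost:27017"                           // placeholder
val (dbName, collName, numPartitions) = ("db", "collection", 8) // placeholders

// Driver side: $bucketAuto returns documents shaped like {_id: {min: ..., max: ...}, count: n}
val bounds: List[(AnyRef, AnyRef, Boolean)] = {
  val client = MongoClients.create(uri)
  try {
    val buckets = client.getDatabase(dbName).getCollection(collName)
      .aggregate(java.util.Arrays.asList(Aggregates.bucketAuto("$_id", numPartitions)))
      .into(new java.util.ArrayList[Document]()).asScala.toList
    buckets.zipWithIndex.map { case (bucket, i) =>
      val range = bucket.get("_id", classOf[Document])
      // max is exclusive for every bucket except the last one, where it is inclusive
      (range.get("min"), range.get("max"), i == buckets.size - 1)
    }
  } finally client.close()
}

// Executor side: each task opens its own client and reads one _id range
def fetchRange(bound: (AnyRef, AnyRef, Boolean)): Iterator[Document] = {
  val (lo, hi, lastBucket) = bound
  val upper = if (lastBucket) Filters.lte("_id", hi) else Filters.lt("_id", hi)
  val client = MongoClients.create(uri)
  try {
    client.getDatabase(dbName).getCollection(collName)
      .find(Filters.and(Filters.gte("_id", lo), upper))
      .into(new java.util.ArrayList[Document]()).asScala.iterator // materialize before closing
  } finally client.close()
}

val documentsRDD = sc.parallelize(bounds, math.max(bounds.size, 1)).flatMap(fetchRange)
```
&lt;/PRE&gt;
&lt;P&gt;The &lt;CODE&gt;$bucketAuto&lt;/CODE&gt; pass itself reads the whole &lt;CODE&gt;_id&lt;/CODE&gt; index once. It costs one extra query up front, in exchange for partitions that each use an indexed range scan.&lt;/P&gt;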
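&lt;P&gt;&lt;STRONG&gt;Option C - PyMongo with Spark:&lt;/STRONG&gt;&lt;BR /&gt;If you use PySpark, you can call PyMongo directly inside RDD transformations. PyMongo returns Python dicts rather than &lt;CODE&gt;org.bson.Document&lt;/CODE&gt; objects, so this only helps if your serialization code can run in Python:&lt;/P&gt;
&lt;PRE&gt;
```python
from pymongo import MongoClient

def read_partition(id_range):
    lo, hi = id_range
    # Create the client inside the task so it is opened on the executor, not shipped from the driver
    with MongoClient("mongodb://localhost:27017") as client:
        collection = client["db"]["collection"]
        # Materialize the results before the client closes; each element is a Python dict
        return list(collection.find({"_id": {"$gte": lo, "$lt": hi}}))

# Hypothetical bounds: assumes integer _id values from 0 to 9999
ranges = [(start, start + 1000) for start in range(0, 10000, 1000)]
documents_rdd = sc.parallelize(ranges, len(ranges)).flatMap(read_partition)
```
&lt;/PRE&gt;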
&lt;H2&gt;Recommendation&lt;/H2&gt;
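&lt;P&gt;For your use case with custom serialization requirements, &lt;STRONG&gt;Option B (Direct MongoDB Driver)&lt;/STRONG&gt; gives you the most control. It yields genuine &lt;CODE&gt;org.bson.Document&lt;/CODE&gt; objects with their original BSON types, so your existing serialization code works without fighting the connector's type system. The trade-off is that you implement partitioning yourself. For large collections, prefer range queries on an indexed field such as &lt;CODE&gt;_id&lt;/CODE&gt; over &lt;CODE&gt;skip&lt;/CODE&gt;/&lt;CODE&gt;limit&lt;/CODE&gt;.&lt;/P&gt;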
&lt;P&gt;If staying on the current connector version isn't critical, &lt;STRONG&gt;pinning a legacy connector (2.4.x or 3.0.x, matching your Spark version)&lt;/STRONG&gt; is the path of least resistance, provided it runs on your Databricks Runtime. It can carry you until you refactor your serialization approach to work with Row types.&lt;/P&gt;
&lt;P&gt;Hope this helps, Louis.&lt;/P&gt;</description>
    <pubDate>Tue, 04 Nov 2025 18:33:19 GMT</pubDate>
    <dc:creator>Louis_Frolio</dc:creator>
    <dc:date>2025-11-04T18:33:19Z</dc:date>
    <item>
      <title>Reading MongoDB collections into an RDD</title>
      <link>https://community.databricks.com/t5/data-engineering/reading-mongodb-collections-into-an-rdd/m-p/137412#M50738</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;For a Spark job that does some custom computation, I need to access data from a MongoDB collection and access the elements as type Document. The reason is that I want to apply some custom type serialization that is already implemented and that I have to reuse.&lt;BR /&gt;&lt;BR /&gt;Looking at the current Mongo Spark client, it already converts the documents into rows of a Dataset, with implicit type conversion.&lt;BR /&gt;&lt;BR /&gt;Question 1: Is there a way to revert that and get the original Mongo document out of a row?&lt;BR /&gt;Question 2: If 1 is not possible (likely), is there a way to load a MongoDB collection directly into an RDD? I found some old code using MongoRDD, which unfortunately seems to have disappeared.&lt;BR /&gt;&lt;BR /&gt;Thank you&lt;/P&gt;</description>
      <pubDate>Mon, 03 Nov 2025 14:56:31 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/reading-mongodb-collections-into-an-rdd/m-p/137412#M50738</guid>
      <dc:creator>Mathias_Peters</dc:creator>
      <dc:date>2025-11-03T14:56:31Z</dc:date>
    </item>
    <item>
      <title>Re: Reading MongoDB collections into an RDD</title>
      <link>https://community.databricks.com/t5/data-engineering/reading-mongodb-collections-into-an-rdd/m-p/137631#M50785</link>
      <pubDate>Tue, 04 Nov 2025 18:33:19 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/reading-mongodb-collections-into-an-rdd/m-p/137631#M50785</guid>
      <dc:creator>Louis_Frolio</dc:creator>
      <dc:date>2025-11-04T18:33:19Z</dc:date>
    </item>
  </channel>
</rss>

