Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

MongoDB to databricks driver killed and compute re-attached

DoredlaCharan
New Contributor III

I started reading data from MongoDB with spark.read, which uses the mongo-spark-connector. By default the connector's sampleSize is 1000, meaning it inspects only 1000 documents in the collection to infer the DataFrame's columns. Because my documents have 100+ keys, I increased sampleSize to the number of documents in the collection.

Compute used: Legacy compute

Code:

df = spark.read \
    .format("mongodb") \
    .option("spark.mongodb.connection.uri", mongo_url) \
    .option("database", database) \
    .option("collection", collection) \
    .option("mergeSchema", "true")\
    .option("partitioner", "MongoShardedPartitioner") \
    .option("partitionerOptions.shardKey", "_id") \
    .option("sampleSize", "100000")\
    .load()

Error:

"The spark driver has stopped unexpectedly and is restarting. Your notebook will be automatically reattached.
	at com.databricks.spark.chauffeur.Chauffeur.onDriverStateChange(Chauffeur.scala:2035)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)"

 

5 REPLIES

saurabh18cs
Honored Contributor III
Increasing sampleSize to 100,000 forces the Mongo Spark Connector to pull a huge number of documents into the Spark driver, which overwhelms its memory and causes the driver JVM to die and restart.
 
1) I would suggest not relying on schema inference, which is an intensive operation when you have 100+ keys; instead, provide your own schema:
    .schema(schema) \
    .load()
 
2) Bring sampleSize back to the default of 1000, or lower.
 
3) Are you using the new connector? Some of these options appear to be available only for the legacy connector.
 
Br
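
The explicit-schema suggestion above can be sketched without pyspark installed: Spark's .schema() also accepts a DDL-formatted string, which is easy to build from a dict of known fields. The field names and types below are hypothetical, purely for illustration.

```python
# Hypothetical sketch: hand the reader an explicit schema instead of
# relying on inference. Column names/types below are made up.
known_fields = {
    "_id": "STRING",
    "customer_name": "STRING",
    "order_total": "DOUBLE",
    "created_at": "TIMESTAMP",
}

def build_ddl_schema(field_types):
    """Render column -> Spark SQL type pairs as one DDL schema string."""
    return ", ".join(f"{name} {dtype}" for name, dtype in field_types.items())

ddl = build_ddl_schema(known_fields)
print(ddl)  # _id STRING, customer_name STRING, order_total DOUBLE, created_at TIMESTAMP
# Usage (not runnable here): spark.read.format("mongodb")....schema(ddl).load()
```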

@DoredlaCharan

1. Providing a schema is a good way to pull data from MongoDB, but this is an application backend, so I cannot fix the schema structure.
2. I am using the latest connector.

saurabh18cs
Honored Contributor III

OK, then try using .option("inferSchema", "false")

balajij8
Contributor

If changing the options is not possible, you can bump up the driver node size.

SteveOstrowski
Databricks Employee

Hi @DoredlaCharan,

The root cause here is straightforward: setting sampleSize to 100,000 forces the MongoDB Spark Connector to pull 100K documents onto your driver node just for schema inference. With 100+ keys per document and mergeSchema enabled, the driver has to deserialize, merge, and hold all of those document schemas in memory on a single JVM. That overwhelms the driver heap and triggers the "driver has stopped unexpectedly" restart.
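
As a rough illustration of why this adds up, here is a back-of-envelope estimate. The bytes-per-field figure is an assumption for illustration, not a measured connector constant.

```python
# Back-of-envelope sketch: rough driver-memory cost of schema inference
# at sampleSize=100000 with 100+ keys per document. avg_bytes_per_field
# is an assumed figure (BSON value plus JVM object overhead).
docs_sampled = 100_000
keys_per_doc = 100
avg_bytes_per_field = 200  # assumption for illustration

estimated_bytes = docs_sampled * keys_per_doc * avg_bytes_per_field
print(f"~{estimated_bytes / 1024**3:.1f} GiB held on the driver during inference")
# ~1.9 GiB under these assumptions -- before mergeSchema's per-document
# schema objects are even counted
```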

Here is a set of approaches to solve this, starting with the most effective.


APPROACH 1: USE A TWO-PASS STRATEGY FOR DYNAMIC SCHEMAS

Since you mentioned this is an application backend where the schema is not fixed, you can use a two-pass approach. First, read a moderate sample to discover the schema, then use that schema for the full read:

# Pass 1: infer schema from a reasonable sample
schema_df = (spark.read
    .format("mongodb")
    .option("spark.mongodb.connection.uri", mongo_url)
    .option("database", database)
    .option("collection", collection)
    .option("mergeSchema", "true")
    .option("sampleSize", "5000")
    .load()
)
inferred_schema = schema_df.schema

# Pass 2: read the full collection using the inferred schema (no inference needed)
df = (spark.read
    .format("mongodb")
    .option("spark.mongodb.connection.uri", mongo_url)
    .option("database", database)
    .option("collection", collection)
    .option("partitioner", "MongoShardedPartitioner")
    .option("partitionerOptions.shardKey", "_id")
    .schema(inferred_schema)
    .load()
)

Pass 1 uses a much smaller sample (5000 instead of 100000), which is far less likely to crash the driver. Pass 2 skips schema inference entirely because you provide the schema explicitly. Any fields that exist in documents but were not in the 5000-document sample will appear as null rather than missing columns.


APPROACH 2: USE AN AGGREGATION PIPELINE TO ASSIST SAMPLING

You can use MongoDB's own aggregation pipeline to sample documents randomly before Spark even starts schema inference. This is more memory-efficient because MongoDB does the sampling server-side:

df = (spark.read
    .format("mongodb")
    .option("spark.mongodb.connection.uri", mongo_url)
    .option("database", database)
    .option("collection", collection)
    .option("mergeSchema", "true")
    .option("sampleSize", "1000")
    .option("aggregation.pipeline", '[{"$sample": {"size": 10000}}]')
    .load()
)

The $sample stage randomly selects 10,000 documents on the MongoDB server side. Then the connector only needs to infer schema from those 10,000 documents (or the sampleSize subset of them). This reduces memory pressure on the Spark driver significantly.


APPROACH 3: INCREASE DRIVER NODE MEMORY

If you need to keep the large sampleSize, you should use a memory-optimized driver node. In your cluster configuration:

1. Go to Advanced Options > Spark Config and add:

spark.driver.memory 32g
spark.driver.maxResultSize 16g

2. Or better yet, select a memory-optimized instance type for the driver node specifically. In the cluster config UI, you can choose the driver type separately from worker types. Look for instance types with high memory-to-CPU ratios (for example, r5.2xlarge or r5.4xlarge on AWS, Standard_E8s_v3 or Standard_E16s_v3 on Azure).

Note: On Databricks, spark.driver.memory is automatically set based on your chosen instance type, so picking a bigger driver instance is often the simplest path.


APPROACH 4: INCREMENTAL SCHEMA DISCOVERY

If the schema truly changes over time and you want to capture all possible fields, you can build the schema incrementally across multiple smaller reads:

from pyspark.sql.types import StructType

schemas = []
for offset in range(0, 100000, 5000):
    sample_df = (spark.read
        .format("mongodb")
        .option("spark.mongodb.connection.uri", mongo_url)
        .option("database", database)
        .option("collection", collection)
        .option("mergeSchema", "true")
        .option("sampleSize", "5000")
        .option("aggregation.pipeline",
                f'[{{"$skip": {offset}}}, {{"$limit": 5000}}]')
        .load()
    )
    schemas.append(sample_df.schema)

# Merge all discovered schemas, keeping the first definition of each field
merged_fields = {}
for s in schemas:
    for field in s.fields:
        if field.name not in merged_fields:
            merged_fields[field.name] = field

full_schema = StructType(list(merged_fields.values()))

# Now read with the complete schema
df = (spark.read
    .format("mongodb")
    .option("spark.mongodb.connection.uri", mongo_url)
    .option("database", database)
    .option("collection", collection)
    .option("partitioner", "MongoShardedPartitioner")
    .option("partitionerOptions.shardKey", "_id")
    .schema(full_schema)
    .load()
)

This reads small batches of 5,000 documents at a time, collects the schema from each batch, merges them, and then does a final full read with the merged schema. It avoids loading 100K documents into the driver at once.


APPROACH 5: READ AS RAW STRING AND PARSE IN SPARK

If schema variability is extreme, consider reading documents as raw JSON strings and parsing them in Spark:

raw_df = (spark.read
    .format("mongodb")
    .option("spark.mongodb.connection.uri", mongo_url)
    .option("database", database)
    .option("collection", collection)
    .option("sampleSize", "1")
    .load()
    .selectExpr("to_json(struct(*)) as json_string")
)

# Write raw JSON to a Delta staging table first
raw_df.write.format("delta").mode("overwrite").saveAsTable("catalog.schema.mongo_raw_staging")

# Then infer schema from the Delta table (much faster)
staged_df = spark.read.table("catalog.schema.mongo_raw_staging")
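
Once the raw strings are staged, you can also discover the full column set yourself before parsing with from_json. A plain-Python sketch of the idea (the sample documents below are hypothetical): take the union of keys across all staged JSON strings, recording the first value type seen per key.

```python
import json

# Sketch of the discovery step over staged JSON strings: union of keys
# across documents, with the first Python type seen per key. Sample
# documents are hypothetical.
staged_rows = [
    '{"_id": "a1", "status": "open", "retries": 2}',
    '{"_id": "a2", "status": "done", "closed_at": "2024-01-01"}',
]

def union_of_keys(json_strings):
    seen = {}
    for s in json_strings:
        for key, value in json.loads(s).items():
            seen.setdefault(key, type(value).__name__)
    return seen

print(union_of_keys(staged_rows))
# {'_id': 'str', 'status': 'str', 'retries': 'int', 'closed_at': 'str'}
```

In Spark you would run the same idea distributed (e.g. via an RDD aggregation over json_string) and translate the result into a StructType for from_json.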


QUICK RECOMMENDATION

For your specific case (dynamic schema, 100+ keys, sharded collection), I would start with Approach 1 (two-pass with sampleSize of 5000). If 5000 documents do not capture enough schema variation, try Approach 4 (incremental schema discovery). Both avoid the large single-pass memory spike that is crashing your driver.

Also, regardless of which approach you use, make sure you are not calling .collect(), .toPandas(), or display() on the full DataFrame after loading it. Write directly to a Delta table instead:

df.write.format("delta").mode("overwrite").saveAsTable("catalog.schema.my_mongo_table")


USEFUL DOCUMENTATION

- Databricks compute configuration (memory sizing):
https://docs.databricks.com/en/compute/configure.html

- MongoDB Spark Connector documentation:
https://www.mongodb.com/docs/spark-connector/current/

- Databricks cluster event log (to confirm OOM):
https://docs.databricks.com/en/compute/cluster-event-log.html

Let me know which approach works best for your use case or if you need help adapting any of the code examples to your specific setup.

* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.