Wednesday
Hello Guys,
Whats is best way to build sync process which sync data for two engine database like delta table and Nosql table ( Mongo) ?
Thanx
Cordially,
Wednesday - last edited Wednesday
@seefoods , you can use Databricks to perform synchronization in both directions with the Spark MongoDB connector. This connector supports both streaming and batch modes.
The most straightforward approach would be to create a pipeline that, once a day, reads from MongoDB and writes the data to a Delta table.
Then, you would need to create a similar pipeline in the opposite direction, where once a day the pipeline reads from the Delta table and writes the data back to MongoDB.
But if you want, you can take a more ambitious approach. Since the connector supports streaming, you could set up a job that once in a day reads the changes applied to your MongoDB database.
Similarly, you can enable Change Data Feed (CDF) on your Delta table and use streaming to read only the changes applied there, then write those incremental updates back to your MongoDB collection.
For further reading - batch mode:
https://www.mongodb.com/docs/spark-connector/current/batch-mode/batch-read
https://www.mongodb.com/docs/spark-connector/current/batch-mode/batch-write
For further reading - streaming mode:
https://www.mongodb.com/docs/spark-connector/current/batch-mode/batch-write
MongoDB ChangeStream & Spark Delta Table : An Alliance | by Rajesh Vinayagam | Medium
Wednesday
Hi @seefoods ,
Could you provide more deatils, i.e do you need some kind of one way sync? For example from Mongo -> Delta Table?
Wednesday
Hello @szymon_dybczak ,
I need to write Mongo -> Delta table and Delta table -> to Mongo
Thanks
Wednesday
Thanks @seefoods . One more question. Do you need sync in near real-time or once in a day is alright?
Wednesday
I need once in a day
Wednesday
To write a Delta table to MongoDB, you'll need to:
Sample code:
from pyspark.sql import SparkSession
from pymongo import MongoClient
# Step 1: Initialize Spark session
spark = SparkSession.builder \
.appName("DeltaToMongo") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Step 2: Read Delta table
delta_df = spark.read.format("delta").load("/path/to/delta/table")
# Step 3: Convert to Pandas DataFrame
pandas_df = delta_df.toPandas()
# Step 4: Connect to MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["your_database"]
collection = db["your_collection"]
# Step 5: Insert data into MongoDB
collection.insert_many(pandas_df.to_dict("records"))
Wednesday
Its not a right way to use spark connector mongo directly. but want to ensure that sync of delta table and mongodb is the same. Someone best pratices to that
Wednesday
There is a mongo spark connector maven package.
org.mongodb.spark:mongo-spark-connector_2.12:10.1.1(link: https://mvnrepository.com/artifact/org.mongodb.spark/mongo-spark-connector_2.12/10.1.1)
spark.conf.set("spark.mongodb.write.connection.uri", "mongodb://<username>:<password>@<host>:<port>/<database>")
spark.conf.set("spark.mongodb.write.database", "your_database")
spark.conf.set("spark.mongodb.write.collection", "your_collection")
##read table
delta_df = spark.table("your_database.your_table")
##write to mongo.
delta_df.write \
.format("mongodb") \
.mode("append") \
.save()
Wednesday - last edited Wednesday
i want to do the same thing but reversely <> read collection in order to update the delta table
Wednesday - last edited Wednesday
@seefoods , you can use Databricks to perform synchronization in both directions with the Spark MongoDB connector. This connector supports both streaming and batch modes.
The most straightforward approach would be to create a pipeline that, once a day, reads from MongoDB and writes the data to a Delta table.
Then, you would need to create a similar pipeline in the opposite direction, where once a day the pipeline reads from the Delta table and writes the data back to MongoDB.
But if you want, you can take a more ambitious approach. Since the connector supports streaming, you could set up a job that once in a day reads the changes applied to your MongoDB database.
Similarly, you can enable Change Data Feed (CDF) on your Delta table and use streaming to read only the changes applied there, then write those incremental updates back to your MongoDB collection.
For further reading - batch mode:
https://www.mongodb.com/docs/spark-connector/current/batch-mode/batch-read
https://www.mongodb.com/docs/spark-connector/current/batch-mode/batch-write
For further reading - streaming mode:
https://www.mongodb.com/docs/spark-connector/current/batch-mode/batch-write
MongoDB ChangeStream & Spark Delta Table : An Alliance | by Rajesh Vinayagam | Medium
Wednesday
The other option I can think of is change streams. Here is a blogpost on it.
Passionate about hosting events and connecting people? Help us grow a vibrant local communityโsign up today to get started!
Sign Up Now