Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

process mongo table to delta table databricks

seefoods
Contributor II

Hello guys,

I have a MongoDB collection of about 67 GB. I ingest it with streaming, but copying all of the data into a Delta table is very slow. Does anyone have an idea what I can do about this? I am using the MongoDB connector v10.5.

This is my code:


# Aggregation pipeline: unwind the embedded "data" array and project/cast
# the fields of interest before the documents reach Spark.
pipeline_mongo_sec = [
    {
        "$unwind": "$data"
    },
    {
        "$project": {
            "_id": 0,
            "point": {
                "$toUpper": "$point"
            },
            "since": {
                "$dateFromString": {
                    "dateString": "$since",
                    "timezone": "UTC"
                }
            },
            "date": {
                "$dateFromString": {
                    "dateString": "$data.date",
                    "timezone": "UTC"
                }
            },
            "label": "$data.label",
            "measure_type": "$data.measure",
            "tariff": "$data.cost",
            "unit": "$data.unit",
            "value": {
                "$toDecimal": "$data.value"
            }
        }
    }
]



self.spark.read.format("mongo")
            .option("spark.mongodb.input.uri", self.mongo_uri)
            .option("database", self.mongo_database)
            .option("collection", self.collection)
            .option("partitioner", "MongoSamplePartitioner")




def _write_to_output(self, df) -> bool:
    try:
        if len(self.partition_columns) > 0:
            print(f"With partitioning: {','.join(self.partition_columns)}")
            df.write.partitionBy(*self.partition_columns).mode("overwrite").parquet(self.output_path)
        else:
            df.write.mode(self.write_mode).parquet(self.output_path)

        return True
    except Exception as e:
        print(f"Write error for {self.mongo_database}.{self.collection}: {str(e)}")
        return False
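
Since the title is about a Delta table while _write_to_output writes Parquet, here is a rough variant that targets Delta instead. It is only a sketch that reuses the attributes already shown above (output_path, partition_columns, write_mode), not a drop-in replacement:

def _write_to_delta(self, df) -> bool:
    # Same structure as _write_to_output, but writing Delta instead of Parquet
    try:
        writer = df.write.format("delta").mode(self.write_mode)
        if self.partition_columns:
            writer = writer.partitionBy(*self.partition_columns)
        writer.save(self.output_path)
        return True
    except Exception as e:
        print(f"Write error for {self.mongo_database}.{self.collection}: {str(e)}")
        return False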

 

1 REPLY

-werners-
Esteemed Contributor III

What if you do not update the Delta table for each incoming micro-batch, but e.g. only do so every 15 minutes/hour/whatever?
That way you can keep ingesting in a streaming fashion, while the actual update of the Delta table is handled in a more batch-like way, so the overhead of the merge is less of an issue.
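
A rough sketch of that idea (assuming an active spark session, that source_stream_df is the streaming DataFrame read from MongoDB, and that the table name, merge keys and checkpoint path below are placeholders to adapt): a foreachBatch writer with a processing-time trigger, so the MERGE into Delta only runs every 15 minutes while the stream keeps reading.

from delta.tables import DeltaTable

def upsert_batch(batch_df, batch_id):
    # MERGE the accumulated micro-batch into the Delta table on its business key
    target = DeltaTable.forName(spark, "bronze.mongo_measurements")  # placeholder table name
    (target.alias("t")
        .merge(batch_df.alias("s"), "t.point = s.point AND t.date = s.date")  # placeholder key
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(source_stream_df.writeStream
    .foreachBatch(upsert_batch)
    .option("checkpointLocation", "/tmp/checkpoints/mongo_bronze")  # placeholder path
    .trigger(processingTime="15 minutes")  # touch the Delta table only every 15 min
    .start())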
