Hello guys,
I have a MongoDB collection that is about 67 GB in size. I use streaming to ingest it, but copying all of the data into a Delta table is very slow. Does anyone have an idea what could cause this? I am using the MongoDB connector v10.5.
This is my code:
# Aggregation pipeline: unwind the nested "data" array and project/convert the fields we need
pipeline_mongo_sec = [
    {
        "$unwind": "$data"
    },
    {
        "$project": {
            "_id": 0,
            "point": {"$toUpper": "$point"},
            "since": {
                "$dateFromString": {"dateString": "$since", "timezone": "UTC"}
            },
            "date": {
                "$dateFromString": {"dateString": "$data.date", "timezone": "UTC"}
            },
            "label": "$data.label",
            "measure_type": "$data.measure",
            "tariff": "$data.cost",
            "unit": "$data.unit",
            "value": {"$toDecimal": "$data.value"}
        }
    }
]
# Read from MongoDB; note that the pipeline defined above is not passed to the reader here
df = (
    self.spark.read.format("mongo")
        .option("spark.mongodb.input.uri", self.mongo_uri)
        .option("database", self.mongo_database)
        .option("collection", self.collection)
        .option("partitioner", "MongoSamplePartitioner")
        .load()
)
def _write_to_output(self, df) -> bool:
    try:
        if len(self.partition_columns) > 0:
            print(f"With partitioning: {','.join(self.partition_columns)}")
            df.write.partitionBy(*self.partition_columns).mode("overwrite").parquet(self.output_path)
        else:
            df.write.mode(self.write_mode).parquet(self.output_path)
        return True
    except Exception as e:
        print(f"❌ Write error for {self.mongo_database}.{self.collection}: {str(e)}")
        return False
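Since the goal is a Delta table while _write_to_output writes Parquet, here is a minimal sketch of a Delta variant, assuming Delta Lake is available on the cluster; the method name _write_to_delta is hypothetical and the attributes are reused from the class above:

def _write_to_delta(self, df) -> bool:
    # Hypothetical variant of _write_to_output that targets Delta instead of Parquet
    try:
        writer = df.write.format("delta").mode(self.write_mode)
        if len(self.partition_columns) > 0:
            writer = writer.partitionBy(*self.partition_columns)
        writer.save(self.output_path)
        return True
    except Exception as e:
        print(f"❌ Write error for {self.mongo_database}.{self.collection}: {str(e)}")
        return False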