11-02-2022 06:46 PM
Hi, I am trying to load data from MongoDB into S3 using PySpark 3.1.1 by writing it out as a Delta table (Parquet files under the hood).
My code snippets look like this:
df = spark \
    .read \
    .format("mongo") \
    .options(**read_options) \
    .load(schema=schema)

df = df.coalesce(64)
write_df_to_delta(spark, df, s3_path)
read_count = df.count()

inserted_df = read_delta_to_df(spark, s3_path)
inserted_count = inserted_df.count()
The SparkSession, the Mongo connection and the S3 path are all configured correctly. What I found is that read_count and inserted_count do not match; there is a gap of around 300-1200 rows. But the write to Delta did not give me any error. I wonder why this is the case? What's causing it?
What I can see from Rancher: 'read_count': 1373432, 'inserted_count': 1372492
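For reference, read_options here is assumed to be the standard MongoDB Spark connector options; a rough sketch with placeholder values (not the real connection details):

# Hypothetical example of the connector options referenced above;
# the real URI, database and collection come from the job config.
read_options = {
    "uri": "mongodb://user:password@mongo-host:27017",
    "database": "my_db",
    "collection": "my_collection",
}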
import logging
from typing import List, Optional, Union

from pyspark.sql import DataFrame, SparkSession

log = logging.getLogger(__name__)

def read_delta_to_df(
    spark: SparkSession,
    s3_path: str
) -> DataFrame:
    log.info("Reading delta table from path {} to df".format(s3_path))
    df = spark \
        .read \
        .format("delta") \
        .load(s3_path)
    return df
def write_df_to_delta(
    spark: SparkSession,
    df: DataFrame,
    s3_path: str,
    mode: Optional[str] = "overwrite",
    partition_by: Optional[Union[str, List[str]]] = None,
    retention: Optional[int] = 0
) -> None:
    log.info("Writing df to delta table, {}".format(s3_path))
    df.printSchema()
    try:
        df \
            .write \
            .format("delta") \
            .mode(mode) \
            .option("overwriteSchema", "true") \
            .save(
                path=s3_path,
                partitionBy=partition_by)
    except Exception as e:
        log.error(f"error occurred with error msg: {e}")
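(The retention parameter is not used in the body shown above; the job also vacuumed the path, as mentioned further down the thread. A hypothetical sketch of what that step likely looked like, assuming the delta-spark DeltaTable API, since a zero-hour vacuum is exactly what the accepted answer below identifies as the cause of the missing rows:)

# Hypothetical sketch of the retention/vacuum step (not shown above).
# Running VACUUM with retention hours = 0 requires disabling Delta's
# safety check, and it can delete files that a recent or in-flight
# write still references -- which is how rows can silently disappear.
from delta.tables import DeltaTable

if retention is not None:
    spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
    DeltaTable.forPath(spark, s3_path).vacuum(retention)  # retention == 0 here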
11-02-2022 11:01 PM
In general, avoiding rm on Delta tables is a good idea. Delta's transaction log can prevent eventual consistency issues in most cases; however, when you delete and recreate a table in a short time, different versions of the transaction log can flicker in and out of existence.
Instead, I'd recommend using the transactional primitives provided by Delta. For example, to overwrite the data in a table, you can:
df.write.format("delta").mode("overwrite").save("/delta/events")
11-03-2022 03:44 PM
hi @May Olszewski thanks for replying. The mode I used was already "overwrite"; sorry, I forgot to include it in the demo code above as it's predefined. Any other suggestions? I also vacuumed that directory before writing the new delta table into it.
11-02-2022 11:11 PM
Hi @mime liu, do you have any other error message other than the reported one?
11-03-2022 03:42 PM
Hi Debayan, no, no error was reported throughout.
11-03-2022 02:07 AM
The code is correct. The only problem I can imagine is that something is left behind on s3_path (like a lost partition). I think it would be better to register the Delta table in the metastore and use .write.table("table_name") instead of using the path.
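A rough sketch of that suggestion, assuming saveAsTable is used to register the table in the metastore (the name my_db.events is a placeholder):

# Hypothetical sketch: write to a metastore-registered Delta table
# instead of a bare S3 path; "my_db.events" is a placeholder name.
df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("my_db.events")

# Reads then also go through the table name:
inserted_df = spark.read.table("my_db.events")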
11-03-2022 03:45 PM
hi @Hubert Dudek thanks for the reply, yes, maybe worth trying. I am also considering removing format("delta") to see if the issue persists, to diagnose whether this is a Delta-related issue.
11-29-2022 01:20 AM
Still haven't found an answer to this, just got back from holiday. Will keep digging in; if I find the cause I will update here.
01-26-2023 09:45 PM
So I think I have solved the mystery here 😀 It was to do with the retention config. With retentionEnabled set to True and the retention hours set to 0, we somehow lose a few rows from the first file, because those files were mistaken for files from a previous session and got vacuumed. For further reading, see: https://learn.microsoft.com/en-us/azure/databricks/kb/delta/data-missing-vacuum-parallel-write
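In other words, the fix is to avoid vacuuming with a zero-hour retention window in the same run as the write. A minimal sketch of the safer settings (168 hours is simply Delta's default 7-day retention, not something specific to this job):

from delta.tables import DeltaTable

# Keep Delta's retention safety check on and use a non-zero window, so
# VACUUM cannot remove files written by the current (or a parallel) job.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")
DeltaTable.forPath(spark, s3_path).vacuum(168)  # 7 days, the Delta default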