
Dataframe rows missing after write_to_delta and read_from_delta

mimezzz
Contributor

Hi, I am trying to load data from MongoDB into S3 using PySpark 3.1.1, by reading it into a DataFrame and writing it out as a Parquet-backed Delta table.

My code snippets are like:

df = spark \
    .read \
    .format("mongo") \
    .options(**read_options) \
    .load(schema=schema)

df = df.coalesce(64)

write_df_to_delta(spark, df, s3_path)

read_count = df.count()

inserted_df = read_delta_to_df(spark, s3_path)

inserted_count = inserted_df.count()

The SparkSession, Mongo connection and S3 path are all configured correctly. What I found is that read_count and inserted_count do not match; there is a gap of around 300-1200 rows, yet the write to Delta did not raise any error. Why is this the case, and what is causing it?

What I can see from Rancher: 'read_count': 1373432, 'inserted_count': 1372492
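
To put the reported numbers in perspective, here is a minimal sketch that computes the gap between the two counts. The helper name `row_gap` is hypothetical and not part of the original code; the two counts are the ones quoted from the log above.

```python
from typing import Tuple


def row_gap(read_count: int, inserted_count: int) -> Tuple[int, float]:
    """Return (missing rows, missing rows as a percentage of read_count)."""
    missing = read_count - inserted_count
    pct = 100.0 * missing / read_count if read_count else 0.0
    return missing, pct


# Counts copied from the log line in the post.
missing, pct = row_gap(1373432, 1372492)
print(f"{missing} rows missing ({pct:.3f}% of the source count)")
```

For this run the gap is 940 rows, which is a small fraction of the table and easy to miss without an explicit check like this after every write.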

def read_delta_to_df(
    spark: SparkSession,
    s3_path: str
) -> DataFrame:
    log.info("Reading delta table from path {} to df".format(s3_path))
    df = spark \
        .read \
        .format("delta") \
        .load(s3_path)
    return df

def write_df_to_delta(
    spark: SparkSession,
    df: DataFrame,
    s3_path: str,
    mode: Optional[str] = "overwrite",
    partition_by: Optional[Union[str, List[str]]] = None,
    retention: Optional[int] = 0
) -> None:
    log.info("Writing df to delta table, {}".format(s3_path))
    df.printSchema()
    try:
        df \
            .write \
            .format("delta") \
            .mode(mode) \
            .option("overwriteSchema", "true") \
            .save(
                path=s3_path,
                partitionBy=partition_by)
    except Exception as e:
        log.error(f"error occured with error msg: {e}")

1 ACCEPTED SOLUTION

Accepted Solutions

mimezzz
Contributor

So I think I have solved the mystery here 😀 It was to do with the retention config. With retentionEnabled set to True and retention hours set to 0, we lose a few rows in the first file, because they are mistaken for files from the last session and get vacuumed. For further reading, see: https://learn.microsoft.com/en-us/azure/databricks/kb/delta/data-missing-vacuum-parallel-write
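
As an illustration of how this could be guarded against, here is a minimal sketch of a vacuum wrapper that refuses a retention shorter than Delta Lake's default threshold of 7 days (168 hours). The helper `safe_vacuum` and the constant name are hypothetical; the only Delta API assumed is `DeltaTable.vacuum(retentionHours)`, and `delta_table` is expected to be something like `DeltaTable.forPath(spark, s3_path)`.

```python
from typing import Any

# Delta Lake's default retention threshold for VACUUM is 7 days.
DEFAULT_RETENTION_HOURS = 168


def safe_vacuum(delta_table: Any,
                retention_hours: float = DEFAULT_RETENTION_HOURS) -> Any:
    """Vacuum a Delta table, rejecting retention below the default threshold.

    `delta_table` is assumed to expose `vacuum(retentionHours)`, as
    delta.tables.DeltaTable does.
    """
    if retention_hours < DEFAULT_RETENTION_HOURS:
        # A very short retention (for example 0) lets VACUUM delete files
        # that a concurrent or just-finished write still depends on.
        raise ValueError(
            f"retention_hours={retention_hours} is below the default of "
            f"{DEFAULT_RETENTION_HOURS}; refusing to vacuum"
        )
    return delta_table.vacuum(retention_hours)


# Hypothetical usage, assuming the delta-spark package is installed:
#   from delta.tables import DeltaTable
#   safe_vacuum(DeltaTable.forPath(spark, s3_path))
```

Failing loudly here is a design choice: a silent vacuum with 0 hours is exactly what produced the missing rows without any error in the write path.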


8 REPLIES

Anonymous
Not applicable

In general, avoiding rm on Delta tables is a good idea. Delta's transaction log can prevent eventual consistency issues in most cases; however, when you delete and recreate a table in a short time, different versions of the transaction log can flicker in and out of existence.

Instead, I'd recommend using the transactional primitives provided by Delta. For example, to overwrite the data in a table, you can:

df.write.format("delta").mode("overwrite").save("/delta/events")

Hi @May Olszewski, thanks for replying. The mode I used was already "overwrite"; I forgot to include it in the demo code above, sorry, as it's predefined. Any other suggestions? I also vacuumed that directory before writing the new Delta table into it.

Debayan
Esteemed Contributor III

Hi @mime liu, do you have any error message other than the reported one?

Hi Debayan, no, no errors were reported throughout.

Hubert-Dudek
Esteemed Contributor III

The code is correct. The only problem I can imagine is that something is left over at s3_path (like a lost partition). I think it would be better to register the Delta table in the metastore and use .write.saveAsTable("table_name") instead of using the path.
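
A minimal sketch of the metastore-based write suggested here, assuming a metastore is configured for the session. In PySpark the DataFrameWriter method for this is `saveAsTable`; the helper name `overwrite_delta_table` and the table name in the usage comment are hypothetical.

```python
from typing import Any


def overwrite_delta_table(df: Any, table_name: str) -> None:
    """Overwrite a metastore-registered Delta table with the contents of df.

    `df` is assumed to be a pyspark.sql.DataFrame.
    """
    (
        df.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(table_name)
    )


# Hypothetical usage:
#   overwrite_delta_table(df, "mongo_export")
#   inserted_count = spark.table("mongo_export").count()
```

Reading the table back by name rather than by path means the reader and writer resolve the same location through the metastore, which removes one place where a stale path could creep in.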

Hi @Hubert Dudek, thanks for the reply. Yes, maybe worth trying. I am also considering removing format("delta") to see if the issue persists, to diagnose whether this is a Delta-related issue.

mimezzz
Contributor

Still haven't found an answer to this; I just got back from holiday. I will keep digging and will update here if I find the cause.
