Delta Live Tables has duplicates created by multiple workers

SM
New Contributor III

Hello, I am working with Delta Live Tables. I am trying to create a DLT table from a combination of DataFrames produced in a for loop, which are unioned, and the DLT table is then created over the unioned DataFrame. However, I noticed that the Delta table has duplicates, and the number of duplicates per unique row equals the number of workers in the pool cluster. How can I avoid this duplication?

1 ACCEPTED SOLUTION

Accepted Solutions

-werners-
Esteemed Contributor III

Mixing plain Python and PySpark often causes issues.

It's better to go all-in on pandas and then convert to a Spark DataFrame at the end, or to go pure PySpark (like you did now).


8 REPLIES 8

-werners-
Esteemed Contributor III

Are you sure it is not because of your loop?

Hubert-Dudek
Esteemed Contributor III

Please share your code; as @Werner Stinckens said, it is likely another issue.

SM
New Contributor III
from itertools import chain
from functools import reduce

import dlt
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    create_map, lit, col, explode, input_file_name, regexp_extract
)
from pyspark.sql.types import StringType


def map_explode(df):
  # Collapse every column except 'timestamp' and 'path' into a map, then
  # explode the map into (metric, value) rows.
  metric = create_map(list(chain(*(
    (lit(name), col(name).cast(StringType())) for name in df.columns if name not in ['timestamp', 'path']
  ))))
  df = df.withColumn('mapCol', metric)
  result = df.select('timestamp', 'path', explode(df.mapCol).alias('metric', 'value'))
  return result


def load_data(path):
  # Read all parquet files under the given path(s).
  data = (spark.read
          .format("parquet")
          .option("recursiveFileLookup", "true")
          .load(path))
  columns = data.columns
  sft_df = (data.withColumn("timestamp", (col("_time") / 1000).cast("timestamp"))
            .withColumn("path", input_file_name())
            .select('timestamp', 'path', *columns))

  # Transpose to long format and derive asset/aspect from the file path.
  # asset_str, aspect_str and fact_columns are defined elsewhere in the notebook.
  df_transpose = map_explode(sft_df)
  df_transpose = (df_transpose.withColumn("asset", regexp_extract(col("path"), asset_str, 0))
                  .withColumn("aspect", regexp_extract(col("path"), aspect_str, 0))
                  .select(*fact_columns))

  return df_transpose


aspect_paths = {}
df_append = []

@dlt.table()
def fact_table():
  # Collect the source file paths for every aspect listed in the master table.
  aspect_master = spark.read.table("default.master").select("name")
  for var in aspect_master.toLocalIterator():
    aspect_name = str(var["name"])
    aspect_paths.update({aspect_name: [aspect.path
                                       for asset in dbutils.fs.ls(filepath) if 'entityId' in asset.name
                                       for aspect in dbutils.fs.ls(asset.path) if aspect_name in aspect.name]})

  # Load each aspect's data and collect the resulting DataFrames.
  for asp in aspect_paths.keys():
    if len(aspect_paths[asp]) != 0:
      data = load_data(aspect_paths[asp])
      df_append.append(data)

  # Union all per-aspect DataFrames into one fact DataFrame.
  fact_df = reduce(DataFrame.unionAll, df_append)
  return fact_df

The code above is what I use. With the same code I have also tried writing directly to a Delta table (not via DLT), and I don't see any duplicates there.

-werners-
Esteemed Contributor III

Hm, hard to tell. You use a mix of PySpark and plain Python objects; perhaps that is the reason, as some operations will be executed on the driver and others on the workers.

Can I ask why you use toLocalIterator, and why you append to a list (df_append) which you then reduce with functools?

Hubert-Dudek
Esteemed Contributor III

The shared code is not a good fit for Delta Live Tables. DLT tables are a kind of streaming table, and here every run loops over the Hive metastore and loads data for all tables, so the results can be unexpected. Just use a normal job notebook without Delta Live Tables.
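
In a plain job notebook that would look something like the sketch below; build_fact_df is a hypothetical wrapper around the loop-and-union logic shown above, not code from this thread.

# Hypothetical sketch: run the same transformation in a normal notebook and
# write the result directly to a Delta table instead of defining a DLT table.
fact_df = build_fact_df()   # hypothetical wrapper around the loop + union above

(fact_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.fact_table"))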

SM
New Contributor III

I use toLocalIterator to iterate over a DataFrame and build a dictionary, which is then looped over. For each key/value in the dictionary, a DataFrame is returned after processing and appended to a list of DataFrames. Using reduce, I create one DataFrame from the list of DataFrames, and the DLT table is created from that.

Based on your answer, I removed the append and reduce functions and instead unioned each DataFrame directly as it is returned (roughly as sketched below), and this solved the duplicate issue.
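
Roughly, the change looks like this, assuming the same aspect_paths dictionary and load_data helper as in the code posted earlier:

import dlt

@dlt.table()
def fact_table():
  # Union each aspect's DataFrame as soon as it is returned, instead of
  # collecting the DataFrames in a list and reducing afterwards.
  fact_df = None
  for asp, paths in aspect_paths.items():
    if len(paths) != 0:
      data = load_data(paths)
      fact_df = data if fact_df is None else fact_df.unionAll(data)
  return fact_df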

Thank you for the hint.

-werners-
Esteemed Contributor III

Mixing plain Python and PySpark often causes issues.

It's better to go all-in on pandas and then convert to a Spark DataFrame at the end, or to go pure PySpark (like you did now).
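
For the pandas route, it would look roughly like this; load_data_pandas is a hypothetical pandas equivalent of the load_data helper above, not something from this thread.

import pandas as pd

# Hypothetical sketch: build everything in pandas on the driver first...
pdf_parts = [load_data_pandas(paths)          # hypothetical pandas version of load_data
             for paths in aspect_paths.values() if paths]
pdf = pd.concat(pdf_parts, ignore_index=True)

# ...then convert to a Spark DataFrame in a single step.
fact_df = spark.createDataFrame(pdf)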

Anonymous
Not applicable

@Shikha Mathew - Does your last answer mean that your issue is resolved? Would you be happy to mark whichever answer helped as best? Or, if it wasn't a specific one, would you tell us what worked?
