Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Delta Live Tables has duplicates created by multiple workers

SM
New Contributor III

Hello, I am working with Delta Live Tables. I am trying to create a DLT table from a combination of DataFrames produced in a for loop, which are unioned, and the DLT table is then created over the unioned DataFrame. However, I noticed that the Delta table has duplicates, and the number of duplicates per unique row equals the number of workers in the pool cluster. How can I avoid this duplication?

1 ACCEPTED SOLUTION

Accepted Solutions

-werners-
Esteemed Contributor III

Mixing Python and PySpark often gives issues.

Better to go all-in on pandas and then convert to a Spark DataFrame, or go for pure PySpark (like you did now).


8 REPLIES

-werners-
Esteemed Contributor III

Are you sure it is not because of your loop?

Hubert-Dudek
Esteemed Contributor III

Please share your code; as @Werner Stinckens said, it is probably another issue.

SM
New Contributor III
from functools import reduce
from itertools import chain

import dlt
from pyspark.sql import DataFrame
from pyspark.sql.functions import (col, create_map, explode, input_file_name,
                                   lit, regexp_extract)
from pyspark.sql.types import StringType

# asset_str, aspect_str, fact_columns and filepath are assumed to be defined elsewhere in the notebook.

def map_explode(df):
  # Unpivot every column except 'timestamp' and 'path' into (metric, value) rows.
  metric = create_map(list(chain(*(
    (lit(name), col(name).cast(StringType())) for name in df.columns if name not in ['timestamp', 'path']
  ))))
  df = df.withColumn('mapCol', metric)
  result = df.select('timestamp', 'path', explode(df.mapCol).alias('metric', 'value'))
  return result

def load_data(path):
  # Read the parquet files recursively and keep the source file of every row.
  data = (spark.read
          .format("parquet")
          .option("recursiveFileLookup", "true")
          .load(path))
  columns = data.columns
  sft_df = (data.withColumn("timestamp", (col("_time") / 1000).cast("timestamp"))
            .withColumn("path", input_file_name())
            .select('timestamp', 'path', *columns))

  # Unpivot, then derive asset and aspect from the file path.
  df_transpose = map_explode(sft_df)
  df_transpose = (df_transpose.withColumn("asset", regexp_extract(col("path"), asset_str, 0))
                  .withColumn("aspect", regexp_extract(col("path"), aspect_str, 0))
                  .select(*fact_columns))
  return df_transpose

aspect_paths = {}
df_append = []

@dlt.table()
def fact_table():
  # Map each aspect name from the master table to the list of matching paths.
  aspect_master = spark.read.table("default.master").select("name")
  for var in aspect_master.toLocalIterator():
    aspect_name = str(var["name"])
    aspect_paths.update({aspect_name: [aspect.path
                                       for asset in dbutils.fs.ls(filepath) if 'entityId' in asset.name
                                       for aspect in dbutils.fs.ls(asset.path) if aspect_name in aspect.name]})

  # Load one DataFrame per aspect and collect them in a list.
  for asp in aspect_paths.keys():
    if len(aspect_paths[asp]) != 0:
      data = load_data(aspect_paths[asp])
      df_append.append(data)

  # Union all collected DataFrames into a single fact DataFrame.
  fact_df = reduce(DataFrame.unionAll, df_append)
  return fact_df

From this code I have tried writing directly to a Delta table, not via DLT, and I don't see any duplicates. Here is the code.

-werners-
Esteemed Contributor III

Hmm, hard to tell. You use a mix of PySpark and plain Python objects; perhaps that is the reason, as some of it is executed on the driver and some on the workers.

Can I ask why you use toLocalIterator, and why you append to a list (df_append) which you then reduce with functools?
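As an aside on that mixed-state point: a module-level Python list such as df_append is only appended to, never reset, so if the table function happens to be evaluated more than once the same DataFrames pile up. A tiny pure-Python illustration of that effect (names here are only for the example):

df_append = []

def build():
  # Appends to module-level state instead of a list local to the function.
  df_append.append("data")
  return list(df_append)

print(build())  # ['data']
print(build())  # ['data', 'data'] -- the element from the first call is still there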

Hubert-Dudek
Esteemed Contributor III

The shared code is not a good fit for Delta Live Tables. DLT tables behave like streaming tables, and here every refresh loops over the Hive metastore and loads data for all the tables, so the results can be unexpected. Just use a normal job notebook without Delta Live Tables.
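For illustration, a rough sketch of that plain-notebook route, reusing the load_data helper and the aspect_paths dictionary from the code shared above; the target table name default.fact_table_direct and the overwrite mode are only placeholders:

from functools import reduce
from pyspark.sql import DataFrame

# Sketch only: build the unioned DataFrame as in the original code, but write it
# from a regular job notebook instead of declaring a DLT table.
# aspect_paths is assumed to be built exactly as in the question above.
dfs = [load_data(paths) for paths in aspect_paths.values() if paths]
fact_df = reduce(DataFrame.unionAll, dfs)

(fact_df.write
 .format("delta")
 .mode("overwrite")                           # placeholder write mode
 .saveAsTable("default.fact_table_direct"))   # placeholder table name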

SM
New Contributor III

I use toLocalIterator to iterate over a DataFrame and build a dictionary, which is then looped over. For each key-value pair in the dictionary, a DataFrame is returned after processing and appended to a list of DataFrames. Using reduce, I create one DataFrame from the list of DataFrames, and from that the DLT table is created.

Based on your answer, I removed the append and reduce functions and instead used union directly as each DataFrame is returned, and this solved the duplicate issue.
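A minimal sketch of that kind of change, reusing the helpers from the code above (the exact revision may of course differ):

@dlt.table()
def fact_table():
  # aspect_paths is built exactly as before; the change is to union each
  # DataFrame as it is returned, instead of appending to a module-level list
  # and reducing afterwards.
  fact_df = None
  for paths in aspect_paths.values():
    if paths:
      data = load_data(paths)
      fact_df = data if fact_df is None else fact_df.unionAll(data)
  return fact_df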

Thank you for the hint.

-werners-
Esteemed Contributor III

Mixing Python and PySpark often gives issues.

Better to go all-in on pandas and then convert to a Spark DataFrame, or go for pure PySpark (like you did now).
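For the pandas route mentioned here, a minimal sketch with purely illustrative columns and values (spark is the session predefined in a Databricks notebook):

import pandas as pd

# Sketch only: assemble the result as a pandas DataFrame on the driver
# (illustrative data), then convert it to a Spark DataFrame in one call.
pdf = pd.DataFrame({"metric": ["temperature", "pressure"], "value": ["21.5", "1.02"]})
fact_df = spark.createDataFrame(pdf)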

Anonymous
Not applicable

@Shikha Mathew - Does your last answer mean that your issue is resolved? Would you be happy to mark whichever answer helped as best? Or, if it wasn't a specific one, would you tell us what worked?
