02-24-2022 11:40 PM
Hello, I am working with Delta Live Tables. I am trying to create a DLT table from a combination of DataFrames produced in a for loop, which are unioned, and the DLT table is then created over the unioned DataFrame. However, I noticed that the Delta table has duplicates, and the number of duplicates per unique row equals the number of workers in the pool cluster. How can I avoid this duplication?
02-25-2022 01:54 AM
Are you sure it is not because of your loop?
02-25-2022 04:59 AM
Please share your code; as @Werner Stinckens said, it is probably another issue.
02-27-2022 10:53 PM
from itertools import chain
from functools import reduce

import dlt
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    create_map, lit, col, explode, input_file_name, regexp_extract
)
from pyspark.sql.types import StringType

# asset_str, aspect_str, fact_columns and filepath are defined elsewhere in the notebook.

def map_explode(df):
    # Build a map column of {column_name: value} for every column except
    # timestamp/path, then explode it into (metric, value) rows.
    metric = create_map(list(chain(*(
        (lit(name), col(name).cast(StringType()))
        for name in df.columns if name not in ['timestamp', 'path']
    ))))
    df = df.withColumn('mapCol', metric)
    result = df.select('timestamp', 'path', explode(df.mapCol).alias('metric', 'value'))
    return result

def load_data(path):
    # Read all parquet files under the given paths and reshape them into the fact layout.
    data = (spark.read
            .format("parquet")
            .option("recursiveFileLookup", "true")
            .load(path))
    columns = data.columns
    sft_df = (data.withColumn("timestamp", (col("_time") / 1000).cast("timestamp"))
              .withColumn("path", input_file_name())
              .select('timestamp', 'path', *columns))
    df_transpose = map_explode(sft_df)
    df_transpose = (df_transpose.withColumn("asset", regexp_extract(col("path"), asset_str, 0))
                    .withColumn("aspect", regexp_extract(col("path"), aspect_str, 0))
                    .select(*fact_columns))
    return df_transpose

# Module-level mutable state: if fact_table() is evaluated more than once,
# DataFrames accumulate in df_append across calls.
aspect_paths = {}
df_append = []

@dlt.table()
def fact_table():
    aspect_master = spark.read.table("default.master").select("name")
    # Collect the file paths that belong to each aspect name.
    for var in aspect_master.toLocalIterator():
        aspect_name = str(var["name"])
        aspect_paths.update({aspect_name: [aspect.path
                                           for asset in dbutils.fs.ls(filepath)
                                           if 'entityId' in asset.name
                                           for aspect in dbutils.fs.ls(asset.path)
                                           if aspect_name in aspect.name]})
    # Load each aspect's data and collect the resulting DataFrames.
    for asp in aspect_paths.keys():
        if len(aspect_paths[asp]) != 0:
            data = load_data(aspect_paths[asp])
            df_append.append(data)
    # Union all per-aspect DataFrames into one fact DataFrame.
    fact_df = reduce(DataFrame.unionAll, df_append)
    return fact_df
That is the code. When I use it to write directly to a Delta table (not via DLT) instead, I don't see any duplicates.
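For reference, a minimal sketch of that direct (non-DLT) write, assuming fact_df is built exactly as in the function above; the table name is a placeholder:

# Write the unioned DataFrame straight to a Delta table, bypassing DLT.
# "default.fact_table_direct" is a placeholder table name.
(fact_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("default.fact_table_direct"))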
02-27-2022 11:34 PM
Hmm, hard to tell. You use a mix of PySpark and plain Python objects; perhaps that is the reason, as some of it will be executed on the driver and other parts on the workers.
Can I ask why you use toLocalIterator, and why you append to a list (df_append) which you then reduce with functools?
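To illustrate the driver-versus-worker split mentioned above (a generic sketch, not the original code): rows pulled through toLocalIterator or collect become plain Python objects processed on the driver, while DataFrame transformations run on the workers.

from pyspark.sql.functions import col

# Driver side: rows are fetched to the driver (one partition at a time)
# and this loop runs as ordinary Python on the driver only.
names = [row["name"] for row in spark.read.table("default.master").toLocalIterator()]

# Worker side: this filter is a DataFrame transformation, planned on the
# driver but executed in parallel on the workers.
filtered = spark.read.table("default.master").filter(col("name").isin(names))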
02-28-2022 02:20 AM
This shared code is not correct for Delta Live Tables. They are a kind of streaming table, and here every run loops over the Hive metastore and loads data for all tables, so the results can be unexpected. Just use a normal job notebook without Delta Live Tables.
02-28-2022 06:22 AM
I use toLocalIterator to iterate over a DataFrame and build a dictionary, which is then looped over. For each key/value in the dictionary, a DataFrame is returned after processing. That DataFrame is appended to a list of DataFrames, and using reduce I create one DataFrame from the list of DataFrames. From this the DLT table is created.
Based on your answer, I removed the append and reduce functions and instead union each DataFrame directly as it is returned, and this solved the duplicate issue.
Thank you for the hint.
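A minimal sketch of that change, assuming the same load_data helper and aspect_paths dictionary as in the earlier code (the running union replaces the df_append list and the reduce call):

@dlt.table()
def fact_table():
    fact_df = None
    for asp, paths in aspect_paths.items():
        if len(paths) == 0:
            continue
        data = load_data(paths)
        # Union each DataFrame as soon as it is returned instead of
        # collecting them in a module-level list and reducing later.
        fact_df = data if fact_df is None else fact_df.unionAll(data)
    return fact_df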
02-28-2022 06:25 AM
Mixing plain Python and PySpark often causes issues.
It is better to either go all-in on pandas and then convert to a Spark DataFrame, or stay fully in PySpark (like you did now).
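A minimal sketch of the pandas-first alternative suggested here (the column names are illustrative, not from the original code):

import pandas as pd

# Do the row-wise work in plain pandas on the driver...
pdf = pd.DataFrame({"name": ["aspect_a", "aspect_b"], "value": [1, 2]})

# ...then convert once to a Spark DataFrame for the distributed part.
sdf = spark.createDataFrame(pdf)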
03-06-2022 04:25 PM
@Shikha Mathew - Does your last answer mean that your issue is resolved? Would you be happy to mark whichever answer helped as best? Or, if it wasn't a specific one, would you tell us what worked?