SM
New Contributor III
def map_explode(df):
  metric = create_map(list(chain(*(
    (lit(name), col(name).cast(StringType())) for name in df.columns if name not in ['timestamp', 'path']
  ))))
  df = df.withColumn('mapCol', metric)
  result = df.select('timestamp', 'path', explode(df.mapCol).alias('metric','value'))
  return result
 
def load_data(path):
  data =(spark.read
       .format("parquet")
       .option("recursiveFileLookup","true")
       .load(path))
  columns = data.columns
  sft_df= (data.withColumn("timestamp", (col("_time")/1000).cast("timestamp"))
        .withColumn("path", input_file_name())
        .select('timestamp', 'path', *columns))
  
  df_transpose =  map_explode(sft_df)
  df_transpose = (df_transpose.withColumn("asset", regexp_extract(col("path"),asset_str,0))
                 .withColumn("aspect", regexp_extract(col("path"),aspect_str,0))
                 .select(*fact_columns)
                 )
 
  return df_transpose
 
aspect_paths ={}
df_append=[]
@dlt.table()
def fact_table():
  aspect_master = spark.read.table("default.master").select("name")
  for var in master.toLocalIterator():
    aspect_name = str(var["name"])
    aspect_paths.update({aspect_name: ([aspect.path for asset in dbutils.fs.ls(filepath) if 'entityId' in asset.name for aspect in dbutils.fs.ls(asset.path) if aspect_name in aspect.name])})
  
  
  for asp in aspect_paths.keys():
    if len(aspect_paths[asp])!=0:
      data = load_data(aspect_paths[asp])
      df_append.append(data)
    
  fact_df = reduce(DataFrame.unionAll, df_append)
  return fact_df

From this code I have tried to directly write to a delta table not via DLT and I dont see any duplicates. Here is the code.