Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
02-27-2022 10:53 PM
def map_explode(df, id_cols=("timestamp", "path")):
    """Unpivot every non-identifier column of *df* into (metric, value) rows.

    Each column not listed in *id_cols* becomes one output row per input row,
    with the column name in ``metric`` and its value, cast to string, in
    ``value``. Identifier columns are carried through unchanged.

    Args:
        df: Spark DataFrame containing every column named in *id_cols*.
        id_cols: Columns kept as-is and excluded from the unpivot. Defaults
            to ``("timestamp", "path")``, the two columns ``load_data`` adds.

    Returns:
        DataFrame with columns ``*id_cols, metric, value``.
    """
    # Materialize once: used both for membership tests and for the select.
    id_cols = tuple(id_cols)
    # create_map takes interleaved key/value expressions: k1, v1, k2, v2, ...
    # Values are cast to string because a map column has a single value type.
    metric = create_map(list(chain.from_iterable(
        (lit(name), col(name).cast(StringType()))
        for name in df.columns
        if name not in id_cols
    )))
    # Exploding a map yields one (key, value) row per entry. Selecting the
    # expression directly avoids materializing a temporary column.
    return df.select(*id_cols, explode(metric).alias("metric", "value"))
def load_data(path):
    """Load parquet data from *path* and reshape it into fact rows.

    Reads parquet recursively and derives ``timestamp`` from the ``_time``
    column. ``_time`` is divided by 1000 before the cast, presumably because
    it holds epoch milliseconds (confirm). The source file of every row is
    recorded in ``path``. The remaining columns are unpivoted with
    ``map_explode``, and each row is tagged with ``asset`` and ``aspect``
    extracted from the file path.

    Args:
        path: Location(s) passed to ``DataFrameReader.load``: a single path or
            a list of paths (``fact_table`` passes a list).

    Returns:
        DataFrame restricted to ``fact_columns``.
    """
    data = (
        spark.read
        .format("parquet")
        .option("recursiveFileLookup", "true")
        .load(path)
    )
    # Drop source columns that collide with the two derived ones. Otherwise the
    # select below would emit duplicate "timestamp"/"path" columns, and every
    # later col("path") reference would be ambiguous.
    source_columns = [c for c in data.columns if c not in ("timestamp", "path")]
    sft_df = (
        data
        .withColumn("timestamp", (col("_time") / 1000).cast("timestamp"))
        .withColumn("path", input_file_name())
        .select("timestamp", "path", *source_columns)
    )
    # NOTE(review): asset_str / aspect_str (regex patterns, group 0 = whole
    # match) and fact_columns are presumably module-level globals defined
    # outside this view; confirm.
    return (
        map_explode(sft_df)
        .withColumn("asset", regexp_extract(col("path"), asset_str, 0))
        .withColumn("aspect", regexp_extract(col("path"), aspect_str, 0))
        .select(*fact_columns)
    )
# Retained for backward compatibility only: fact_table() no longer reads or
# writes these. Accumulating into module-level containers made every extra
# invocation of the table function append another copy of each DataFrame.
# NOTE(review): DLT may evaluate a table function more than once; this is the
# likely cause of the duplicates seen only when running under DLT.
aspect_paths = {}
df_append = []


@dlt.table()
def fact_table():
    """Build the fact table by unioning one DataFrame per master aspect name.

    For every ``name`` in ``default.master``, this collects the aspect
    directories whose name contains it. Aspect directories are the children of
    the ``entityId`` asset directories under ``filepath``. Each non-empty group
    is loaded with ``load_data``, and the results are unioned by column
    position.

    All state is local, so repeated invocations return the same result instead
    of accumulating DataFrames from earlier runs.

    Returns:
        The unioned DataFrame.

    Raises:
        ValueError: if no aspect directory matched any master name.
    """
    aspect_master = spark.read.table("default.master").select("name")

    # List the directory tree once instead of once per aspect name. The order
    # (asset order, then aspect order) matches the original nested scan.
    aspect_dirs = [
        aspect
        for asset in dbutils.fs.ls(filepath)
        if "entityId" in asset.name
        for aspect in dbutils.fs.ls(asset.path)
    ]

    paths_by_aspect = {}
    for row in aspect_master.toLocalIterator():
        raw_name = row["name"]
        if raw_name is None:
            continue
        aspect_name = str(raw_name)
        # An empty name is a substring of every directory name and would pull
        # in every aspect directory, so skip it.
        if not aspect_name:
            continue
        # NOTE(review): this is a substring match. A name like "temp" also
        # matches a "temperature" directory, which would then be loaded under
        # two names. Confirm the directory naming scheme.
        paths_by_aspect[aspect_name] = [
            d.path for d in aspect_dirs if aspect_name in d.name
        ]

    frames = [load_data(paths) for paths in paths_by_aspect.values() if paths]
    if not frames:
        raise ValueError(
            "fact_table: no aspect directories matched any name in default.master"
        )
    # All frames end with .select(*fact_columns), so a positional union is safe.
    return reduce(DataFrame.union, frames)
# From this code, I tried writing directly to a Delta table (not via DLT), and I
# don't see any duplicates. Here is the code.