Please suggest any recommendations for improving the DLT pipeline below.
import dlt
from datetime import datetime as dt
from pyspark.sql import functions as F

# `env` and `logger` are defined earlier in the notebook.

@dlt.view
def lookup_keys():
    # Distinct (BTag, SubscriberGuid) pairs from the streaming source,
    # excluding rows that failed with an ExecException.
    return (
        dlt.read_stream("pixels_db_df_dlt")
        .where(F.col("ExecException").isNull())
        .select(
            F.upper(F.col("TrackingTag")).alias("BTag"),
            F.upper(F.col("SubscriberGuid")).alias("SubscriberGuid"),
        )
        .distinct()
    )
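For comparison, the same key deduplication can be written with dropDuplicates on the two key columns, which makes the dedup key explicit. This is only a sketch assuming the same pixels_db_df_dlt source and column names as above; lookup_keys_alt is an illustrative name, not part of the pipeline.

@dlt.view
def lookup_keys_alt():
    # Illustrative alternative to .distinct(): dropDuplicates names the dedup columns.
    return (
        dlt.read_stream("pixels_db_df_dlt")
        .where(F.col("ExecException").isNull())
        .select(
            F.upper(F.col("TrackingTag")).alias("BTag"),
            F.upper(F.col("SubscriberGuid")).alias("SubscriberGuid"),
        )
        .dropDuplicates(["BTag", "SubscriberGuid"])
    )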
@dlt.table(name="pixel_subset")
def pixel_subset():
keys = dlt.read_stream("lookup_keys")
# # Force Delta metadata refresh so Spark sees new files quickly
# spark.catalog.refreshTable(f"operational_{env}.silver_data.pixel_tracking_data")
base = (spark.read.table(f"operational_{env}.silver_data.pixel_tracking_data")
.select(
F.col("OperatorId").alias("SubscriberGuid"),
F.col("BTag"),
F.col("PublishPointID"),
F.col("AffiliateID"),
F.col("extraParameters"),
F.col("AdID").alias("AdId"),
F.col("MarketingSourceGroupID").alias("SourceGroupID")
))
return (
keys.join(base,["SubscriberGuid", "BTag"], "inner")
)
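As a quick sanity check on the stream-static join above, the same key/base match can be reproduced as a plain batch query in a notebook. This is a hypothetical check, not part of the pipeline, and it assumes pixels_db_df_dlt is published to the same operational_{env}.silver_data schema that pixel_subset is read from further down.

# Hypothetical notebook check: how many distinct keys find a match in the static table.
keys_snapshot = (
    spark.read.table(f"operational_{env}.silver_data.pixels_db_df_dlt")  # assumed location
    .where(F.col("ExecException").isNull())
    .select(
        F.upper(F.col("TrackingTag")).alias("BTag"),
        F.upper(F.col("SubscriberGuid")).alias("SubscriberGuid"),
    )
    .distinct()
)
base_keys = (
    spark.read.table(f"operational_{env}.silver_data.pixel_tracking_data")
    .select(F.col("OperatorId").alias("SubscriberGuid"), F.col("BTag"))
)
print(keys_snapshot.join(base_keys, ["SubscriberGuid", "BTag"], "inner").count())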
@dlt.table(name="lookup_df")
def lookup_df_combined():
logger.info(str(dt.now()) + " - Started reading data for lookup_df_combined")
pixels_df = (dlt.read_stream("pixels_db_df_dlt")
.where(F.col('ExecException').isNull())
.drop(F.col('ExecException'))
)
matched_btags = spark.read.table(f"operational_{env}.silver_data.pixel_subset")
# Step 4: Perform final join (combination of both tables)
result = (
pixels_df.alias('curr')
.join(
matched_btags.alias('lookup'),
(F.col("curr.SubscriberGuid") == F.col("lookup.SubscriberGuid")) & (F.col("curr.TrackingTag") == F.col("lookup.BTag")) ,'left'
)
.withColumn('retry', F.lit(0))
.withColumn(
'resolved',
F.when(F.col('lookup.BTag').isNotNull(), F.lit(True)).otherwise(False)
)
.select(
F.col('curr.*'),
F.col('lookup.PublishPointID').alias('PublishPointID'),
F.col('lookup.AffiliateID').alias('AffiliateID'),
# F.col('lookup.AdLogActionID').alias('AdLogActionID'),
F.col('lookup.extraParameters').alias('extraParameters'),
F.col('lookup.SourceGroupID').alias('SourceGroupID'),
F.col('lookup.AdID').alias('AdId'),
F.col('retry'),
F.col('resolved')
).withColumn("commit_timestamp",F.expr("current_timestamp()"))
.withColumn("loaded_at", F.expr("current_timestamp()"))
)
logger.info(str(dt.now()) + " - Finished processing lookup_df_combined")
return result
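After an update, a quick way to see how many rows actually resolved is to group the published table by the resolved flag. This is a hypothetical check run outside the pipeline; it assumes lookup_df lands in the same operational_{env}.silver_data schema used for pixel_subset above.

# Hypothetical notebook check: distribution of matched vs. unmatched rows.
(
    spark.read.table(f"operational_{env}.silver_data.lookup_df")  # assumed location
    .groupBy("resolved")
    .count()
    .show()
)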