Hi,
I have the following problem: in a medallion architecture, a volume in my bronze layer receives one file per month containing the sensor readings for that period, from 00:00 on the 1st of the month to 23:00 on the last day. A manually triggered job runs Python scripts that load and transform the data from landing -> bronze -> silver. Now I want to add daily aggregates to a gold table. I have the following code:
from pyspark.sql import functions as F

df_silver_swell_metrics = (
    spark.readStream
    .format("delta")
    .table("cor_project.silver.swell_metrics")
    # no late data is expected, so the watermark delay is zero
    .withWatermark("datetime", "0 seconds")
)

df_silver_swell_metrics_transformed = (
    df_silver_swell_metrics
    .groupBy(
        F.window("datetime", "1 day").alias("window"),
        "coast_name",
    )
    .agg(
        ...
    )
)

df_gold_wave_daily_summary = (
    df_silver_swell_metrics_transformed
    .select(
        ...
    )
)

(
    df_gold_wave_daily_summary.writeStream
    .format("delta")
    .outputMode("append")
    .trigger(availableNow=True)
    .option("checkpointLocation", f"/Volumes/cor_{ambiente}/gold/data/checkpoints/wave_daily_summary")
    .toTable("cor_project.gold.wave_daily_summary")
)
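From what I can tell, the cause is that in append mode a windowed aggregate is only emitted once the watermark passes the window's end, and since the last reading each month is at 23:00 on the last day, the watermark never crosses midnight into the next day until the next month's file arrives. Here is a toy model of that rule in plain Python (my own simplification, not actual Spark code):

```python
from datetime import datetime, timedelta

def closed_windows(event_times, delay=timedelta(seconds=0), window=timedelta(days=1)):
    """Toy model of Spark's append-mode rule: a window [start, start + window)
    is emitted only once the watermark (max event time seen minus the delay)
    has passed the window's end."""
    watermark = max(event_times) - delay
    # align window starts to midnight, mirroring F.window("datetime", "1 day")
    starts = {dt.replace(hour=0, minute=0, second=0, microsecond=0) for dt in event_times}
    return sorted(s for s in starts if s + window <= watermark)

# hourly readings for a 3-day "month": 00:00 on day 1 through 23:00 on day 3
events = [datetime(2024, 1, d, h) for d in (1, 2, 3) for h in range(24)]
closed = closed_windows(events)
print(closed)  # only days 1 and 2: day 3's window ends at Jan 4 00:00,
               # which is past the watermark (Jan 3 23:00), so it never closes
```

If this model is right, the last day's window stays open in the state store until the following month's data advances the watermark, which matches what I'm seeing.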
This works in the sense that it doesn't reprocess data it has already handled, but it always misses the last day of the month. I've asked in other groups and was told to use batch processing instead. Any thoughts on possible solutions?
Thanks for any comments.