Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Streaming read and write with aggregation

Guillermo-HR
New Contributor

Hi,

I have the following problem: in a medallion architecture, I receive a file every month in a bronze volume containing the readings for each sensor over the period from the 1st of the month at 00:00 to the last day at 23:00. A manual job calls Python files to load and transform the data from landing -> bronze -> silver. Now I want to add the aggregated data to a gold table. I have the following code:

from pyspark.sql import functions as F

# Stream the silver table with an event-time watermark on datetime
df_silver_swell_metrics = (
    spark.readStream
        .format("delta")
        .table("cor_project.silver.swell_metrics")
        .withWatermark("datetime", "0 seconds")
)

# Aggregate into daily windows per coast
df_silver_swell_metrics_transformed = (
    df_silver_swell_metrics
    .groupBy(
        F.window("datetime", "1 day").alias("window"),
        "coast_name"
    )
    .agg(
        ...
    )
)

df_gold_wave_daily_summary = (
    df_silver_swell_metrics_transformed
    .select(
        ...
    )
)

# Incremental write to gold; availableNow processes all pending data, then stops
(
    df_gold_wave_daily_summary.writeStream
        .format("delta")
        .outputMode("append")
        .trigger(availableNow=True)
        .option("checkpointLocation", f"/Volumes/cor_{ambiente}/gold/data/checkpoints/wave_daily_summary")
        .toTable("cor_project.gold.wave_daily_summary")
)

This works, in the sense that it doesn't reprocess data that has already been handled, but it always misses the last day of the month. My suspicion is that with outputMode("append") a window is only emitted once the watermark passes the end of that window, and since no reading ever arrives after the last day at 23:00 within a run, the final daily window is never flushed. I have asked in other groups and was told to use batch processing. Any thoughts on possible solutions?
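In case it helps frame answers, this is roughly how I understood the batch suggestion (a minimal sketch, not tested: month_start / month_end and the avg_wave_height aggregation are hypothetical placeholders for the real job parameters and aggregations, and it assumes the gold table keeps the window struct column):

from pyspark.sql import functions as F

# Batch alternative: read the silver table as a normal DataFrame, aggregate
# by day, and overwrite only the month being loaded. With no watermark
# involved, the last day of the month is aggregated like any other day.
month_start = "2024-05-01"  # hypothetical job parameters
month_end = "2024-06-01"

df_daily = (
    spark.read.table("cor_project.silver.swell_metrics")
        .where((F.col("datetime") >= month_start) & (F.col("datetime") < month_end))
        .groupBy(
            F.window("datetime", "1 day").alias("window"),
            "coast_name",
        )
        # hypothetical stand-in for the real aggregations elided above
        .agg(F.avg("wave_height").alias("avg_wave_height"))
)

# replaceWhere makes the monthly load idempotent: rerunning the job for the
# same month replaces that month's rows instead of duplicating them.
(
    df_daily.write
        .format("delta")
        .mode("overwrite")
        .option(
            "replaceWhere",
            f"window.start >= '{month_start}' AND window.start < '{month_end}'",
        )
        .saveAsTable("cor_project.gold.wave_daily_summary")
)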

Thanks for any comments!
