Delta table cannot read data from the previous table in the pipeline

anasse
New Contributor II

Hello

I am new to Databricks. I am trying to build some complex transformations in a Delta table pipeline.

I have some tables in streaming mode that collect data from an S3 bucket, and then a silver layer where the transformations start. But it seems like the gold layer starts before the silver layer has finished, so all my gold tables are empty and I need to refresh them independently.

Is there something I am missing in the process?

Thank you for your help

1 REPLY

Kaniz
Community Manager

Hi @Anasse Berahab, you might be experiencing a synchronization issue between your silver and gold layers in your Delta Lake pipeline. To address this, you can use the trigger and awaitTermination options to control the execution of your streaming queries.

Here's a general outline to help you set up the pipeline correctly:

  1. Read data from the source (e.g., S3) and write it to the bronze layer (a rough sketch of this ingestion step follows this list).
  2. Transform the data in the bronze layer and write it to the silver layer.
  3. Transform the data in the silver layer and write it to the gold layer.
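
For reference, step 1 could look something like the sketch below. This is a minimal example, not from the original reply: it assumes the raw files are JSON and uses Auto Loader (the cloudFiles source); the bucket and paths are placeholders to replace with your own.

bronze_streaming_query = (
    spark.readStream
    .format("cloudFiles")                                     # Auto Loader
    .option("cloudFiles.format", "json")                      # format of the raw files in S3
    .option("cloudFiles.schemaLocation", "/path/to/bronze/schema")
    .load("s3://your-bucket/raw/")
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/path/to/bronze/checkpoint")
    .start("/path/to/bronze/output")
)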

To ensure that the gold layer transformation starts only after the silver layer transformation is complete, you can follow these steps:

  1. Start the streaming query for the silver layer transformation using writeStream, and specify a trigger to control how frequently the processing runs.

For example:

# bronze_df is assumed to be a streaming DataFrame read from the bronze layer.
silver_streaming_query = (
    bronze_df
    .transform(silver_transform)          # apply the bronze-to-silver logic
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/path/to/silver/checkpoint")
    .trigger(processingTime="5 minutes")  # run a micro-batch every 5 minutes
    .start("/path/to/silver/output")
)

Here, silver_transform is a function that defines the transformation logic from bronze to silver. Replace /path/to/silver/checkpoint and /path/to/silver/output with the appropriate paths for your use case.
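
For context, here is a minimal sketch of how bronze_df and silver_transform might be defined. The transformation logic is purely illustrative (it assumes an "id" column exists), and the bronze path is a placeholder:

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

# Read the bronze Delta table as a stream (path is a placeholder).
bronze_df = (
    spark.readStream
    .format("delta")
    .load("/path/to/bronze/output")
)

# Illustrative bronze-to-silver logic: deduplicate and add a processing timestamp.
def silver_transform(df: DataFrame) -> DataFrame:
    return (
        df.dropDuplicates(["id"])
          .withColumn("processed_at", F.current_timestamp())
    )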

 

  2. Start the streaming query for the gold layer transformation using writeStream:

# silver_df is assumed to be a streaming DataFrame read from the silver layer (see the sketch below).
gold_streaming_query = (
    silver_df
    .transform(gold_transform)             # apply the silver-to-gold logic
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/path/to/gold/checkpoint")
    .trigger(processingTime="10 minutes")  # run a micro-batch every 10 minutes
    .start("/path/to/gold/output")
)

Here, gold_transform is a function that defines the transformation logic from silver to gold. Replace /path/to/gold/checkpoint and /path/to/gold/output with the appropriate paths for your use case.
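
Note that silver_df is assumed to be a streaming read of the silver table written in the previous step, along the lines of this sketch (the path is a placeholder):

# Read the silver Delta table as a stream so the gold query picks up new silver rows.
silver_df = (
    spark.readStream
    .format("delta")
    .load("/path/to/silver/output")
)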

  3. Use awaitTermination to wait for the silver and gold streaming queries to finish:

# Block until each query terminates; this keeps the job alive while the streams run.
silver_streaming_query.awaitTermination()
gold_streaming_query.awaitTermination()
 

By setting the trigger options and using awaitTermination, you can control the order and frequency of your streaming transformations. This way, you can ensure that the gold layer transformation starts only after the silver layer transformation is complete.
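
If you need a stricter guarantee that gold only runs after silver has fully processed the data currently available, one alternative (a sketch, not part of the original reply; it needs Spark 3.3+ / a recent Databricks Runtime) is to run each query with an availableNow trigger in place of the processingTime queries above, and wait for it to finish before starting the next:

# Process everything currently in bronze into silver, then stop automatically.
silver_once = (
    bronze_df
    .transform(silver_transform)
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/path/to/silver/checkpoint")
    .trigger(availableNow=True)
    .start("/path/to/silver/output")
)
silver_once.awaitTermination()   # returns once all available bronze data has been written to silver

# Only now start the gold query, so it sees the freshly written silver data.
gold_once = (
    silver_df
    .transform(gold_transform)
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/path/to/gold/checkpoint")
    .trigger(availableNow=True)
    .start("/path/to/gold/output")
)
gold_once.awaitTermination()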

If you still encounter issues, consider examining the logs for any error messages or warnings.

Additionally, ensure that your transformation logic is correct and that you're not filtering out all the data during the transformations.
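
As a quick sanity check (paths are placeholders), you can batch-read each layer and confirm rows are actually landing there:

# Batch-read each layer to verify data is arriving where you expect.
print(spark.read.format("delta").load("/path/to/silver/output").count())
print(spark.read.format("delta").load("/path/to/gold/output").count())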
