
Can I have additional logic in a DLT notebook that is unrelated to directly creating DLTs?

ac0
New Contributor III

I have an Azure Storage Data Table that I would like to update based on records that were just streamed into a Delta Live Table. Below is example code:

@dlt.create_table(
  comment="comment",
  table_properties={
    "pipelines.autoOptimize.managed": "true"
  },
  partition_cols = ["EventId"]
)
def live_table_test():
  return (
    spark.readStream.format("cloudFiles") 
        .options(**cloudfile) 
        .load(upload_path)
        .selectExpr("*", "_metadata.file_path as file_path")
  )

I want to update an Azure Storage Data Table based on records that were just streamed into that table. Would something like this work?

@dlt.create_table(
  comment="comment",
  table_properties={
    "pipelines.autoOptimize.managed": "true"
  },
  partition_cols = ["EventId"]
)
def live_table_test():
  df = (
    spark.readStream.format("cloudFiles") 
        .options(**cloudfile) 
        .load(upload_path)
        .selectExpr("*", "_metadata.file_path as file_path")
  )
  update_azure_table(df)
  return df

I need to be able to get the list of distinct EventIds from the df and pass them into that function. How can I do this? And should I be using the "dlt.create_table" decorator? Or should I use the "dlt.table" decorator? I'm finding almost no documentation about this online.

1 ACCEPTED SOLUTION

Accepted Solutions

raphaelblg
New Contributor III

Hello,

Thank you for reaching out with your questions. I'm Raphael, and I'm here to assist you.

I want to update an Azure Storage Data Table based on records that were just streamed into that table. Would something like this work?

Calling `update_azure_table(df)` inside the table function should not immediately cause errors, but it is not recommended. DLT manages the `live_table_test` stream but has no visibility into whatever `update_azure_table(df)` starts, so the two can get out of sync; for example, DLT may terminate the cluster before the update has finished.

To effectively update two downstream streaming tables within your DLT pipeline, consider the following approaches:

1. DLT Tables: Define both downstream targets as DLT tables using separate `@dlt.table` functions for each.

2. Non-DLT Environment: Execute `update_azure_table` outside the DLT-managed environment. You can use either your downstream DLT table or your original Auto Loader query as the source for the updates.
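
As a rough sketch of option 2, a separate (non-DLT) job could stream from the table the pipeline writes and call the update function once per micro-batch. Inside `foreachBatch` each micro-batch is a regular, non-streaming DataFrame, so `distinct()` and `collect()` are allowed there. The names `make_batch_handler` and `start_update_stream`, and the table name and checkpoint path passed to them, are hypothetical. The sketch also assumes `update_azure_table` accepts a list of EventIds rather than a DataFrame, and that the number of distinct EventIds per micro-batch is small enough to collect on the driver.

```python
from typing import Callable, List


def make_batch_handler(update_fn: Callable[[List], None], id_col: str = "EventId"):
    """Build a foreachBatch handler that passes each micro-batch's distinct IDs to update_fn."""

    def process_batch(batch_df, epoch_id):
        # batch_df is a static DataFrame here, so distinct() and collect() work.
        rows = batch_df.select(id_col).distinct().collect()
        event_ids = [row[id_col] for row in rows]
        if event_ids:  # skip empty micro-batches
            update_fn(event_ids)

    return process_batch


def start_update_stream(spark, source_table: str, checkpoint_path: str, update_fn):
    """Start a stream, outside the DLT pipeline, that reads the table DLT wrote."""
    return (
        spark.readStream.table(source_table)
        .writeStream
        .option("checkpointLocation", checkpoint_path)  # set on the streaming writer
        .foreachBatch(make_batch_handler(update_fn))
        .start()
    )
```

In a notebook this might be started with something like `start_update_stream(spark, "<catalog>.<schema>.live_table_test", "<checkpoint path>", update_azure_table)`. Since `foreachBatch` gives at-least-once delivery by default, a micro-batch can be replayed after a failure, so the update function should be safe to call twice with the same EventIds.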

 

And should I be using the "dlt.create_table" decorator? Or should I use the "dlt.table" decorator? I'm finding almost no documentation about this online.

You should use the `@dlt.table` decorator to define tables within a DLT pipeline. This decorator is what marks a table as managed by DLT. For documentation and examples, you may find the Delta Live Tables Python Reference useful.

I need to be able to get the list of distinct EventIds from the df and pass them into that function. How can I do this?

Streaming queries process an unbounded input and typically do not maintain state, so performing a distinct across the entire table is not straightforward: there is no defined boundary at which the result would be complete.

To introduce state into your streaming query, you might need to apply watermarking and select distinct EventIds within specified watermarks or data segments. A detailed explanation and guide for implementing such functionality can be found in the Structured Streaming Programming Guide on the Apache Spark documentation site.

 
I've created a sample scenario in which I've selected distinct Models for a sample car dataset given a sample timestamp column.
 
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, collect_set

# Sample readStream (assuming a source that provides eventTime and eventId, like Kafka, file, etc.)
# This part of the code is just for illustration. Replace it with your actual stream source
streamingInputDF = (
    spark 
    .readStream 
    .option("withEventTimeOrder", "true") # Apply this if the initial snapshot should be processed with event time order.
    .format("delta") 
    .table("balogo.delta_managed.df_car")
    .withWatermark("ManufactureDate", "30 days") # Apply watermarking to define state
)

# Perform an aggregation to gather the distinct values (DriveType here; EventId in your case) within each window of time
aggregatedDF = streamingInputDF.groupBy(
    window(col("ManufactureDate"), "30 days")
).agg(
    collect_set("DriveType").alias("DistinctDriveTypeValues")
)

# If needed, you can define a foreachBatch function to process each micro-batch DataFrame
def process_batch(batch, epoch_id):
    # Example: Save the distinct values to a table, a database, a file, etc.
    (
        batch
        .select("window.start", "window.end", "DistinctDriveTypeValues")
        .write
        .format("delta")
        .mode("append")
        .saveAsTable("balogo.delta_managed.distinct_drive_types")
    )
    
# Apply the foreachBatch operation to process each micro-batch using the defined function.
# The checkpoint location is set on the streaming writer; it has no effect on the batch writer inside foreachBatch.
query = (
    aggregatedDF.writeStream
    .option("checkpointLocation", "dbfs:/tmp/balogo/distinctStreamingExample/_checkpoints/")
    .foreachBatch(process_batch)
    .start()
)

# Note: Adjust the code as necessary, this is just a sample reference that I've created for testing 
 
Some rows I took from the output table as a sample:
 
| Start | End | DistinctModels |
| --- | --- | --- |
| 2005-01-27T00:00:00.000+00:00 | 2005-02-26T00:00:00.000+00:00 | ["Model H"] |
| 2005-02-26T00:00:00.000+00:00 | 2005-03-28T00:00:00.000+00:00 | ["Model I","Model G","Model H"] |
| 2005-05-27T00:00:00.000+00:00 | 2005-06-26T00:00:00.000+00:00 | ["Model F","Model E"] |
 
I hope these insights are helpful to you. Should you have further questions or require additional clarification, please don't hesitate to ask 🙂
Best regards,

Raphael Balogo
Sr. Technical Solutions Engineer
Databricks


2 REPLIES 2

jose_gonzalez
Moderator

Hi @ac0,

Please check @raphaelblg 's response and let us know if this helped to resolve your issue. If it did, please mark it as the accepted solution.
