<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Can I have additional logic in a DLT notebook that is unrelated to directly creating DLTs? in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/can-i-have-additional-logic-in-a-dlt-notebook-that-is-unrelated/m-p/63092#M32169</link>
    <description>&lt;P&gt;I have an Azure Storage Data Table that I would like to update based on records that were just streamed into a Delta Live Table. Below is example code:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;@dlt.create_table(
  comment="comment",
  table_properties={
    "pipelines.autoOptimize.managed": "true"
  },
  partition_cols = ["EventId"]
)
def live_table_test():
  return (
    spark.readStream.format("cloudFiles") 
        .options(**cloudfile) 
        .load(upload_path)
        .selectExpr("*", "_metadata.file_path as file_path")
  )&lt;/LI-CODE&gt;&lt;P&gt;I want to update an Azure Storage Data Table based on records that were just streamed into that table. Would something like this work?&lt;/P&gt;&lt;LI-CODE lang="python"&gt;@dlt.create_table(
  comment="comment",
  table_properties={
    "pipelines.autoOptimize.managed": "true"
  },
  partition_cols = ["EventId"]
)
def live_table_test():
  df = (
    spark.readStream.format("cloudFiles") 
        .options(**cloudfile) 
        .load(upload_path)
        .selectExpr("*", "_metadata.file_path as file_path")
  )
  update_azure_table(df)
  return df&lt;/LI-CODE&gt;&lt;P&gt;I need to be able to get the list of distinct EventIds from the df and pass them into that function. How can I do this? And should I be using the "dlt.create_table" decorator? Or should I use the "dlt.table" decorator? I'm finding almost no documentation about this online.&lt;/P&gt;</description>
    <pubDate>Fri, 08 Mar 2024 17:27:42 GMT</pubDate>
    <dc:creator>ac0</dc:creator>
    <dc:date>2024-03-08T17:27:42Z</dc:date>
    <item>
      <title>Can I have additional logic in a DLT notebook that is unrelated to directly creating DLTs?</title>
      <link>https://community.databricks.com/t5/data-engineering/can-i-have-additional-logic-in-a-dlt-notebook-that-is-unrelated/m-p/63092#M32169</link>
      <description>&lt;P&gt;I have an Azure Storage Data Table that I would like to update based on records that were just streamed into a Delta Live Table. Below is example code:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;@dlt.create_table(
  comment="comment",
  table_properties={
    "pipelines.autoOptimize.managed": "true"
  },
  partition_cols = ["EventId"]
)
def live_table_test():
  return (
    spark.readStream.format("cloudFiles") 
        .options(**cloudfile) 
        .load(upload_path)
        .selectExpr("*", "_metadata.file_path as file_path")
  )&lt;/LI-CODE&gt;&lt;P&gt;I want to update an Azure Storage Data Table based on records that were just streamed into that table. Would something like this work?&lt;/P&gt;&lt;LI-CODE lang="python"&gt;@dlt.create_table(
  comment="comment",
  table_properties={
    "pipelines.autoOptimize.managed": "true"
  },
  partition_cols = ["EventId"]
)
def live_table_test():
  df = (
    spark.readStream.format("cloudFiles") 
        .options(**cloudfile) 
        .load(upload_path)
        .selectExpr("*", "_metadata.file_path as file_path")
  )
  update_azure_table(df)
  return df&lt;/LI-CODE&gt;&lt;P&gt;I need to be able to get the list of distinct EventIds from the df and pass them into that function. How can I do this? And should I be using the "dlt.create_table" decorator? Or should I use the "dlt.table" decorator? I'm finding almost no documentation about this online.&lt;/P&gt;</description>
      <pubDate>Fri, 08 Mar 2024 17:27:42 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/can-i-have-additional-logic-in-a-dlt-notebook-that-is-unrelated/m-p/63092#M32169</guid>
      <dc:creator>ac0</dc:creator>
      <dc:date>2024-03-08T17:27:42Z</dc:date>
    </item>
    <item>
      <title>Re: Can I have additional logic in a DLT notebook that is unrelated to directly creating DLTs?</title>
      <link>https://community.databricks.com/t5/data-engineering/can-i-have-additional-logic-in-a-dlt-notebook-that-is-unrelated/m-p/63100#M32172</link>
      <description>&lt;P&gt;Hello,&lt;/P&gt;
&lt;P&gt;Thank you for reaching out with your questions. I'm Raphael, and I'm here to assist you.&lt;/P&gt;
&lt;P class="lia-indent-padding-left-30px"&gt;&lt;FONT color="#999999"&gt;&lt;EM&gt;I want to update an Azure Storage Data Table based on records that were just streamed into that table. Would something like this work?&lt;/EM&gt;&lt;/FONT&gt;&lt;/P&gt;
&lt;P&gt;Calling `update_azure_table(df)` inside the table function should not immediately cause errors, but it is not recommended. DLT manages the `&lt;STRONG&gt;live_table_test&lt;/STRONG&gt;` stream but does not oversee whatever `&lt;STRONG&gt;update_azure_table&lt;/STRONG&gt;(df)` runs, so DLT cannot account for that work and you may see unintended consequences, such as the cluster being terminated before the update finishes.&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;To effectively update two downstream streaming tables within your DLT pipeline, consider the following approaches:&lt;/STRONG&gt;&lt;/P&gt;
&lt;P&gt;1. DLT Tables: Define both downstream targets as DLT tables using separate `@dlt.table` functions for each.&lt;/P&gt;
&lt;P&gt;2. Non-DLT Environment: Execute `update_azure_table` outside the DLT-managed environment. You can use either your downstream DLT table or your original Auto Loader query as the source for the updates.&lt;/P&gt;
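&lt;P&gt;For the second approach, one possible shape is a separate (non-DLT) job that streams from the DLT output table and uses `foreachBatch`, where each micro-batch is an ordinary DataFrame from which the distinct EventIds can be collected. This is only a sketch under assumptions: `distinct_event_ids`, `make_batch_handler`, and `start_sync` are hypothetical helper names, `update_fn` stands in for your `update_azure_table` and is assumed to accept a list of EventIds, and the table name and checkpoint path are placeholders you would supply.&lt;/P&gt;

```python
def distinct_event_ids(batch_df):
    """Collect the distinct EventId values of one micro-batch to the driver."""
    rows = batch_df.select("EventId").distinct().collect()
    return sorted(row["EventId"] for row in rows)


def make_batch_handler(update_fn):
    """Build a foreachBatch handler that forwards distinct EventIds to update_fn.

    update_fn is a stand-in (assumption) for update_azure_table and is
    expected to take a list of EventIds.
    """
    def handle(batch_df, batch_id):
        event_ids = distinct_event_ids(batch_df)
        if event_ids:  # skip empty micro-batches
            update_fn(event_ids)
    return handle


def start_sync(spark, source_table, checkpoint_path, update_fn):
    """Start a streaming query outside DLT that reads the DLT output table."""
    return (
        spark.readStream.table(source_table)
        .writeStream
        .foreachBatch(make_batch_handler(update_fn))
        .option("checkpointLocation", checkpoint_path)  # placeholder path
        .start()
    )
```

&lt;P&gt;In a regular notebook or job this might be started with something like `start_sync(spark, "my_catalog.my_schema.live_table_test", "/some/checkpoint/path", update_azure_table)`, where both string arguments are placeholders. Collecting to the driver assumes the number of distinct EventIds per micro-batch is small.&lt;/P&gt;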
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;DIV id="bodyDisplay" class="lia-message-body lia-component-message-view-widget-body lia-component-body-signature-highlight-escalation lia-component-message-view-widget-body-signature-highlight-escalation"&gt;
&lt;DIV class="lia-message-body-content"&gt;
&lt;P class="lia-indent-padding-left-30px"&gt;&lt;EM&gt;&lt;FONT color="#999999"&gt;And should I be using the "dlt.create_table" decorator? Or should I use the "dlt.table" decorator? I'm finding almost no documentation about this online.&lt;/FONT&gt;&lt;/EM&gt;&lt;/P&gt;
&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;P&gt;&lt;STRONG&gt;You should use the `@dlt.table` decorator for defining tables within a DLT pipeline&lt;/STRONG&gt;. This decorator is what marks a table as managed by DLT. For documentation and examples, you may find the&amp;nbsp;&lt;A href="https://docs.databricks.com/en/delta-live-tables/python-ref.html#python-delta-live-tables-properties" target="_blank" rel="noopener"&gt;Delta Live Tables Python Reference&lt;/A&gt; useful.&lt;/P&gt;
&lt;P class="lia-indent-padding-left-30px"&gt;&lt;FONT color="#999999"&gt;&lt;EM&gt;I need to be able to get the list of distinct EventIds from the df and pass them into that function. How can I do this?&lt;/EM&gt;&lt;/FONT&gt;&lt;/P&gt;
&lt;P&gt;A streaming DataFrame is unbounded, and a streaming query typically keeps no state between micro-batches, so a distinct operation across the entire table has no defined boundary at which it would be complete.&lt;/P&gt;
&lt;P&gt;To introduce state into your streaming query, you might need to apply watermarking and select distinct EventIds within specified watermarks or data segments. A detailed explanation and guide for implementing such functionality can be found in the &lt;A title="Structured Streaming Programming Guide" href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time" target="_self"&gt;Structured Streaming Programming Guide&lt;/A&gt;&amp;nbsp;on the Apache Spark documentation site.&lt;/P&gt;
&lt;DIV&gt;
&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;I've created a sample scenario in which I've selected distinct Models for a sample car dataset given a sample timestamp column.&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, collect_set

# Sample readStream (assuming a source that provides eventTime and eventId, like Kafka, file, etc.)
# This part of the code is just for illustration. Replace it with your actual stream source
streamingInputDF = (
    spark 
    .readStream 
    .option("withEventTimeOrder", "true") # Apply this if the initial snapshot should be processed with event time order.
    .format("delta") 
    .table("balogo.delta_managed.df_car")
    .withWatermark("ManufactureDate", "30 days") # Apply watermarking to define state
)

# Perform an aggregation to gather the distinct DriveType values within each 30-day window
aggregatedDF = streamingInputDF.groupBy(
    window(col("ManufactureDate"), "30 days")
).agg(
    collect_set("DriveType").alias("DistinctDriveTypeValues")
)

# If needed, define a foreachBatch function to process each micro-batch DataFrame
def process_batch(batch, epoch_id):
    # Example: save the distinct values to a Delta table (or a database, a file, etc.)
    (
        batch
        .select("window.start", "window.end", "DistinctDriveTypeValues")
        .write
        .format("delta")
        .mode("append")
        .saveAsTable("balogo.delta_managed.distinct_drive_types")
    )

# Apply foreachBatch to process each micro-batch with the function defined above.
# The checkpoint location is a streaming option, so it is set on writeStream, not on the batch write.
query = (
    aggregatedDF.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "dbfs:/tmp/balogo/distinctStreamingExample/_checkpoints/")
    .start()
)

# Note: Adjust the code as necessary; this is just a sample reference that I created for testing.&lt;/LI-CODE&gt;&lt;/DIV&gt;
&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;Some rows I took from the output table as a sample:&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV&gt;
&lt;TABLE border="1" width="100%"&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;Start&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;End&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;DistinctModels&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;2005-01-27T00:00:00.000+00:00&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;2005-02-26T00:00:00.000+00:00&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;["Model H"]&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;2005-02-26T00:00:00.000+00:00&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;2005-03-28T00:00:00.000+00:00&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;["Model I","Model G","Model H"]&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;2005-05-27T00:00:00.000+00:00&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;2005-06-26T00:00:00.000+00:00&lt;/TD&gt;
&lt;TD width="33.333333333333336%" height="30px"&gt;["Model F","Model E"]&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;/DIV&gt;
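&lt;DIV&gt;&lt;SPAN&gt;The Start and End values above look consistent with Spark aligning tumbling windows to the Unix epoch (1970-01-01 UTC) rather than to the first record. A minimal pure-Python sketch of that bucketing follows, assuming UTC timestamps; `tumbling_window` and `distinct_per_window` are hypothetical helper names used for illustration only and are not Spark APIs.&lt;/SPAN&gt;&lt;/DIV&gt;

```python
from datetime import datetime, timedelta, timezone

# Assumption: windows are aligned to the Unix epoch in UTC
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(ts, width=timedelta(days=30)):
    """Return the (start, end) of the epoch-aligned window that contains ts."""
    index = (ts - EPOCH) // width  # whole windows elapsed since the epoch
    start = EPOCH + index * width
    return start, start + width


def distinct_per_window(records, width=timedelta(days=30)):
    """Group (timestamp, value) pairs by window and collect the distinct values."""
    windows = {}
    for ts, value in records:
        windows.setdefault(tumbling_window(ts, width), set()).add(value)
    return windows
```

&lt;DIV&gt;&lt;SPAN&gt;For example, `tumbling_window(datetime(2005, 2, 1, tzinfo=timezone.utc))` returns the window from 2005-01-27 to 2005-02-26, which matches the first row of the table.&lt;/SPAN&gt;&lt;/DIV&gt;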
&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV&gt;&lt;SPAN&gt;I hope these insights are helpful to you. Should you have further questions or require additional clarification, please don't hesitate to ask&amp;nbsp;&lt;span class="lia-unicode-emoji" title=":slightly_smiling_face:"&gt;🙂&lt;/span&gt;&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;/DIV&gt;
&lt;P&gt;Best regards,&lt;BR /&gt;-Raphael&lt;/P&gt;</description>
      <pubDate>Sat, 09 Mar 2024 00:03:46 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/can-i-have-additional-logic-in-a-dlt-notebook-that-is-unrelated/m-p/63100#M32172</guid>
      <dc:creator>raphaelblg</dc:creator>
      <dc:date>2024-03-09T00:03:46Z</dc:date>
    </item>
    <item>
      <title>Re: Can I have additional logic in a DLT notebook that is unrelated to directly creating DLTs?</title>
      <link>https://community.databricks.com/t5/data-engineering/can-i-have-additional-logic-in-a-dlt-notebook-that-is-unrelated/m-p/63104#M32174</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/97999"&gt;@ac0&lt;/a&gt;,&lt;/P&gt;
&lt;P&gt;Please check&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/97998"&gt;@raphaelblg&lt;/a&gt;&amp;nbsp;'s response and let us know if this helped to resolve your issue. If it did, please mark it as the accepted solution.&lt;/P&gt;</description>
      <pubDate>Sat, 09 Mar 2024 00:44:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/can-i-have-additional-logic-in-a-dlt-notebook-that-is-unrelated/m-p/63104#M32174</guid>
      <dc:creator>jose_gonzalez</dc:creator>
      <dc:date>2024-03-09T00:44:07Z</dc:date>
    </item>
  </channel>
</rss>

