<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>How to access bronze dlt in silver dlt in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/100407#M40286</link>
    <description>&lt;P&gt;A Workflows job runs two DLT pipelines, one for Bronze_Transaction and one for Silver_Transaction, so that tables are created in the bronze catalog and erp schema, and the silver catalog and erp schema. The bronze pipeline succeeds, but the silver pipeline fails with PySparkAttributeError [ATTRIBUTE_NOT_SUPPORTED]: Attribute `_get_object_id` is not supported. Full code and discussion in the thread below.&lt;/P&gt;</description>
    <pubDate>Fri, 29 Nov 2024 09:28:42 GMT</pubDate>
    <dc:creator>issa</dc:creator>
    <dc:date>2024-11-29T09:28:42Z</dc:date>
    <item>
      <title>How to access bronze dlt in silver dlt</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/100407#M40286</link>
      <description>&lt;P&gt;I have a job in Workflows that runs two DLT pipelines, one for Bronze_Transaction and one for Silver_Transaction. The reason for two DLT pipelines is that I want the tables to be created in the bronze catalog and erp schema, and the silver catalog and erp schema.&lt;BR /&gt;&lt;BR /&gt;The notebooks for each DLT are:&lt;BR /&gt;Bronze:&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;DIV&gt;&lt;PRE&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; dlt&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; SparkSession&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql.functions &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; col, lit, expr&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql.functions &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt; &lt;SPAN&gt;round&lt;/SPAN&gt;&lt;SPAN&gt;, unix_timestamp, current_timestamp&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; functions &lt;/SPAN&gt;&lt;SPAN&gt;as&lt;/SPAN&gt;&lt;SPAN&gt; F&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql.types &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt; &lt;SPAN&gt;*&lt;BR /&gt;&lt;BR /&gt;&lt;BR /&gt;storage_account = 'xxxxxxxxx'&lt;BR /&gt;container = 'xxxxxxxxxxxxx'&lt;BR /&gt;landing_path = 'Landing/Transactions'&lt;BR /&gt;landing_json_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{landing_path}"&lt;BR /&gt;&lt;BR /&gt;# Transactions_Bronze&lt;BR /&gt;# Serves as the raw source for downstream processing in silver or gold tables in the Delta Lakehouse.&lt;BR /&gt;&lt;BR /&gt;@dlt.table(&lt;BR /&gt;&amp;nbsp; &amp;nbsp; name = "Transactions_Bronze",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; table_properties={&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "delta.enableChangeDataFeed": "true",&lt;BR 
/&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "quality": "bronze"&lt;BR /&gt;&amp;nbsp; &amp;nbsp; }&lt;BR /&gt;)&lt;BR /&gt;&lt;BR /&gt;def Transactions_Bronze():&lt;BR /&gt;&amp;nbsp; &amp;nbsp; # Read data from JSON files&lt;BR /&gt;&amp;nbsp; &amp;nbsp; df = (spark.readStream.format("cloudFiles")&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .option("cloudFiles.format", "json")&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .option("inferSchema", True)&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .option("cloudFiles.inferColumnTypes", "true")&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .option("recursiveFileLookup", "true")&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .load(landing_json_path)&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; )&lt;BR /&gt;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; # Add metadata column for insertion time&lt;BR /&gt;&amp;nbsp; &amp;nbsp; df = df.withColumn("SDA_Inserted", F.date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))&lt;BR /&gt;&amp;nbsp; &amp;nbsp; return df&lt;BR /&gt;&lt;/SPAN&gt;&lt;/PRE&gt;&lt;P&gt;&lt;SPAN&gt;&lt;BR /&gt;SILVER:&lt;/SPAN&gt;&lt;/P&gt;&lt;DIV&gt;&lt;PRE&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; dlt&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; SparkSession&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql.functions &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; col, lit, expr&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql.functions &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt; &lt;SPAN&gt;round&lt;/SPAN&gt;&lt;SPAN&gt;, unix_timestamp, current_timestamp&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; functions 
&lt;/SPAN&gt;&lt;SPAN&gt;as&lt;/SPAN&gt;&lt;SPAN&gt; F&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql.types &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt; &lt;SPAN&gt;*&lt;BR /&gt;&lt;BR /&gt;# Transactions_Silver&lt;BR /&gt;dlt.create_streaming_table(&lt;BR /&gt;&amp;nbsp; &amp;nbsp; name="Transactions_Silver", &amp;nbsp;# No database qualifier in the table name&lt;BR /&gt;&amp;nbsp; &amp;nbsp; table_properties={&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "quality": "silver"&lt;BR /&gt;&amp;nbsp; &amp;nbsp; }&lt;BR /&gt;)&lt;BR /&gt;&lt;BR /&gt;# Read the bronze table as a stream&lt;BR /&gt;bronze_df = (&lt;BR /&gt;&amp;nbsp; &amp;nbsp; spark.readStream&lt;BR /&gt;&amp;nbsp; &amp;nbsp; .format("delta")&lt;BR /&gt;&amp;nbsp; &amp;nbsp; .table("bronze.erp.Transactions_Bronze")&lt;BR /&gt;)&lt;BR /&gt;&lt;BR /&gt;# Apply changes from bronze to silver&lt;BR /&gt;dlt.apply_changes(&lt;BR /&gt;&amp;nbsp; &amp;nbsp; source=bronze_df, # first I tried just writing bronze.erp.transactions_bronze, but that failed too&lt;BR /&gt;&amp;nbsp; &amp;nbsp; target="Transactions_Silver",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],&lt;BR /&gt;&amp;nbsp; &amp;nbsp; stored_as_scd_type="2",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; sequence_by="Inserted"&lt;BR /&gt;)&lt;BR /&gt;&lt;BR /&gt;&lt;/SPAN&gt;&lt;/PRE&gt;&lt;DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;The bronze pipeline works without any issue, but the workflow fails on the silver DLT. The error message I get is:&lt;BR /&gt;"timestamp": "2024-11-29T09:03:48.672Z",&lt;BR /&gt;"message": "Failed to resolve flow: 'transactions_silver'.",&lt;BR /&gt;"level": "ERROR",&lt;BR /&gt;"error": {&lt;BR /&gt;"exceptions": [&lt;BR /&gt;{&lt;BR /&gt;"class_name": "pyspark.errors.exceptions.base.PySparkAttributeError",&lt;BR /&gt;"message": "Traceback (most recent call last):\npyspark.errors.exceptions.base.PySparkAttributeError: 
[ATTRIBUTE_NOT_SUPPORTED] Attribute `_get_object_id` is not supported.",&lt;BR /&gt;"error_class": "ATTRIBUTE_NOT_SUPPORTED"&lt;BR /&gt;}&lt;BR /&gt;],&lt;BR /&gt;"fatal": true&lt;BR /&gt;},&lt;BR /&gt;"details": {&lt;BR /&gt;"flow_progress": {&lt;BR /&gt;"status": "FAILED"&lt;BR /&gt;}&lt;BR /&gt;&lt;BR /&gt;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;I'd appreciate any help with this one.&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Fri, 29 Nov 2024 09:28:42 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/100407#M40286</guid>
      <dc:creator>issa</dc:creator>
      <dc:date>2024-11-29T09:28:42Z</dc:date>
    </item>
    <item>
      <title>Re: How to access bronze dlt in silver dlt</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/101004#M40511</link>
      <description>&lt;P&gt;&lt;BR /&gt;By looking at your code, I can suggest a few changes that you can try out.&lt;/P&gt;&lt;P&gt;Instead of spark.readStream, you can use bronze_df = dlt.read_stream("Transactions_Bronze").&lt;/P&gt;&lt;P&gt;Revised Code:&lt;/P&gt;&lt;P&gt;bronze_df = dlt.read_stream("Transactions_Bronze")&lt;BR /&gt;dlt.apply_changes(&lt;BR /&gt;source=bronze_df,&lt;BR /&gt;target="Transactions_Silver",&lt;BR /&gt;keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],&lt;BR /&gt;stored_as_scd_type="2",&lt;BR /&gt;sequence_by="Inserted"&lt;BR /&gt;)&lt;/P&gt;&lt;P&gt;&lt;BR /&gt;Additionally, you have created a column called SDA_Inserted in the bronze table, but in the silver table, the sequence_by parameter is based on the Inserted column. Make sure to correct the column name if there is no Inserted column already present in the data.&lt;/P&gt;</description>
      <pubDate>Thu, 05 Dec 2024 05:28:19 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/101004#M40511</guid>
      <dc:creator>ozaaditya</dc:creator>
      <dc:date>2024-12-05T05:28:19Z</dc:date>
    </item>
    <item>
      <title>Re: How to access bronze dlt in silver dlt</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/101032#M40516</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/134301"&gt;@issa&lt;/a&gt;&amp;nbsp;,&lt;/P&gt;&lt;P&gt;As suggested by&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/132044"&gt;@ozaaditya&lt;/a&gt;&amp;nbsp;, the issue is with the way you define the source parameter.&lt;BR /&gt;It cannot be a plain DataFrame; it must be a DLT table or view.&lt;/P&gt;&lt;P&gt;Here is the code from the &lt;A href="https://docs.databricks.com/en/delta-live-tables/cdc.html" target="_self"&gt;documentation&lt;/A&gt;:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)&lt;/LI-CODE&gt;&lt;P&gt;So in your case the code for source should look like:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;@dlt.view
def transactions_bronze():
  return spark.readStream.table("bronze.erp.transactions_bronze")&lt;/LI-CODE&gt;&lt;P&gt;Then you pass "transactions_bronze" as the source.&lt;/P&gt;</description>
      <pubDate>Thu, 05 Dec 2024 08:18:36 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/101032#M40516</guid>
      <dc:creator>filipniziol</dc:creator>
      <dc:date>2024-12-05T08:18:36Z</dc:date>
    </item>
    <item>
      <title>Re: How to access bronze dlt in silver dlt</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/101453#M40671</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/117376"&gt;@filipniziol&lt;/a&gt;&amp;nbsp;and&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/132044"&gt;@ozaaditya&lt;/a&gt;,&lt;BR /&gt;&lt;BR /&gt;Thank you both for your input. I changed the code, since I figured that the SCD should be on the bronze layer and that I should then filter out open rows in silver.&lt;BR /&gt;&lt;BR /&gt;However, that doesn't work.&lt;BR /&gt;&lt;BR /&gt;My idea was:&lt;BR /&gt;&lt;BR /&gt;Bronze layer:&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;DIV&gt;&lt;PRE&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; dlt&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql.functions &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; col, lit, current_timestamp&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;from&lt;/SPAN&gt;&lt;SPAN&gt; pyspark.sql &lt;/SPAN&gt;&lt;SPAN&gt;import&lt;/SPAN&gt;&lt;SPAN&gt; functions &lt;/SPAN&gt;&lt;SPAN&gt;as&lt;/SPAN&gt;&lt;SPAN&gt; F&lt;BR /&gt;&lt;BR /&gt;@dlt.table(&lt;BR /&gt;&amp;nbsp; &amp;nbsp; name="Transactions_Bronze_Raw",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; table_properties={&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "delta.enableChangeDataFeed": "true",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "quality": "bronze"&lt;BR /&gt;&amp;nbsp; &amp;nbsp; }&lt;BR /&gt;)&lt;BR /&gt;&lt;BR /&gt;def Transactions_Bronze_Raw():&lt;BR /&gt;&amp;nbsp; &amp;nbsp; # Read streaming data from the landing path (source of data)&lt;BR /&gt;&amp;nbsp; &amp;nbsp; df = (&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; spark.readStream.format("cloudFiles")&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .option("cloudFiles.format", "json")&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .option("inferSchema", True)&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .option("cloudFiles.inferColumnTypes", 
"true")&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .option("recursiveFileLookup", "true")&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .load(landing_json_path)&lt;BR /&gt;&amp;nbsp; &amp;nbsp; )&lt;BR /&gt;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; # Add metadata for inserted time&lt;BR /&gt;&amp;nbsp; &amp;nbsp; df = df.withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))&lt;BR /&gt;&amp;nbsp; &amp;nbsp; return df&lt;BR /&gt;&lt;BR /&gt;# Transactions_Bronze&lt;BR /&gt;dlt.create_streaming_table(&lt;BR /&gt;&amp;nbsp; &amp;nbsp; name="Transactions_Bronze", &amp;nbsp;# No database qualifier in the table name&lt;BR /&gt;&amp;nbsp; &amp;nbsp; table_properties={&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "quality": "silver",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "delta.enableChangeDataFeed": "true"&lt;BR /&gt;&amp;nbsp; &amp;nbsp; }&lt;BR /&gt;)&lt;BR /&gt;&lt;BR /&gt;# Apply change data capture logic to handle SCD Type 2 for the streaming table&lt;BR /&gt;dlt.apply_changes(&lt;BR /&gt;&amp;nbsp; &amp;nbsp; target="Transactions_Bronze", &amp;nbsp;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; source="Transactions_Bronze_Raw", &amp;nbsp;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],&lt;BR /&gt;&amp;nbsp; &amp;nbsp; sequence_by=F.col("SDA_Inserted"), &amp;nbsp;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; stored_as_scd_type=2 &amp;nbsp;&lt;BR /&gt;)&lt;BR /&gt;&lt;BR /&gt;&lt;/SPAN&gt;&lt;/PRE&gt;&lt;P&gt;&lt;SPAN&gt;Silver layer:&lt;BR /&gt;&lt;/SPAN&gt;&lt;/P&gt;&lt;DIV&gt;&lt;PRE&gt;&lt;SPAN&gt;@&lt;/SPAN&gt;&lt;SPAN&gt;dlt&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;name&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;"Transactions_Silver"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; 
&lt;/SPAN&gt;&lt;SPAN&gt;table_properties&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"quality"&lt;/SPAN&gt;&lt;SPAN&gt;: &lt;/SPAN&gt;&lt;SPAN&gt;"silver"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"delta.enableChangeDataFeed"&lt;/SPAN&gt;&lt;SPAN&gt;: &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; }&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;Transactions_Silver&lt;/SPAN&gt;&lt;SPAN&gt;():&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;# Read from the Bronze table&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; bronze_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; spark.readStream.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"delta"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"sda_edw_bronze.erp.Transactions_Bronze"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;# Filter only open rows&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; silver_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; bronze_df&lt;/SPAN&gt;&lt;SPAN&gt;.filter&lt;/SPAN&gt;&lt;SPAN&gt;(F.col(&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;__END_AT&lt;/SPAN&gt;&lt;SPAN&gt;"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;isNull&lt;/SPAN&gt;&lt;SPAN&gt;())&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;# Add a new SDA_Inserted column&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; silver_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; 
silver_df.&lt;/SPAN&gt;&lt;SPAN&gt;withColumn&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;"SDA_Inserted"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; F.&lt;/SPAN&gt;&lt;SPAN&gt;date_format&lt;/SPAN&gt;&lt;SPAN&gt;(F.&lt;/SPAN&gt;&lt;SPAN&gt;current_timestamp&lt;/SPAN&gt;&lt;SPAN&gt;(), &lt;/SPAN&gt;&lt;SPAN&gt;"yyyy-MM-dd HH:mm:ss"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;# Return the transformed DataFrame&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;return&lt;/SPAN&gt;&lt;SPAN&gt; silver_df&lt;/SPAN&gt;&lt;/PRE&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;The issue is that the bronze layer fails:&lt;BR /&gt;"message": "Update ff5d14 is FAILED.",&lt;BR /&gt;"level": "ERROR",&lt;BR /&gt;"error": {&lt;BR /&gt;"exceptions": [&lt;BR /&gt;{&lt;BR /&gt;"message": "INTERNAL_ERROR: Pipeline cluster is not healthy."&lt;BR /&gt;}&lt;BR /&gt;]&lt;BR /&gt;&lt;BR /&gt;But the cluster is fine, because I can run other notebooks without issues.&lt;BR /&gt;&lt;BR /&gt;Do you see any issue with my code for the bronze layer?&lt;BR /&gt;&lt;BR /&gt;First I wanted to create a view (as you suggested&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/117376"&gt;@filipniziol&lt;/a&gt;&amp;nbsp;for the previous issue) of the input data and use that as a source for the bronze table, but that didn't work either. The pipeline ran without any issue, but the data didn't get ingested. 
This is how that idea looked:&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;# Define a view as the source&lt;/SPAN&gt;&lt;/DIV&gt;&lt;PRE&gt;&lt;SPAN&gt;@&lt;/SPAN&gt;&lt;SPAN&gt;dlt&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;view&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;Transactions_Bronze_View&lt;/SPAN&gt;&lt;SPAN&gt;():&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;return&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; spark.readStream.&lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles.format"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"json"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"inferSchema"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;True&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"cloudFiles.inferColumnTypes"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;option&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"recursiveFileLookup"&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;"true"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;load&lt;/SPAN&gt;&lt;SPAN&gt;(landing_json_path)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; .&lt;/SPAN&gt;&lt;SPAN&gt;withColumn&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"SDA_Inserted"&lt;/SPAN&gt;&lt;SPAN&gt;, F.&lt;/SPAN&gt;&lt;SPAN&gt;date_format&lt;/SPAN&gt;&lt;SPAN&gt;(F.&lt;/SPAN&gt;&lt;SPAN&gt;current_timestamp&lt;/SPAN&gt;&lt;SPAN&gt;(), &lt;/SPAN&gt;&lt;SPAN&gt;"yyyy-MM-dd HH:mm:ss"&lt;/SPAN&gt;&lt;SPAN&gt;)) &amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;# Add metadata&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;BR /&gt;&lt;BR /&gt;# Transactions_Bronze&lt;BR /&gt;dlt.create_streaming_table(&lt;BR /&gt;&amp;nbsp; &amp;nbsp; name="Transactions_Bronze", &amp;nbsp;# No database qualifier in the table name&lt;BR /&gt;&amp;nbsp; &amp;nbsp; table_properties={&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "quality": "bronze",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "delta.enableChangeDataFeed": "true"&lt;BR /&gt;&amp;nbsp; &amp;nbsp; }&lt;BR /&gt;)&lt;BR /&gt;&lt;BR /&gt;# Use the view as the source for SCD2 table creation&lt;BR /&gt;dlt.apply_changes(&lt;BR /&gt;&amp;nbsp; &amp;nbsp; target="Transactions_Bronze",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; source="Transactions_Bronze_View",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],&lt;BR /&gt;&amp;nbsp; &amp;nbsp; sequence_by=F.col("SDA_Inserted"), &amp;nbsp;# Timestamp column to sequence updates&lt;BR /&gt;&amp;nbsp; &amp;nbsp; stored_as_scd_type=2&lt;BR /&gt;)&lt;BR /&gt;&lt;/SPAN&gt;&lt;/PRE&gt;&lt;DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Would appreciate all help I can get on building the bronze table as a SCD2.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;Thanks in advance!&lt;/P&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Mon, 09 Dec 2024 10:47:46 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/101453#M40671</guid>
      <dc:creator>issa</dc:creator>
      <dc:date>2024-12-09T10:47:46Z</dc:date>
    </item>
    <item>
      <title>Re: How to access bronze dlt in silver dlt</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/102461#M41125</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/134301"&gt;@issa&lt;/a&gt;&amp;nbsp;,&lt;/P&gt;&lt;P&gt;Could you debug this step by step?&lt;/P&gt;&lt;P&gt;Could you have a DLT pipeline that would do just this:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F

# Placeholder storage details, as shown elsewhere in the thread, so the snippet is self-contained
storage_account = 'xxxxx'
container = 'xxxxxx'
landing_path = 'xxxxxx/Transactions'
landing_json_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{landing_path}"

@dlt.table(
    name="Transactions_Bronze_Raw",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)

def Transactions_Bronze_Raw():
    # Read streaming data from the landing path (source of data)
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )

    # Add metadata for inserted time
    df = df.withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Is the table Transactions_Bronze_Raw correctly created?&lt;BR /&gt;&lt;BR /&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 18 Dec 2024 10:31:16 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/102461#M41125</guid>
      <dc:creator>filipniziol</dc:creator>
      <dc:date>2024-12-18T10:31:16Z</dc:date>
    </item>
    <item>
      <title>Re: How to access bronze dlt in silver dlt</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/102471#M41128</link>
      <description>&lt;P&gt;Yes, this part is fine:&lt;/P&gt;&lt;PRE&gt;import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F

storage_account = 'xxxxx'
container = 'xxxxxx'
landing_path = 'xxxxxx/Transactions'

landing_json_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{landing_path}"


@dlt.table(
    name="Transactions_Bronze_Raw",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)
def Transactions_Bronze_Raw():
    # Read streaming data from the landing path (source of data)
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )

    # Add metadata for inserted time
    df = df.withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df&lt;/PRE&gt;&lt;P&gt;The issue is with this part:&lt;/P&gt;&lt;PRE&gt;import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F

@dlt.view
def Transactions_Bronze_Raw():
    return spark.readStream.table("sda_edw_bronze.erp.Transactions_Bronze_Raw")


dlt.create_streaming_table(
    name="Transactions_Bronze",  # No database qualifier in the table name
    table_properties={
        "quality": "bronze",
        "delta.enableChangeDataFeed": "true"
    }
)


dlt.apply_changes(
    source="Transactions_Bronze_Raw",
    target="Transactions_Bronze",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),
    stored_as_scd_type=2
)&lt;/PRE&gt;&lt;P&gt;It gives me the error below. The cluster itself seems fine, since it created Transactions_Bronze_Raw without any issue, but the update fails when running Transactions_Bronze.&lt;/P&gt;&lt;PRE&gt;"timestamp": "2024-12-18T11:59:42.736Z",
"message": "Update 1171f1 is FAILED.",
"level": "ERROR",
"error": {
  "exceptions": [
    {
      "message": "INTERNAL_ERROR: Pipeline cluster is not healthy."
    }
  ],
  "internal_exceptions": [
    {
      "class_name": "com.databricks.api.base.DatabricksServiceException",
      "message": "INTERNAL_ERROR: Pipeline cluster is not healthy.",
      "stack": [
        {
          "declaring_class": "com.databricks.api.base.DatabricksServiceException$",
          "method_name": "apply",
          "file_name": "DatabricksServiceException.scala",
          "line_number": 400
        },&lt;/PRE&gt;</description>
      <pubDate>Wed, 18 Dec 2024 12:10:49 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/102471#M41128</guid>
      <dc:creator>issa</dc:creator>
      <dc:date>2024-12-18T12:10:49Z</dc:date>
    </item>
    <item>
      <title>Re: How to access bronze dlt in silver dlt</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/102491#M41141</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/134301"&gt;@issa&lt;/a&gt;,&lt;/P&gt;&lt;P&gt;OK, so&amp;nbsp;Transactions_Bronze_Raw is created and the data is in place. I do not see any obvious reason why it is not working, so in your situation I would debug step by step, changing one setting at a time.&lt;BR /&gt;Here are a couple of things I would test:&lt;/P&gt;&lt;P&gt;1. Could you make sure that&amp;nbsp;"Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo" for a given&amp;nbsp;SDA_Inserted do not return any duplicates?&lt;/P&gt;&lt;P&gt;2. Could you remove the table properties from Transactions_Bronze:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;dlt.create_streaming_table(
    name="Transactions_Bronze"
)&lt;/LI-CODE&gt;&lt;P&gt;3. Change&amp;nbsp;stored_as_scd_type=1&lt;/P&gt;&lt;P&gt;4. I would change the table names to lowercase.&lt;/P&gt;</description>
      <pubDate>Wed, 18 Dec 2024 14:09:21 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/102491#M41141</guid>
      <dc:creator>filipniziol</dc:creator>
      <dc:date>2024-12-18T14:09:21Z</dc:date>
    </item>
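The duplicate check suggested in point 1 above matters because apply_changes requires the key columns plus the sequencing column to uniquely identify a row. A minimal sketch of that check, in plain Python over a handful of sample rows (the row values are hypothetical; on real data you would group by the same columns in Spark and filter for counts above 1):

```python
from collections import Counter

# Hypothetical sample rows standing in for the bronze stream; the second
# row deliberately duplicates the first on every key column.
rows = [
    {"Company": "A", "VoucherType": "IV", "AccountingYear": 2024,
     "VoucherNo": 1, "RowNo": 1, "SDA_Inserted": "2024-12-18 11:00:00"},
    {"Company": "A", "VoucherType": "IV", "AccountingYear": 2024,
     "VoucherNo": 1, "RowNo": 1, "SDA_Inserted": "2024-12-18 11:00:00"},
]

# apply_changes keys plus the sequence_by column.
keys = ["Company", "VoucherType", "AccountingYear",
        "VoucherNo", "RowNo", "SDA_Inserted"]

def duplicate_keys(rows, keys):
    # Count each key-tuple; any tuple seen more than once violates the
    # uniqueness assumption that apply_changes relies on.
    counts = Counter(tuple(r[k] for k in keys) for r in rows)
    return [k for k, n in counts.items() if n > 1]

print(duplicate_keys(rows, keys))  # the sample yields one duplicated key tuple
```

The Spark equivalent would be a df.groupBy over the same columns followed by a count and a filter on count above 1.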
    <item>
      <title>Re: How to access bronze dlt in silver dlt</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/102626#M41183</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/134301"&gt;@issa&lt;/a&gt;,&lt;BR /&gt;&lt;BR /&gt;Also, could you share which Databricks Runtime is used to run your DLT?&lt;/P&gt;</description>
      <pubDate>Thu, 19 Dec 2024 08:35:33 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/102626#M41183</guid>
      <dc:creator>filipniziol</dc:creator>
      <dc:date>2024-12-19T08:35:33Z</dc:date>
    </item>
    <item>
      <title>Re: How to access bronze dlt in silver dlt</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/102635#M41187</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/117376"&gt;@filipniziol&lt;/a&gt;&amp;nbsp;- it's resolved now, thanks. What I did to solve it was to delete the tables and run everything again. Even though I did a full refresh from DLT, that didn't seem to do the trick. So I deleted the tables, thinking that it could be some cached metadata issue for the bronze table.&lt;/P&gt;</description>
      <pubDate>Thu, 19 Dec 2024 10:20:47 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/102635#M41187</guid>
      <dc:creator>issa</dc:creator>
      <dc:date>2024-12-19T10:20:47Z</dc:date>
    </item>
    <item>
      <title>Re: How to access bronze dlt in silver dlt</title>
      <link>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/102636#M41188</link>
      <description>&lt;P&gt;Final solution for the Bronze:&lt;/P&gt;&lt;PRE&gt;# Define view as the source
@dlt.view
def Transactions_Bronze_View():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
        .withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))  # Add metadata
    )

# Define a DLT to store the bronze data with Change Data Feed enabled
dlt.create_streaming_table(
    name="Transactions_Bronze",  # No database qualifier in the table name
    table_properties={
        "quality": "bronze",
        "delta.enableChangeDataFeed": "true"
    }
)

# Implement SCD type 2
dlt.apply_changes(
    target="Transactions_Bronze",
    source="Transactions_Bronze_View",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),  # Timestamp column to sequence updates
    stored_as_scd_type=2
)&lt;/PRE&gt;</description>
      <pubDate>Thu, 19 Dec 2024 10:26:17 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-to-access-bronze-dlt-in-silver-dlt/m-p/102636#M41188</guid>
      <dc:creator>issa</dc:creator>
      <dc:date>2024-12-19T10:26:17Z</dc:date>
    </item>
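The thread shows the bronze side only, not the silver pipeline from the title question. A minimal sketch of a silver DLT notebook that reads the bronze table across pipelines, under these assumptions: the bronze table is reachable at the qualified name sda_edw_bronze.erp.transactions_bronze (the catalog and schema used earlier in the thread, lowercased per the advice above), the silver pipeline's default catalog/schema point at the silver location, and the target name transactions_silver is hypothetical. This is not runnable outside a Databricks DLT pipeline:

```python
import dlt
from pyspark.sql import functions as F

@dlt.table(
    name="transactions_silver",  # hypothetical target; no qualifier, so it
    table_properties={           # lands in the silver pipeline's default
        "quality": "silver"      # catalog and schema
    }
)
def transactions_silver():
    return (
        spark.readStream
        # The bronze table is an apply_changes target, so it receives
        # updates; a plain append-only stream read needs skipChangeCommits
        # (alternatively, read its Change Data Feed, which is enabled).
        .option("skipChangeCommits", "true")
        .table("sda_edw_bronze.erp.transactions_bronze")
        .withColumn("SDA_Processed", F.current_timestamp())
    )
```

The key point is the fully qualified three-level name in readStream.table: because the bronze table lives in a different catalog/schema than the silver pipeline's defaults, it must be addressed explicitly, while the silver target itself stays unqualified.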
  </channel>
</rss>

