How to access bronze dlt in silver dlt

issa
New Contributor III

I have a job in Workflows that runs two DLT pipelines, one for Bronze_Transaction and one for Silver_Transaction. I use two DLT pipelines because I want the tables to be created in the bronze catalog (erp schema) and the silver catalog (erp schema), respectively.

The notebooks for each DLT are:
Bronze:

import dlt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, expr
from pyspark.sql.functions import round, unix_timestamp, current_timestamp
from pyspark.sql import functions as F
from pyspark.sql.types import *


storage_account = 'xxxxxxxxx'
container = 'xxxxxxxxxxxxx'
landing_path = 'Landing/Transactions'
landing_json_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{landing_path}"

# Transactions_Bronze
# Serve as the raw source for downstream processing in silver or gold tables in the Delta Lakehouse.

@dlt.table(
    name = "Transactions_Bronze",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)

def Transactions_Bronze():
    # Read data from JSON files
    df = (spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("inferSchema", True)
          .option("cloudFiles.inferColumnTypes", "true")
          .option("recursiveFileLookup", "true")
          .load(landing_json_path)
          )

    # Add metadata column for insertion time (MM = month; lowercase mm would be minutes)
    df = df.withColumn("SDA_Inserted", F.date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df


SILVER:

import dlt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, expr
from pyspark.sql.functions import round, unix_timestamp, current_timestamp
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Transactions_Silver
dlt.create_streaming_table(
    name="Transactions_Silver",  # No database qualifier in the table name
    table_properties={
        "quality": "silver"
    }
)

# Read the bronze table (in the bronze catalog) as a stream
bronze_df = (
    spark.readStream
    .format("delta")
    .table("bronze.erp.Transactions_Bronze")
)

# Apply changes from bronze to silver
dlt.apply_changes(
    source=bronze_df,  # first I tried to just write bronze.erp.transactions_bronze, but that failed too
    target="Transactions_Silver",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    stored_as_scd_type="2",
    sequence_by="Inserted"
)

 
Bronze works without any issue, but the workflow fails on the Silver DLT pipeline. The error message I get is:
"timestamp": "2024-11-29T09:03:48.672Z",
"message": "Failed to resolve flow: 'transactions_silver'.",
"level": "ERROR",
"error": {
"exceptions": [
{
"class_name": "pyspark.errors.exceptions.base.PySparkAttributeError",
"message": "Traceback (most recent call last):\npyspark.errors.exceptions.base.PySparkAttributeError: [ATTRIBUTE_NOT_SUPPORTED] Attribute `_get_object_id` is not supported.",
"error_class": "ATTRIBUTE_NOT_SUPPORTED"
}
],
"fatal": true
},
"details": {
"flow_progress": {
"status": "FAILED"
}

I would appreciate any help with this one.

ozaaditya
Databricks Partner


By looking at your code, I can suggest a few changes that you can try out.

Instead of spark.readStream, you can use bronze_df = dlt.read_stream("Transactions_Bronze").

Revised Code:

bronze_df = dlt.read_stream("Transactions_Bronze")
dlt.apply_changes(
    source=bronze_df,
    target="Transactions_Silver",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    stored_as_scd_type="2",
    sequence_by="Inserted"
)


Additionally, you have created a column called SDA_Inserted in the bronze table, but in the silver table, the sequence_by parameter is based on the Inserted column. Make sure to correct the column name if there is no Inserted column already present in the data.
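
The column mismatch described above can be caught before the pipeline runs. The following is a minimal sketch in plain Python, not part of the DLT API: `missing_cdc_columns` is a hypothetical helper, and `bronze_columns` is an assumed column list (in a notebook it could come from the source DataFrame's `columns` attribute). It compares names case-insensitively, which assumes Spark's default case-insensitive column resolution.

```python
def missing_cdc_columns(available_columns, keys, sequence_by):
    """Return the key / sequencing columns that are absent from the source."""
    available = {name.lower() for name in available_columns}
    required = list(keys) + [sequence_by]
    return [name for name in required if name.lower() not in available]


# Assumed column list: the key columns from the thread plus the added SDA_Inserted.
bronze_columns = [
    "Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo", "SDA_Inserted",
]
keys = ["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"]

print(missing_cdc_columns(bronze_columns, keys, "Inserted"))      # ['Inserted']
print(missing_cdc_columns(bronze_columns, keys, "SDA_Inserted"))  # []
```

An empty list means every column named in keys and sequence_by is present in the source.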

filipniziol
Esteemed Contributor

Hi @issa ,

As suggested by @ozaaditya, the issue is with the way you define the source parameter.
It cannot be a plain DataFrame; it has to be the name of a DLT table or view.

Here is the code from documentation:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

So in your case the code for source should look like:

@dlt.view
def transactions_bronze():
  return spark.readStream.table("bronze.erp.transactions_bronze")

And then you pass "transactions_bronze" as the source.
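
Putting the two suggestions from this thread together (a named view as the source, and a sequencing column that actually exists in bronze), the silver notebook could be wired roughly as follows. This is a sketch under assumptions, not a verified pipeline: `define_silver_flow` and the view name `transactions_bronze_src` are hypothetical, and passing `dlt` and `spark` in as parameters is only there so the wiring can be exercised outside Databricks.

```python
def define_silver_flow(dlt, spark):
    """Register a source view, a target table, and the CDC flow between them."""

    # Hypothetical view name; it wraps the stream over the bronze table.
    @dlt.view(name="transactions_bronze_src")
    def transactions_bronze_src():
        return spark.readStream.table("bronze.erp.Transactions_Bronze")

    dlt.create_streaming_table(name="Transactions_Silver")

    dlt.apply_changes(
        target="Transactions_Silver",
        source="transactions_bronze_src",  # the view's name, not a DataFrame
        keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
        sequence_by="SDA_Inserted",  # the column the bronze table actually adds
        stored_as_scd_type=2,
    )
```

In a pipeline notebook this would be called once at the top level, after `import dlt`, as `define_silver_flow(dlt, spark)`.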

issa
New Contributor III

Hi @filipniziol and @ozaaditya,

Thank you both for your input. I changed the code, since I figured that the SCD should be on the bronze layer and that I should then keep only the open rows in silver.

However, that doesn't work.

My idea was:

Bronze layer:

import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F

@dlt.table(
    name="Transactions_Bronze_Raw",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)

def Transactions_Bronze_Raw():
    # Read streaming data from the landing path (source of data)
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )

    # Add metadata for inserted time
    df = df.withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df

# Transactions_Bronze
dlt.create_streaming_table(
    name="Transactions_Bronze",  # No database qualifier in the table name
    table_properties={
        "quality": "silver",
        "delta.enableChangeDataFeed": "true"
    }
)

# Apply change data capture logic to handle SCD Type 2 for the streaming table
dlt.apply_changes(
    target="Transactions_Bronze",  
    source="Transactions_Bronze_Raw",  
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),  
    stored_as_scd_type=2  
)

Silver layer:

@dlt.table(
    name="Transactions_Silver",
    table_properties={
        "quality": "silver",
        "delta.enableChangeDataFeed": "true"
    }
)

def Transactions_Silver():
    # Read from the Bronze table
    bronze_df = spark.readStream.format("delta").table("sda_edw_bronze.erp.Transactions_Bronze")

    # Filter only open rows
    silver_df = bronze_df.filter(F.col("__END_AT").isNull())

    # Add a new SDA_Inserted column
    silver_df = silver_df.withColumn(
        "SDA_Inserted",
        F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss")
    )

    # Return the transformed DataFrame
    return silver_df
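
To make the "open rows" idea concrete, here is a small plain-Python simulation of SCD type 2 history. It is a simplified sketch, not how DLT implements apply_changes: it assumes changes arrive already ordered by the sequencing column, and `apply_scd2`, `open_rows`, and the `Amount` column are hypothetical. Only the `__START_AT` / `__END_AT` column names mirror what the SCD2 target table exposes.

```python
def apply_scd2(history, change, keys, sequence_col):
    """Close the open row for the change's key (if any), then append a new open row."""
    key = tuple(change[k] for k in keys)
    seq = change[sequence_col]
    for row in history:
        if tuple(row[k] for k in keys) == key and row["__END_AT"] is None:
            row["__END_AT"] = seq  # the previous version stops being current
    history.append({**change, "__START_AT": seq, "__END_AT": None})
    return history


def open_rows(history):
    """Current versions only: rows whose __END_AT is still empty."""
    return [row for row in history if row["__END_AT"] is None]


changes = [
    {"VoucherNo": 1, "Amount": 100, "SDA_Inserted": "2024-11-29 09:00:00"},
    {"VoucherNo": 2, "Amount": 50, "SDA_Inserted": "2024-11-29 09:00:00"},
    {"VoucherNo": 1, "Amount": 120, "SDA_Inserted": "2024-11-29 10:00:00"},
]

history = []
for change in changes:
    apply_scd2(history, change, keys=["VoucherNo"], sequence_col="SDA_Inserted")

print(len(history))  # 3
print([(r["VoucherNo"], r["Amount"]) for r in open_rows(history)])  # [(2, 50), (1, 120)]
```

The history keeps all three versions, while the open-row filter returns one current row per key, which is what the `__END_AT` null filter in the silver table is meant to do.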

The issue is that the bronze layer fails:
"message": "Update ff5d14 is FAILED.",
"level": "ERROR",
"error": {
"exceptions": [
{
"message": "INTERNAL_ERROR: Pipeline cluster is not healthy."
}
]

But the cluster is fine, because I can run other notebooks without issues.

Do you see any issue with my code for the bronze layer?

At first I wanted to create a view of the input data (as you suggested for the previous issue, @filipniziol) and use that as the source for the bronze table, but that didn't work either. The pipeline ran without any issue, but the data didn't get ingested. This is how that idea looked:

# Define a view as the source
@dlt.view
def Transactions_Bronze_View():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
        .withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))  # Add metadata
    )

# Transactions_Bronze
dlt.create_streaming_table(
    name="Transactions_Bronze",  # No database qualifier in the table name
    table_properties={
        "quality": "bronze",
        "delta.enableChangeDataFeed": "true"
    }
)

# Use the view as the source for SCD2 table creation
dlt.apply_changes(
    target="Transactions_Bronze",
    source="Transactions_Bronze_View",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),  # Timestamp column to sequence updates
    stored_as_scd_type=2
)
 

 

I would appreciate any help with building the bronze table as an SCD2 table.

Thanks in advance!

filipniziol
Esteemed Contributor

Hi @issa ,

Could you debug this step by step?

Could you create a DLT pipeline that does just this:

 

import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F

@dlt.table(
    name="Transactions_Bronze_Raw",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)

def Transactions_Bronze_Raw():
    # Read streaming data from the landing path (source of data)
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )

    # Add metadata for inserted time
    df = df.withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df

 

Is the table Transactions_Bronze_Raw correctly created?

issa
New Contributor III

Yes, this part is fine:

import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F

storage_account = 'xxxxx'
container = 'xxxxxx'
landing_path = 'xxxxxx/Transactions'

landing_json_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{landing_path}"


@dlt.table(
    name="Transactions_Bronze_Raw",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)

def Transactions_Bronze_Raw():
    # Read streaming data from the landing path (source of data)
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )

    # Add metadata for inserted time
    df = df.withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df

 

The issue is with this part:

import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F

@dlt.view
def Transactions_Bronze_Raw():
    return spark.readStream.table("sda_edw_bronze.erp.Transactions_Bronze_Raw")


dlt.create_streaming_table(
    name="Transactions_Bronze",  # No database qualifier in the table name
    table_properties={
        "quality": "bronze",
        "delta.enableChangeDataFeed": "true"
    }
)


dlt.apply_changes(
    source="Transactions_Bronze_Raw",
    target="Transactions_Bronze",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),
    stored_as_scd_type=2
)

It gives me the message below. The cluster itself seems fine, because it created Transactions_Bronze_Raw without any issue, but the update fails when running Transactions_Bronze.

"timestamp": "2024-12-18T11:59:42.736Z",
"message": "Update 1171f1 is FAILED.",
"level": "ERROR",
"error": {
"exceptions": [
{
"message": "INTERNAL_ERROR: Pipeline cluster is not healthy."
}
],
"internal_exceptions": [
{
"class_name": "com.databricks.api.base.DatabricksServiceException",
"message": "INTERNAL_ERROR: Pipeline cluster is not healthy.",
"stack": [
{
"declaring_class": "com.databricks.api.base.DatabricksServiceException$",
"method_name": "apply",
"file_name": "DatabricksServiceException.scala",
"line_number": 400
},

 

filipniziol
Esteemed Contributor

Hi @issa ,

OK, so Transactions_Bronze_Raw is created and the data is in place. I do not see any obvious reason why it is not working, so in your situation I would debug step by step, changing one setting at a time.
Here are a couple of things I would test:

1. Could you make sure that "Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo" for a given SDA_Inserted does not return any duplicates?

2. Could you remove table properties from Transactions_Bronze:

dlt.create_streaming_table(
    name="Transactions_Bronze"
)

3. Change to stored_as_scd_type=1

4. I would change the table names to lowercase
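
The duplicate check in point 1 can be sketched as follows. This is a plain-Python illustration on sample rows, not Spark code: `duplicate_key_groups` is a hypothetical helper, the sample data is invented, and only three of the five key columns are used for brevity. On the real table the same idea would be a group-by on the key columns plus SDA_Inserted, keeping groups with a count above 1.

```python
from collections import Counter


def duplicate_key_groups(rows, keys, sequence_col):
    """Return each (keys..., sequence value) combination that occurs more than once."""
    counts = Counter(
        tuple(row[col] for col in (*keys, sequence_col)) for row in rows
    )
    return {combo: n for combo, n in counts.items() if n > 1}


# Invented sample rows; the first two share both the key and the timestamp.
sample_rows = [
    {"Company": "A", "VoucherNo": 1, "RowNo": 1, "SDA_Inserted": "2024-12-18 11:00:00"},
    {"Company": "A", "VoucherNo": 1, "RowNo": 1, "SDA_Inserted": "2024-12-18 11:00:00"},
    {"Company": "A", "VoucherNo": 1, "RowNo": 2, "SDA_Inserted": "2024-12-18 11:00:00"},
]

duplicates = duplicate_key_groups(
    sample_rows, keys=["Company", "VoucherNo", "RowNo"], sequence_col="SDA_Inserted"
)
print(duplicates)  # {('A', 1, 1, '2024-12-18 11:00:00'): 2}
```

Any entry in the result is a key that has more than one row with the same sequencing value, so the order of those rows cannot be determined from SDA_Inserted alone.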

 

filipniziol
Esteemed Contributor

Hi @issa ,

Also, could you share which Databricks Runtime is used to run your DLT pipeline?

 

issa
New Contributor III

@filipniziol - it's resolved now, thanks. What solved it was deleting the tables and running everything again. A full refresh from DLT didn't seem to do the trick, so I deleted the tables, thinking it could be a cached-metadata issue for the bronze table.

issa
New Contributor III

Final solution for the Bronze:

# Define view as the source
@dlt.view
def Transactions_Bronze_View():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
        .withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))  # Add metadata
    )

# Define a DLT to store the bronze data with Change Data Feed enabled
dlt.create_streaming_table(
    name="Transactions_Bronze",  # No database qualifier in the table name
    table_properties={
        "quality": "bronze",
        "delta.enableChangeDataFeed": "true"
    }
)

# Implement SCD type 2
dlt.apply_changes(
    target="Transactions_Bronze",
    source="Transactions_Bronze_View",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),  # Timestamp column to sequence updates
    stored_as_scd_type=2
)

 
