How to access a bronze DLT table in a silver DLT pipeline

issa
New Contributor III

I have a job in Workflows that runs two DLT pipelines, one for Bronze_Transaction and one for Silver_Transaction. The reason for two DLT pipelines is that I want the tables to be created in the bronze catalog (erp schema) and in the silver catalog (erp schema), respectively.

The notebooks for the two DLT pipelines are:
Bronze:

import dlt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, expr
from pyspark.sql.functions import round, unix_timestamp, current_timestamp
from pyspark.sql import functions as F
from pyspark.sql.types import *


storage_account = 'xxxxxxxxx'
container = 'xxxxxxxxxxxxx'
landing_path = 'Landing/Transactions'
landing_json_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{landing_path}"

# Transactions_Bronze
# Serve as the raw source for downstream processing in silver or gold tables in the Delta Lakehouse.

@dlt.table(
    name = "Transactions_Bronze",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)

def Transactions_Bronze():
    # Read data from JSON files
    df = (spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("inferSchema", True)
          .option("cloudFiles.inferColumnTypes", "true")
          .option("recursiveFileLookup", "true")
          .load(landing_json_path)
          )

    # Add metadata column for insertion time ("MM" is month; "mm" would be minutes)
    df = df.withColumn("SDA_Inserted", F.date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df
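A note on the SDA_Inserted pattern: in Spark's datetime patterns, MM is month-of-year and mm is minute-of-hour, so a pattern such as "yyyy-mm-dd HH:mm:ss" writes the minute where the month belongs. Because SDA_Inserted is a string, and later versions of this code use it as the sequence_by column, that affects ordering. The snippet below is a plain-Python analogue, not Spark code: strftime's %m/%M stand in for Spark's MM/mm, and the timestamps are made up. It also shows that formatting to whole seconds makes events within the same second tie.

```python
from datetime import datetime

# Made-up ingestion times that straddle a month boundary.
earlier = datetime(2024, 1, 31, 10, 59, 0)
later = datetime(2024, 2, 1, 10, 0, 0)

# strftime analogue of Spark's "yyyy-MM-dd HH:mm:ss": month in the month slot.
good_pattern = "%Y-%m-%d %H:%M:%S"
# strftime analogue of Spark's "yyyy-mm-dd HH:mm:ss": Spark's "mm" is minutes,
# so the minute value ends up where the month should be.
bad_pattern = "%Y-%M-%d %H:%M:%S"


def sorts_chronologically(pattern: str) -> bool:
    """True if comparing the formatted strings agrees with comparing the times."""
    return earlier.strftime(pattern) < later.strftime(pattern)


print(earlier.strftime(bad_pattern), "|", later.strftime(bad_pattern))
print(sorts_chronologically(good_pattern))  # True
print(sorts_chronologically(bad_pattern))   # False: "2024-59-31..." sorts after "2024-00-01..."

# Formatting to whole seconds drops sub-second detail, so two events
# 300 ms apart produce identical strings and tie as sequence values.
first = datetime(2024, 1, 31, 10, 59, 0, 100_000)
second = datetime(2024, 1, 31, 10, 59, 0, 400_000)
print(first.strftime(good_pattern) == second.strftime(good_pattern))  # True
```

Also keep in mind that Spark documents that all calls of current_timestamp() within the same query return the same value, so rows processed together can share an SDA_Inserted value regardless of the format.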


Silver:

import dlt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, expr
from pyspark.sql.functions import round, unix_timestamp, current_timestamp
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Transactions_Silver
dlt.create_streaming_table(
    name="Transactions_Silver",  # No database qualifier in the table name
    table_properties={
        "quality": "silver"
    }
)

# Read the bronze table (published by the bronze pipeline) as a stream
bronze_df = (
    spark.readStream
    .format("delta")
    .table("bronze.erp.Transactions_Bronze")
)

# Apply changes from bronze to silver
dlt.apply_changes(
    source=bronze_df,  # First I tried passing "bronze.erp.transactions_bronze" directly, but that failed too
    target="Transactions_Silver",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    stored_as_scd_type="2",
    sequence_by="Inserted"
)

 
The bronze pipeline works without any issue, but the workflow fails on the silver DLT pipeline. The error message I get is:
"timestamp": "2024-11-29T09:03:48.672Z",
"message": "Failed to resolve flow: 'transactions_silver'.",
"level": "ERROR",
"error": {
  "exceptions": [
    {
      "class_name": "pyspark.errors.exceptions.base.PySparkAttributeError",
      "message": "Traceback (most recent call last):\npyspark.errors.exceptions.base.PySparkAttributeError: [ATTRIBUTE_NOT_SUPPORTED] Attribute `_get_object_id` is not supported.",
      "error_class": "ATTRIBUTE_NOT_SUPPORTED"
    }
  ],
  "fatal": true
},
"details": {
  "flow_progress": {
    "status": "FAILED"
  }

I'd appreciate any help I can get with this one.


ozaaditya
Contributor


Looking at your code, I can suggest a few changes you can try out.

Instead of spark.readStream, you can use bronze_df = dlt.read_stream("Transactions_Bronze").

Revised Code:

bronze_df = dlt.read_stream("Transactions_Bronze")
dlt.apply_changes(
    source=bronze_df,
    target="Transactions_Silver",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    stored_as_scd_type="2",
    sequence_by="Inserted"
)


Additionally, you have created a column called SDA_Inserted in the bronze table, but in the silver table, the sequence_by parameter is based on the Inserted column. Make sure to correct the column name if there is no Inserted column already present in the data.
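Before wiring up apply_changes, a quick pre-flight check can catch this kind of mismatch. The helper below is a hypothetical plain-Python sketch (missing_columns is not a DLT API): it compares the key and sequence column names against a table's column list. The bronze_columns list, including the Amount column, is invented for illustration; on Databricks you could pass spark.read.table(...).columns instead. It lower-cases names because Spark resolves column names case-insensitively by default.

```python
from typing import Iterable, List


def missing_columns(columns: Iterable[str], keys: List[str], sequence_col: str) -> List[str]:
    """Return the key/sequence column names that the table does not have.

    Spark resolves column names case-insensitively by default, so names are
    compared in lower case.
    """
    available = {c.lower() for c in columns}
    wanted = list(keys) + [sequence_col]
    return [c for c in wanted if c.lower() not in available]


keys = ["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"]
# Hypothetical column list; "Amount" is invented for illustration.
bronze_columns = keys + ["Amount", "SDA_Inserted"]

print(missing_columns(bronze_columns, keys, "Inserted"))      # ['Inserted']
print(missing_columns(bronze_columns, keys, "SDA_Inserted"))  # []
```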

filipniziol
Contributor III

Hi @issa,

As suggested by @ozaaditya, the issue is with the way you define the source parameter.
It cannot be a plain DataFrame; it must be the name of a DLT table or view.

Here is the code from the documentation:

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

So in your case the code for source should look like:

@dlt.view
def transactions_bronze():
  return spark.readStream.table("bronze.erp.transactions_bronze")

And then you pass "transactions_bronze" as the source.

issa
New Contributor III

Hi @filipniziol and @ozaaditya,

Thank you both for your input. I changed the code, since I figured that the SCD should be in the bronze layer and that I should then keep only the open rows in silver.

However, that doesn't work.

My idea was:

Bronze layer:

import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F

@dlt.table(
    name="Transactions_Bronze_Raw",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)

def Transactions_Bronze_Raw():
    # Read streaming data from the landing path (source of data)
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )

    # Add metadata for inserted time
    df = df.withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df

# Transactions_Bronze
dlt.create_streaming_table(
    name="Transactions_Bronze",  # No database qualifier in the table name
    table_properties={
        "quality": "bronze",
        "delta.enableChangeDataFeed": "true"
    }
)

# Apply change data capture logic to handle SCD Type 2 for the streaming table
dlt.apply_changes(
    target="Transactions_Bronze",  
    source="Transactions_Bronze_Raw",  
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),  
    stored_as_scd_type=2  
)

Silver layer:

@dlt.table(
    name="Transactions_Silver",
    table_properties={
        "quality": "silver",
        "delta.enableChangeDataFeed": "true"
    }
)

def Transactions_Silver():
    # Read from the Bronze table
    bronze_df = spark.readStream.format("delta").table("sda_edw_bronze.erp.Transactions_Bronze")

    # Filter only open rows
    silver_df = bronze_df.filter(F.col("__END_AT").isNull())

    # Add a new SDA_Inserted column
    silver_df = silver_df.withColumn(
        "SDA_Inserted",
        F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss")
    )

    # Return the transformed DataFrame
    return silver_df
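To make the intended bronze/silver split concrete (SCD type 2 history in bronze, only current rows in silver), here is a simplified plain-Python model. It is not DLT code: build_scd2 and open_rows are hypothetical helpers and the sample records, including the reduced key and the Amount column, are invented. It only mirrors the idea that each newer record for a key closes the previous version by setting __END_AT, and that open rows are those whose __END_AT is still null, as in the silver filter above. apply_changes itself also handles out-of-order events, deletes and history-tracking options that this model ignores.

```python
from typing import Dict, List, Tuple


def build_scd2(records: List[dict], keys: List[str], seq: str) -> List[dict]:
    """Simplified SCD type 2 model: each newer record for a key closes the
    current version (sets __END_AT) and opens a new one."""
    history: List[dict] = []
    current: Dict[Tuple, dict] = {}
    for rec in sorted(records, key=lambda r: r[seq]):  # process in sequence order
        key = tuple(rec[c] for c in keys)
        if key in current:
            current[key]["__END_AT"] = rec[seq]  # close the previous version
        row = {**rec, "__START_AT": rec[seq], "__END_AT": None}
        history.append(row)
        current[key] = row
    return history


def open_rows(history: List[dict]) -> List[dict]:
    """Keep only current versions, like filtering on __END_AT IS NULL."""
    return [r for r in history if r["__END_AT"] is None]


# Invented sample data with a reduced, hypothetical key.
keys = ["Company", "VoucherNo"]
records = [
    {"Company": "A", "VoucherNo": 1, "Amount": 100, "SDA_Inserted": "2024-11-29 09:00:00"},
    {"Company": "A", "VoucherNo": 1, "Amount": 120, "SDA_Inserted": "2024-11-30 09:00:00"},
    {"Company": "B", "VoucherNo": 7, "Amount": 50, "SDA_Inserted": "2024-11-29 09:00:00"},
]

history = build_scd2(records, keys, "SDA_Inserted")
print(len(history))  # 3 versions in total
print([(r["Company"], r["Amount"]) for r in open_rows(history)])  # [('B', 50), ('A', 120)]
```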

The issue is that the bronze layer fails:
"message": "Update ff5d14 is FAILED.",
"level": "ERROR",
"error": {
  "exceptions": [
    {
      "message": "INTERNAL_ERROR: Pipeline cluster is not healthy."
    }
  ]

But the cluster is fine, because I can run other notebooks without issues.

Do you see any issue with my code for the bronze layer?

First I wanted to create a view of the input data (as you suggested, @filipniziol, for the previous issue) and use it as the source for the bronze table, but that didn't work either. The pipeline ran without any issue, but the data didn't get ingested. This is how that idea looked:

# Define a view as the source
@dlt.view
def Transactions_Bronze_View():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
        .withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))  # Add metadata
    )

# Transactions_Bronze
dlt.create_streaming_table(
    name="Transactions_Bronze",  # No database qualifier in the table name
    table_properties={
        "quality": "bronze",
        "delta.enableChangeDataFeed": "true"
    }
)

# Use the view as the source for SCD2 table creation
dlt.apply_changes(
    target="Transactions_Bronze",
    source="Transactions_Bronze_View",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),  # Timestamp column to sequence updates
    stored_as_scd_type=2
)
 

 

I'd appreciate any help with building the bronze table as SCD type 2.

Thanks in advance!

Hi @issa,

Could you debug this step by step?

Could you create a DLT pipeline that does only this:

 

import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F

@dlt.table(
    name="Transactions_Bronze_Raw",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)

def Transactions_Bronze_Raw():
    # Read streaming data from the landing path (source of data)
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )

    # Add metadata for inserted time
    df = df.withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df

 

Is the table Transactions_Bronze_Raw correctly created?

issa
New Contributor III

Yes, this part is fine:

import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F

storage_account = 'xxxxx'
container = 'xxxxxx'
landing_path = 'xxxxxx/Transactions'

landing_json_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{landing_path}"


@dlt.table(
    name="Transactions_Bronze_Raw",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)
def Transactions_Bronze_Raw():
    # Read streaming data from the landing path (source of data)
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )

    # Add metadata for inserted time
    df = df.withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df

 

The issue is with this part:

import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F

@dlt.view
def Transactions_Bronze_Raw():
    return spark.readStream.table("sda_edw_bronze.erp.Transactions_Bronze_Raw")


dlt.create_streaming_table(
    name="Transactions_Bronze",  # No database qualifier in the table name
    table_properties={
        "quality": "bronze",
        "delta.enableChangeDataFeed": "true"
    }
)


dlt.apply_changes(
    source="Transactions_Bronze_Raw",
    target="Transactions_Bronze",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),
    stored_as_scd_type=2
)

It gives me the error below, even though the cluster is fine: it created Transactions_Bronze_Raw without any issue, but fails when running Transactions_Bronze.

"timestamp": "2024-12-18T11:59:42.736Z",
"message": "Update 1171f1 is FAILED.",
"level": "ERROR",
"error": {
  "exceptions": [
    {
      "message": "INTERNAL_ERROR: Pipeline cluster is not healthy."
    }
  ],
  "internal_exceptions": [
    {
      "class_name": "com.databricks.api.base.DatabricksServiceException",
      "message": "INTERNAL_ERROR: Pipeline cluster is not healthy.",
      "stack": [
        {
          "declaring_class": "com.databricks.api.base.DatabricksServiceException$",
          "method_name": "apply",
          "file_name": "DatabricksServiceException.scala",
          "line_number": 400
        },

 

filipniziol
Contributor III

Hi @issa,

OK, so Transactions_Bronze_Raw is created and the data is in place. I don't see any obvious reason why it isn't working, so in your situation I would debug step by step, changing one setting at a time.
Here are a couple of things I would test:

1. Could you make sure that the key columns "Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo" have no duplicates for a given SDA_Inserted value?

2. Could you remove table properties from Transactions_Bronze:

dlt.create_streaming_table(
    name="Transactions_Bronze"
)

3. Change stored_as_scd_type=1

4. I would change the table names to lowercase
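Check 1 can be run as a group-by on the key columns plus SDA_Inserted; in PySpark that is roughly df.groupBy(*keys, "SDA_Inserted").count().filter(F.col("count") > 1). The snippet below is a plain-Python version of the same check so the logic can be tried without a cluster; duplicate_keys is a hypothetical helper and the rows are made-up sample data.

```python
from collections import Counter
from typing import List, Tuple


def duplicate_keys(records: List[dict], keys: List[str], seq: str) -> List[Tuple]:
    """Return (key columns..., sequence value) combinations seen more than once."""
    counts = Counter(tuple(r[c] for c in keys) + (r[seq],) for r in records)
    return [combo for combo, n in counts.items() if n > 1]


keys = ["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"]
# Made-up sample rows: the first two share the full key and the same SDA_Inserted.
base = {"Company": "A", "VoucherType": "INV", "AccountingYear": 2024, "VoucherNo": 1}
rows = [
    {**base, "RowNo": 1, "SDA_Inserted": "2024-12-18 11:59:42"},
    {**base, "RowNo": 1, "SDA_Inserted": "2024-12-18 11:59:42"},
    {**base, "RowNo": 2, "SDA_Inserted": "2024-12-18 11:59:42"},
]

print(duplicate_keys(rows, keys, "SDA_Inserted"))
# [('A', 'INV', 2024, 1, 1, '2024-12-18 11:59:42')]
```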

 

filipniziol
Contributor III

Hi @issa,

Also, could you share which Databricks Runtime is used to run your DLT pipeline?

 

issa
New Contributor III

@filipniziol, it's resolved now, thanks. I solved it by deleting the tables and running everything again. Even though I did a full refresh from DLT, that didn't seem to do the trick, so I deleted the tables, suspecting some cached metadata issue for the bronze table.

issa
New Contributor III

Final solution for the Bronze layer (accepted solution):

# Define view as the source
@dlt.view
def Transactions_Bronze_View():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
        .withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))  # Add metadata
    )

# Define a DLT streaming table to store the bronze data, with Change Data Feed enabled
dlt.create_streaming_table(
    name="Transactions_Bronze",  # No database qualifier in the table name
    table_properties={
        "quality": "bronze",
        "delta.enableChangeDataFeed": "true"
    }
)

# Implement SCD type 2
dlt.apply_changes(
    target="Transactions_Bronze",
    source="Transactions_Bronze_View",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),  # Timestamp column to sequence updates
    stored_as_scd_type=2
)

 
