11-29-2024 01:24 AM - edited 11-29-2024 01:28 AM
I have a job in Workflows that runs two DLT pipelines, one for Bronze_Transaction and one for Silver_Transaction. I use two separate pipelines because I want the Bronze table created in the bronze catalog and erp schema, and the Silver table in the silver catalog and erp schema.
The notebooks for each DLT are:
Bronze:
import dlt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, expr
from pyspark.sql.functions import round, unix_timestamp, current_timestamp
from pyspark.sql import functions as F
from pyspark.sql.types import *

storage_account = 'xxxxxxxxx'
container = 'xxxxxxxxxxxxx'
landing_path = 'Landing/Transactions'
landing_json_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{landing_path}"

# Transactions_Bronze
# Serves as the raw source for downstream processing in silver or gold tables in the Delta Lakehouse.
@dlt.table(
    name="Transactions_Bronze",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)
def Transactions_Bronze():
    # Read data from JSON files
    df = (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )
    # Add metadata column for insertion time ("MM" for month, not "mm")
    df = df.withColumn("SDA_Inserted", F.date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df
Silver:

import dlt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, expr
from pyspark.sql.functions import round, unix_timestamp, current_timestamp
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Transactions_Silver
dlt.create_streaming_table(
    name="Transactions_Silver",  # No database qualifier in the table name
    table_properties={
        "quality": "silver"
    }
)

# Define the path to the external table
bronze_df = (
    spark.readStream
    .format("delta")
    .table("bronze.erp.Transactions_Bronze")
)

# Apply changes from bronze to silver
dlt.apply_changes(
    source=bronze_df,  # first I tried to just write bronze.erp.transactions_bronze, but that failed too
    target="Transactions_Silver",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    stored_as_scd_type="2",
    sequence_by="Inserted"
)
"timestamp": "2024-11-29T09:03:48.672Z",
"message": "Failed to resolve flow: 'transactions_silver'.",
"level": "ERROR",
"error": {
"exceptions": [
{
"class_name": "pyspark.errors.exceptions.base.PySparkAttributeError",
"message": "Traceback (most recent call last):\npyspark.errors.exceptions.base.PySparkAttributeError: [ATTRIBUTE_NOT_SUPPORTED] Attribute `_get_object_id` is not supported.",
"error_class": "ATTRIBUTE_NOT_SUPPORTED"
}
],
"fatal": true
},
"details": {
"flow_progress": {
"status": "FAILED"
}
12-04-2024 09:28 PM
By looking at your code, I can suggest a few changes that you can try out.
Instead of spark.readStream, you can use bronze_df = dlt.read_stream("Transactions_Bronze").
Revised Code:
bronze_df = dlt.read_stream("Transactions_Bronze")

dlt.apply_changes(
    source=bronze_df,
    target="Transactions_Silver",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    stored_as_scd_type="2",
    sequence_by="Inserted"
)
Additionally, you have created a column called SDA_Inserted in the bronze table, but in the silver table, the sequence_by parameter is based on the Inserted column. Make sure to correct the column name if there is no Inserted column already present in the data.
12-05-2024 12:18 AM
Hi @issa ,
As suggested by @ozaaditya, the issue is with the way you define the source parameter.
It cannot be a plain DataFrame; it has to be the name of a DLT table or view.
Here is the code from the documentation:
import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
    return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
    target = "target",
    source = "users",
    keys = ["userId"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE'"),
    apply_as_truncates = expr("operation = 'TRUNCATE'"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = 1
)
So in your case the code for source should look like:
@dlt.view
def transactions_bronze():
    return spark.readStream.table("bronze.erp.transactions_bronze")

And then you pass "transactions_bronze" as the source.
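Putting the pieces together, a minimal sketch of the Silver pipeline could look like this (a sketch only, assuming the bronze.erp catalog/schema and the SDA_Inserted timestamp column from your Bronze notebook):

import dlt
from pyspark.sql import functions as F

# Source view: stream the Bronze table that lives in the other pipeline's catalog/schema
@dlt.view
def transactions_bronze():
    return spark.readStream.table("bronze.erp.transactions_bronze")

# Target streaming table for this (Silver) pipeline
dlt.create_streaming_table(
    name="Transactions_Silver",
    table_properties={"quality": "silver"}
)

# Pass the view by name (a string), not as a DataFrame
dlt.apply_changes(
    source="transactions_bronze",
    target="Transactions_Silver",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),  # assumed: the insertion timestamp added in Bronze
    stored_as_scd_type=2
)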
12-09-2024 02:35 AM - edited 12-09-2024 02:47 AM
Hi @filipniziol and @ozaaditya,
Thank you both for your input. I changed the code, since I figured that the SCD logic should live on the bronze layer and that I should then keep only the open rows in silver.
However, that doesn't work.
My idea was:
Bronze layer:
import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F

@dlt.table(
    name="Transactions_Bronze_Raw",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)
def Transactions_Bronze_Raw():
    # Read streaming data from the landing path (source of data)
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )
    # Add metadata for inserted time
    df = df.withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df

# Transactions_Bronze
dlt.create_streaming_table(
    name="Transactions_Bronze",  # No database qualifier in the table name
    table_properties={
        "quality": "silver",
        "delta.enableChangeDataFeed": "true"
    }
)

# Apply change data capture logic to handle SCD Type 2 for the streaming table
dlt.apply_changes(
    target="Transactions_Bronze",
    source="Transactions_Bronze_Raw",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),
    stored_as_scd_type=2
)
Silver layer:
@dlt.table(
    name="Transactions_Silver",
    table_properties={
        "quality": "silver",
        "delta.enableChangeDataFeed": "true"
    }
)
def Transactions_Silver():
    # Read from the Bronze table
    bronze_df = spark.readStream.format("delta").table("sda_edw_bronze.erp.Transactions_Bronze")
    # Filter only open rows
    silver_df = bronze_df.filter(F.col("__END_AT").isNull())
    # Add a new SDA_Inserted column
    silver_df = silver_df.withColumn(
        "SDA_Inserted",
        F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss")
    )
    # Return the transformed DataFrame
    return silver_df
The issue is that the bronze layer fails:

"message": "Update ff5d14 is FAILED.",
"level": "ERROR",
"error": {
    "exceptions": [
        {
            "message": "INTERNAL_ERROR: Pipeline cluster is not healthy."
        }
    ]
But the cluster is fine, because I can run other notebooks without issues.
Do you see any issue with my code for the bronze layer?
First I wanted to create a view (as you suggested @filipniziol for the previous issue) of the input data and use that as the source for the bronze table, but that didn't work either. The pipeline ran without any issue, but the data didn't get ingested. This is how that idea looked:
@dlt.view
def Transactions_Bronze_View():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
        .withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))  # Add metadata
    )

# Transactions_Bronze
dlt.create_streaming_table(
    name="Transactions_Bronze",  # No database qualifier in the table name
    table_properties={
        "quality": "bronze",
        "delta.enableChangeDataFeed": "true"
    }
)

# Use the view as the source for SCD2 table creation
dlt.apply_changes(
    target="Transactions_Bronze",
    source="Transactions_Bronze_View",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),  # Timestamp column to sequence updates
    stored_as_scd_type=2
)
Would appreciate all the help I can get on building the bronze table as an SCD2 table.
Thanks in advance!
12-18-2024 02:31 AM
Hi @issa ,
Could you debug step by step?
Could you run a DLT pipeline that does just this:
import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F

@dlt.table(
    name="Transactions_Bronze_Raw",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)
def Transactions_Bronze_Raw():
    # Read streaming data from the landing path (source of data)
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )
    # Add metadata for inserted time
    df = df.withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df
Is the table Transactions_Bronze_Raw correctly created?
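If it helps, a quick sanity check from a regular notebook could look like this (a sketch only, assuming the pipeline publishes to the sda_edw_bronze catalog and erp schema used elsewhere in this thread):

# Hypothetical sanity check; adjust the fully qualified name to your pipeline's catalog/schema
df = spark.read.table("sda_edw_bronze.erp.Transactions_Bronze_Raw")
print(df.count())   # row count after the first run
df.printSchema()    # confirm SDA_Inserted and the key columns are present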
12-18-2024 04:10 AM
Yes, this part is fine:
import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F

storage_account = 'xxxxx'
container = 'xxxxxx'
landing_path = 'xxxxxx/Transactions'
landing_json_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{landing_path}"

@dlt.table(
    name="Transactions_Bronze_Raw",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)
def Transactions_Bronze_Raw():
    # Read streaming data from the landing path (source of data)
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )
    # Add metadata for inserted time
    df = df.withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df
The issue is with this part:
import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F

@dlt.view
def Transactions_Bronze_Raw():
    return spark.readStream.table("sda_edw_bronze.erp.Transactions_Bronze_Raw")

dlt.create_streaming_table(
    name="Transactions_Bronze",  # No database qualifier in the table name
    table_properties={
        "quality": "bronze",
        "delta.enableChangeDataFeed": "true"
    }
)

dlt.apply_changes(
    source="Transactions_Bronze_Raw",
    target="Transactions_Bronze",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),
    stored_as_scd_type=2
)
It gives me the message below. The cluster itself is fine, because it created Transactions_Bronze_Raw without any issue, but it fails when running Transactions_Bronze.
"timestamp": "2024-12-18T11:59:42.736Z",
"message": "Update 1171f1 is FAILED.",
"level": "ERROR",
"error": {
"exceptions": [
{
"message": "INTERNAL_ERROR: Pipeline cluster is not healthy."
}
],
"internal_exceptions": [
{
"class_name": "com.databricks.api.base.DatabricksServiceException",
"message": "INTERNAL_ERROR: Pipeline cluster is not healthy.",
"stack": [
{
"declaring_class": "com.databricks.api.base.DatabricksServiceException$",
"method_name": "apply",
"file_name": "DatabricksServiceException.scala",
"line_number": 400
},
12-18-2024 06:09 AM
Hi @issa ,
Ok, so Transactions_Bronze_Raw is created and the data is in place. I do not see any obvious reason why it is not working, so in your situation I would try to debug step by step, changing one setting at a time.
Here are a couple of things I would test:
1. Could you make sure that "Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo" for a given SDA_Inserted does not return any duplicates? (see the sketch after this list)
2. Could you remove table properties from Transactions_Bronze:
dlt.create_streaming_table(
    name="Transactions_Bronze"
)
3. Change stored_as_scd_type=1
4. I would change the table names to lowercase
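For point 1, a minimal sketch of a duplicate check, run from a regular notebook (assuming the sda_edw_bronze.erp.Transactions_Bronze_Raw table name used in this thread):

from pyspark.sql import functions as F

# Keys plus the sequencing column; any count > 1 breaks apply_changes' uniqueness assumption
key_cols = ["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo", "SDA_Inserted"]

duplicates = (
    spark.read.table("sda_edw_bronze.erp.Transactions_Bronze_Raw")  # assumed table name
    .groupBy(*key_cols)
    .count()
    .filter(F.col("count") > 1)
)

display(duplicates)  # rows shown here are duplicates for the same key and sequence value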
12-19-2024 12:35 AM
Hi @issa ,
Also, could you share which Databricks Runtime is used to run your DLT pipeline?
12-19-2024 02:20 AM
@filipniziol - it's resolved now, thanks. What I did to solve it was to delete the tables and run everything again. Even though I did a full refresh from DLT, that didn't seem to do the trick. So I deleted the tables, thinking that it could be some cached metadata issue for the bronze table.
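Something along these lines can be used to drop the tables before re-running the pipeline (a minimal sketch, assuming the fully qualified names used earlier in this thread):

# Hypothetical cleanup before re-running the DLT pipeline; table names are the ones
# used earlier in this thread and may differ in your environment.
for table in [
    "sda_edw_bronze.erp.Transactions_Bronze_Raw",
    "sda_edw_bronze.erp.Transactions_Bronze",
]:
    spark.sql(f"DROP TABLE IF EXISTS {table}")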
12-19-2024 02:26 AM
Final solution for the Bronze:
# Define view as the source
@dlt.view
def Transactions_Bronze_View():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
        .withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))  # Add metadata
    )

# Define a DLT streaming table to store the bronze data with Change Data Feed enabled
dlt.create_streaming_table(
    name="Transactions_Bronze",  # No database qualifier in the table name
    table_properties={
        "quality": "bronze",
        "delta.enableChangeDataFeed": "true"
    }
)

# Implement SCD type 2
dlt.apply_changes(
    target="Transactions_Bronze",
    source="Transactions_Bronze_View",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),  # Timestamp column to sequence updates
    stored_as_scd_type=2
)
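Downstream consumers can then keep only the current (open) versions by filtering on the __END_AT column that apply_changes maintains for SCD type 2 tables, for example (a sketch, assuming the sda_edw_bronze.erp target used in this thread):

from pyspark.sql import functions as F

current_rows = (
    spark.read.table("sda_edw_bronze.erp.Transactions_Bronze")  # assumed fully qualified name
    .filter(F.col("__END_AT").isNull())  # open rows only, as in the Silver notebook above
)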

