11-29-2024 01:24 AM - edited 11-29-2024 01:28 AM
I have a job in Workflows that runs two DLT pipelines, one for Bronze_Transaction and one for Silver_Transaction. I use two DLT pipelines because I want the tables to be created in the bronze catalog (erp schema) and the silver catalog (erp schema), respectively.
The notebooks for each DLT are:
Bronze:
import dlt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, expr
from pyspark.sql.functions import round, unix_timestamp, current_timestamp
from pyspark.sql import functions as F
from pyspark.sql.types import *
storage_account = 'xxxxxxxxx'
container = 'xxxxxxxxxxxxx'
landing_path = 'Landing/Transactions'
landing_json_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{landing_path}"
# Transactions_Bronze
# Serve as the raw source for downstream processing in silver or gold tables in the Delta Lakehouse.
@dlt.table(
    name = "Transactions_Bronze",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)
def Transactions_Bronze():
    # Read data from JSON files
    df = (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )
    # Add metadata column for insertion time (MM = month; lowercase mm would mean minutes)
    df = df.withColumn("SDA_Inserted", F.date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df
Silver:
import dlt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, expr
from pyspark.sql.functions import round, unix_timestamp, current_timestamp
from pyspark.sql import functions as F
from pyspark.sql.types import *
# Transactions_Silver
dlt.create_streaming_table(
    name="Transactions_Silver", # No database qualifier in the table name
    table_properties={
        "quality": "silver"
    }
)
# Define the path to the external table
bronze_df = (
    spark.readStream
    .format("delta")
    .table("bronze.erp.Transactions_Bronze")
)
# Apply changes from bronze to silver
dlt.apply_changes(
    source=bronze_df, #first i tried to just write bronze.erp.transactions_bronze, but that failed to
    target="Transactions_Silver",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    stored_as_scd_type="2",
    sequence_by="Inserted"
)
12-04-2024 09:28 PM
By looking at your code, I can suggest a few changes that you can try out.
Instead of spark.readStream, you can use bronze_df = dlt.read_stream("Transactions_Bronze").
Revised Code:
bronze_df = dlt.read_stream("Transactions_Bronze")
dlt.apply_changes(
    source=bronze_df,
    target="Transactions_Silver",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    stored_as_scd_type="2",
    sequence_by="Inserted"
)
Additionally, you have created a column called SDA_Inserted in the bronze table, but in the silver table, the sequence_by parameter is based on the Inserted column. Make sure to correct the column name if there is no Inserted column already present in the data.
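The column-name mismatch described above can be caught before the pipeline runs. The following is only a minimal plain-Python sketch: `missing_columns` is a hypothetical helper, and `bronze_columns` is an assumed column list (the real schema of the JSON files is not shown in this thread). The comparison is case-insensitive, on the assumption that Spark's default case-insensitive column resolution is in effect.

```python
from typing import Iterable, List


def missing_columns(available: Iterable[str], keys: List[str], sequence_by: str) -> List[str]:
    """Return the names among keys + sequence_by that are not in `available`.

    Names are compared case-insensitively (an assumption, see the lead-in).
    """
    present = {name.lower() for name in available}
    wanted = list(keys) + [sequence_by]
    return [name for name in wanted if name.lower() not in present]


# Assumed bronze columns: the five key columns plus the metadata column added in bronze.
bronze_columns = ["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo", "SDA_Inserted"]
keys = ["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"]

print(missing_columns(bronze_columns, keys, "Inserted"))      # ['Inserted']
print(missing_columns(bronze_columns, keys, "SDA_Inserted"))  # []
```

Inside a notebook, the list of available columns could come from the `columns` attribute of the bronze DataFrame instead of a hard-coded list.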
12-05-2024 12:18 AM
Hi @issa ,
As suggested by @ozaaditya , the issue is with the way you define the source parameter.
It cannot be a plain DataFrame; it has to be the name of a DLT table or view.
Here is the code from documentation:
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
    return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.apply_changes(
    target = "target",
    source = "users",
    keys = ["userId"],
    sequence_by = col("sequenceNum"),
    apply_as_deletes = expr("operation = 'DELETE'"),
    apply_as_truncates = expr("operation = 'TRUNCATE'"),
    except_column_list = ["operation", "sequenceNum"],
    stored_as_scd_type = 1
)
So in your case the code for source should look like:
@dlt.view
def transactions_bronze():
    return spark.readStream.table("bronze.erp.transactions_bronze")
And then you pass "transactions_bronze" (the name of the view) as the source.
a month ago - last edited a month ago
Hi @filipniziol and @ozaaditya,
Thank you both for your input. I changed the code, since I figured that the SCD should be on the bronze layer and that I should then filter for open rows in silver.
However, that doesn't work.
My idea was:
Bronze layer:
import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F
@dlt.table(
    name="Transactions_Bronze_Raw",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)
def Transactions_Bronze_Raw():
    # Read streaming data from the landing path (source of data)
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )
    # Add metadata for inserted time
    df = df.withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df
# Transactions_Bronze
dlt.create_streaming_table(
    name="Transactions_Bronze", # No database qualifier in the table name
    table_properties={
        "quality": "silver",
        "delta.enableChangeDataFeed": "true"
    }
)
# Apply change data capture logic to handle SCD Type 2 for the streaming table
dlt.apply_changes(
    target="Transactions_Bronze",
    source="Transactions_Bronze_Raw",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),
    stored_as_scd_type=2
)
Silver layer:
@dlt.table(
    name="Transactions_Silver",
    table_properties={
        "quality": "silver",
        "delta.enableChangeDataFeed": "true"
    }
)
def Transactions_Silver():
    # Read from the Bronze table
    bronze_df = spark.readStream.format("delta").table("sda_edw_bronze.erp.Transactions_Bronze")
    # Filter only open rows
    silver_df = bronze_df.filter(F.col("__END_AT").isNull())
    # Add a new SDA_Inserted column
    silver_df = silver_df.withColumn(
        "SDA_Inserted",
        F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss")
    )
    # Return the transformed DataFrame
    return silver_df
The issue is that the bronze layer fails with:
"message": "Update ff5d14 is FAILED.",
"level": "ERROR",
"error": {
"exceptions": [
{
"message": "INTERNAL_ERROR: Pipeline cluster is not healthy."
}
]
But the cluster is fine, because I can run other notebooks without issues.
Do you see any issue with my code for the bronze layer?
First I wanted to create a view (as you suggested, @filipniziol, for the previous issue) of the input data and use that as the source for the bronze table, but that didn't work either. The pipeline ran without any issue, but the data didn't get ingested. This is how that idea looked:
@dlt.view
def Transactions_Bronze_View():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
        .withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss")) # Add metadata
    )
# Transactions_Bronze
dlt.create_streaming_table(
    name="Transactions_Bronze", # No database qualifier in the table name
    table_properties={
        "quality": "bronze",
        "delta.enableChangeDataFeed": "true"
    }
)
# Use the view as the source for SCD2 table creation
dlt.apply_changes(
    target="Transactions_Bronze",
    source="Transactions_Bronze_View",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"), # Timestamp column to sequence updates
    stored_as_scd_type=2
)
Would appreciate all help I can get on building the bronze table as a SCD2.
Thanks in advance!
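For readers following along, here is a toy plain-Python model of what an SCD type 2 target holds and why filtering on a null `__END_AT` yields the current ("open") rows. It is only a sketch, not how DLT implements `apply_changes`: it assumes changes arrive already ordered by the sequence column, it uses a shortened key (`VoucherNo`, `RowNo`) for brevity, and `Amount` is a hypothetical payload column. `__START_AT` and `__END_AT` mirror the validity columns DLT adds to SCD type 2 targets.

```python
from typing import Dict, List

Row = Dict[str, object]


def apply_scd2(history: List[Row], change: Row, keys: List[str], sequence_by: str) -> None:
    """Toy SCD type 2 upsert: close the open row for this key, then append a new open row."""
    key = tuple(change[k] for k in keys)
    seq = change[sequence_by]
    for row in history:
        if tuple(row[k] for k in keys) == key and row["__END_AT"] is None:
            row["__END_AT"] = seq  # the previous version stops being current here
    history.append({**change, "__START_AT": seq, "__END_AT": None})


def open_rows(history: List[Row]) -> List[Row]:
    """Rows whose validity interval is still open, i.e. the current version per key."""
    return [row for row in history if row["__END_AT"] is None]


keys = ["VoucherNo", "RowNo"]  # shortened key, for illustration only
changes = [
    {"VoucherNo": 1, "RowNo": 1, "Amount": 100, "SDA_Inserted": "2024-12-01 10:00:00"},
    {"VoucherNo": 1, "RowNo": 1, "Amount": 120, "SDA_Inserted": "2024-12-02 10:00:00"},
    {"VoucherNo": 2, "RowNo": 1, "Amount": 50, "SDA_Inserted": "2024-12-02 10:00:00"},
]

history: List[Row] = []
for change in changes:
    apply_scd2(history, change, keys, "SDA_Inserted")

print(len(history))                                  # 3
print([row["Amount"] for row in open_rows(history)])  # [120, 50]
```

The first version of voucher 1 stays in the history with `__END_AT` set to the sequence value of its replacement, which is why the silver layer needs the null filter to see only current rows.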
3 weeks ago
Hi @issa ,
Could you debug this step by step?
Could you create a DLT pipeline that does just this:
import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F
@dlt.table(
    name="Transactions_Bronze_Raw",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)
def Transactions_Bronze_Raw():
    # Read streaming data from the landing path (source of data)
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )
    # Add metadata for inserted time
    df = df.withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df
Is the table Transactions_Bronze_Raw correctly created?
3 weeks ago
Yes, this part is fine:
import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F
storage_account = 'xxxxx'
container = 'xxxxxx'
landing_path = 'xxxxxx/Transactions'
landing_json_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{landing_path}"
@dlt.table(
    name="Transactions_Bronze_Raw",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)
def Transactions_Bronze_Raw():
    # Read streaming data from the landing path (source of data)
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )
    # Add metadata for inserted time
    df = df.withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df
The issue is with this part:
import dlt
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql import functions as F
@dlt.view
def Transactions_Bronze_Raw():
    return spark.readStream.table("sda_edw_bronze.erp.Transactions_Bronze_Raw")
dlt.create_streaming_table(
    name="Transactions_Bronze", # No database qualifier in the table name
    table_properties={
        "quality": "bronze",
        "delta.enableChangeDataFeed": "true"
    }
)
dlt.apply_changes(
    source="Transactions_Bronze_Raw",
    target="Transactions_Bronze",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"),
    stored_as_scd_type=2
)
It gives me the message below. The cluster itself is fine, because it created Transactions_Bronze_Raw without any issue, but the update fails when it runs Transactions_Bronze:
"timestamp": "2024-12-18T11:59:42.736Z",
"message": "Update 1171f1 is FAILED.",
"level": "ERROR",
"error": {
"exceptions": [
{
"message": "INTERNAL_ERROR: Pipeline cluster is not healthy."
}
],
"internal_exceptions": [
{
"class_name": "com.databricks.api.base.DatabricksServiceException",
"message": "INTERNAL_ERROR: Pipeline cluster is not healthy.",
"stack": [
{
"declaring_class": "com.databricks.api.base.DatabricksServiceException$",
"method_name": "apply",
"file_name": "DatabricksServiceException.scala",
"line_number": 400
},
3 weeks ago
Hi @issa ,
Ok, so Transactions_Bronze_Raw is created and the data is in place. I do not see any obvious reason why it is not working, so in your situation I would try to debug step by step changing one setting at a time.
Here are a couple of things I would test:
1. Could you make sure that "Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo" for a given SDA_Inserted does not return any duplicates?
2. Could you remove table properties from Transactions_Bronze:
dlt.create_streaming_table(
    name="Transactions_Bronze"
)
3. Change stored_as_scd_type=1
4. I would change the table names to lowercase
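The duplicate check in point 1 can be sketched as follows. This is a plain-Python illustration over a list of dicts, not pipeline code: `duplicate_key_sequences` is a hypothetical helper, the sample rows are made up, and the key is shortened to `VoucherNo` and `RowNo` for brevity. Because SDA_Inserted is formatted to whole seconds, rows for the same key that land in the same batch can share a sequence value, which is exactly the tie this check looks for.

```python
from collections import Counter
from typing import Dict, List, Tuple

Row = Dict[str, object]


def duplicate_key_sequences(rows: List[Row], keys: List[str], sequence_by: str) -> Dict[Tuple, int]:
    """Return each (key columns..., sequence value) combination seen more than once, with its count."""
    columns = list(keys) + [sequence_by]
    counts = Counter(tuple(row[c] for c in columns) for row in rows)
    return {combo: n for combo, n in counts.items() if n > 1}


keys = ["VoucherNo", "RowNo"]  # shortened key, for illustration only
rows = [
    {"VoucherNo": 1, "RowNo": 1, "SDA_Inserted": "2024-12-18 11:59:42"},
    {"VoucherNo": 1, "RowNo": 1, "SDA_Inserted": "2024-12-18 11:59:42"},  # same key, same second
    {"VoucherNo": 2, "RowNo": 1, "SDA_Inserted": "2024-12-18 11:59:42"},
]

print(duplicate_key_sequences(rows, keys, "SDA_Inserted"))
# {(1, 1, '2024-12-18 11:59:42'): 2}
```

On the actual table, the equivalent check would be a group-by on the key columns plus SDA_Inserted with a count greater than one.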
3 weeks ago
Hi @issa ,
Also, could you share which Databricks Runtime is used to run your DLT pipeline?
3 weeks ago
@filipniziol - it's resolved now, thanks. What I did to solve it was to delete the tables and run everything again. A full refresh from DLT didn't do the trick, so I deleted the tables, thinking it could be a cached-metadata issue for the bronze table.
3 weeks ago
Final solution for the Bronze:
# Define view as the source
@dlt.view
def Transactions_Bronze_View():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
        .withColumn("SDA_Inserted", F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss")) # Add metadata
    )
# Define a DLT to store the bronze data with Change Data Feed enabled
dlt.create_streaming_table(
    name="Transactions_Bronze", # No database qualifier in the table name
    table_properties={
        "quality": "bronze",
        "delta.enableChangeDataFeed": "true"
    }
)
# Implement SCD type 2
dlt.apply_changes(
    target="Transactions_Bronze",
    source="Transactions_Bronze_View",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    sequence_by=F.col("SDA_Inserted"), # Timestamp column to sequence updates
    stored_as_scd_type=2
)