I have a job in Workflows that runs two DLT pipelines, one for Bronze_Transaction and one for Silver_Transaction. The reason for two DLT pipelines is that I want the tables created in the bronze catalog and erp schema, and in the silver catalog and erp schema, respectively.
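For context, each pipeline's settings point to its own catalog and schema, roughly like this (a sketch only; my real configs have more fields and the names are simplified):

Bronze pipeline settings:
{
  "catalog": "bronze",
  "target": "erp"
}

Silver pipeline settings:
{
  "catalog": "silver",
  "target": "erp"
}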
The notebooks for each DLT are:
Bronze:
import dlt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, expr
from pyspark.sql.functions import round, unix_timestamp, current_timestamp
from pyspark.sql import functions as F
from pyspark.sql.types import *

storage_account = 'xxxxxxxxx'
container = 'xxxxxxxxxxxxx'
landing_path = 'Landing/Transactions'
landing_json_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{landing_path}"

# Transactions_Bronze
# Serves as the raw source for downstream processing in silver or gold tables in the Delta Lakehouse.
@dlt.table(
    name="Transactions_Bronze",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)
def Transactions_Bronze():
    # Read JSON files from the landing path with Auto Loader
    df = (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("inferSchema", True)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(landing_json_path)
    )
    # Add metadata column for insertion time
    df = df.withColumn("SDA_Inserted", F.date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
    return df
Silver:
import dlt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, expr
from pyspark.sql.functions import round, unix_timestamp, current_timestamp
from pyspark.sql import functions as F
from pyspark.sql.types import *
# Transactions_Silver
dlt.create_streaming_table(
    name="Transactions_Silver",  # No catalog/schema qualifier in the table name
    table_properties={
        "quality": "silver"
    }
)

# Read the bronze table from the bronze catalog
bronze_df = (
    spark.readStream
    .format("delta")
    .table("bronze.erp.Transactions_Bronze")
)

# Apply changes from bronze to silver
dlt.apply_changes(
    source=bronze_df,  # first I tried to just pass "bronze.erp.transactions_bronze" as a string, but that failed too (sketch below)
    target="Transactions_Silver",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    stored_as_scd_type="2",
    sequence_by="Inserted"
)
The bronze pipeline runs without any issue, but the workflow fails on the silver DLT pipeline. The error message I get is:
"timestamp": "2024-11-29T09:03:48.672Z",
"message": "Failed to resolve flow: 'transactions_silver'.",
"level": "ERROR",
"error": {
"exceptions": [
{
"class_name": "pyspark.errors.exceptions.base.PySparkAttributeError",
"message": "Traceback (most recent call last):\npyspark.errors.exceptions.base.PySparkAttributeError: [ATTRIBUTE_NOT_SUPPORTED] Attribute `_get_object_id` is not supported.",
"error_class": "ATTRIBUTE_NOT_SUPPORTED"
}
],
"fatal": true
},
"details": {
"flow_progress": {
"status": "FAILED"
}
I'd be grateful for any help with this one.