How to access bronze dlt in silver dlt

issa
New Contributor III

I have a job in Workflows thatt runs two DLT pipelines, one for Bronze_Transaction and on for Silver_Transaction. The reason for two DLT pipelines is because i want the tables to be created in bronze catalog and erp schema, and silver catalog and erp schema.

The notebooks for each DLT are:
Bronze:

import dlt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, expr
from pyspark.sql.functions import round, unix_timestamp, current_timestamp
from pyspark.sql import functions as F
from pyspark.sql.types import *


storage_account = 'xxxxxxxxx'
container = 'xxxxxxxxxxxxx'
landing_path = 'Landing/Transactions'
landing_json_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/{landing_path}"

# Transactions_Bronze
# Serve as the raw source for downstream processing in silver or gold tables in the Delta Lakehouse.

@dlt.table(
    name = "Transactions_Bronze",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "quality": "bronze"
    }
)

def Transactions_Bronze():
    # Read data from JSON files
    df = (spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("inferSchema", True)
          .option("cloudFiles.inferColumnTypes", "true")
          .option("recursiveFileLookup", "true")
          .load(landing_json_path)
          )

    # Add metdadata column for insertion time
    df = df.withColumn("SDA_Inserted", F.date_format(current_timestamp(), "yyyy-mm-dd HH:mm:ss"))
    return df


SILVER:

import dlt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, expr
from pyspark.sql.functions import round, unix_timestamp, current_timestamp
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Transactions_Silver
dlt.create_streaming_table(
    name="Transactions_Silver",  # No database qualifier in the table name
    table_properties={
        "quality": "silver"
    }
)

# Define the path to the external table
bronze_df = (
    spark.readStream
    .format("delta")
    .table("bronze.erp.Transactions_Bronze")
)

# Apply changes from bronze to silver
dlt.apply_changes(
    source=bronze_df, #first i tried to just write bronze.erp.transactions_bronze, but that failed to
    target="Transactions_Silver",
    keys=["Company", "VoucherType", "AccountingYear", "VoucherNo", "RowNo"],
    stored_as_scd_type="2",
    sequence_by="Inserted"
)

 
The bronze works without any issue, but the workflows fails on silver dlt: The error message I get is:
"timestamp": "2024-11-29T09:03:48.672Z",
"message": "Failed to resolve flow: 'transactions_silver'.",
"level": "ERROR",
"error": {
"exceptions": [
{
"class_name": "pyspark.errors.exceptions.base.PySparkAttributeError",
"message": "Traceback (most recent call last):\npyspark.errors.exceptions.base.PySparkAttributeError: [ATTRIBUTE_NOT_SUPPORTED] Attribute `_get_object_id` is not supported.",
"error_class": "ATTRIBUTE_NOT_SUPPORTED"
}
],
"fatal": true
},
"details": {
"flow_progress": {
"status": "FAILED"
}

Appreciative for all help I can get with this one