scvbelle
New Contributor III

UPDATE: 

See the code below (for some reason I couldn't post it yesterday).
I decided to revert to an earlier version of the DLT pipeline, which defined each table one by one rather than programmatically and had previously run without issue. That ran fine.
I added a few extra tables and ran it before signing off last night, then started the job that updates the source tables.

Today I ran the DLT pipeline again (after that job had added new data to the source tables), and the second of the three tables/steps that I added failed with the same error.

@dataclass
class IdTable:
    """Column-name metadata for one source table that feeds the dedup pipeline.

    Only ``schema_name``, ``table_name`` and ``email_col_name`` are required.
    The remaining column names default to ``None``. Presumably ``None`` means
    the source table has no such column; confirm against the cleaning logic.
    """

    # Schema segment of the source path (bronze_raw.<schema_name>.<table_name>).
    schema_name: str
    # Table segment of the source path.
    table_name: str
    # Name of the column holding the email address.
    email_col_name: str
    # Name of the column holding the identification number, if any.
    idno_col_name: Optional[str] = None
    # Name of the column holding the surname, if any.
    surname_col_name: Optional[str] = None
    # Name of the table's own user-id column, if any.
    table_user_id_col: Optional[str] = None


def generate_raw_table(source_path: str, target_name_base: str):
    """Register a DLT table named ``<target_name_base>_raw`` that streams ``source_path``.

    Registration happens as a side effect of applying the ``dlt.table``
    decorator; the function itself returns ``None``.
    """
    raw_table_name = f"{target_name_base}_raw"
    raw_table_comment = f"Raw records from bronze layer for {source_path}"

    @dlt.table(name=raw_table_name, comment=raw_table_comment)
    def create_live_raw_table():
        # Read the source table as a stream.
        # NOTE(review): a Delta streaming source is expected to be append-only.
        # If the job that maintains ``source_path`` updates, deletes, or
        # overwrites rows, this stream can fail on the next run. Confirm how
        # that job writes, and consider the ``skipChangeCommits`` read option
        # (or a full refresh) if it rewrites data.
        return spark.readStream.table(source_path)
 
 
def generate_cleaned_bronze_table(target_name_base: str, id_table: IdTable):
    """Register a DLT table named ``<target_name_base>_clean``.

    The table streams from ``LIVE.<target_name_base>_raw`` (the name produced
    by ``generate_raw_table`` for the same base) and applies the
    ``withColumn`` transformation, which is elided in this excerpt.

    Args:
        target_name_base: Prefix shared by the ``_raw`` input and the
            ``_clean`` output table names.
        id_table: Column-name metadata for the source table.
            NOTE(review): not referenced in the visible code. Presumably it is
            used by the elided ``withColumn`` logic; verify.
    """
    @dlt.table(
        name= f"{target_name_base}_clean"
        , comment=f"Cleaned records from raw bronze views for {target_name_base}_raw"
        # , partition_cols=["insert_time", "ref", "table_uid"]
    )
    def create_live_clean_table():
        # The LIVE. prefix refers to a dataset defined in this same pipeline.
        df_raw = spark.readStream.table(f"LIVE.{target_name_base}_raw")  
        df_clean = df_raw.withColumn(
# ... 
)
        return df_clean


# Source tables to ingest. The loop that follows generates one "_raw" and one
# "_clean" DLT table per entry.
id_tables: List[IdTable] = [
    IdTable(
        schema_name="source_name",
        table_name="acc_user_account",
        email_col_name="email_address",
        idno_col_name="identification_nr",
        surname_col_name="surname",
        table_user_id_col="user_id",
    ),
]

# For each configured source, register "<schema>_<table>_raw" (streamed from
# bronze_raw.<schema>.<table>) and "<schema>_<table>_clean" DLT tables.
for table in id_tables:
    target_name_base: str = f"{table.schema_name}_{table.table_name}"
    source_path: str = f"bronze_raw.{table.schema_name}.{table.table_name}"

    generate_raw_table(target_name_base=target_name_base, source_path=source_path)
    generate_cleaned_bronze_table(id_table=table, target_name_base=target_name_base)

The downstream code, which worked on both runs:

@dlt.table(
    comment="UUIDs for where name AND id AND email match",
    # NOTE(review): "deduped_uid_1" looks like a per-row UUID. Partitioning on
    # a high-cardinality column creates roughly one partition per value.
    # Databricks recommends partitioning only on low-cardinality columns, or
    # not at all for small tables; consider dropping it.
    partition_cols=["ref", "deduped_uid_1"],
)
def deduped_1_name_id_email():
    """Build the first dedup level from ``deduped_0_per_table``, matching on ``ref``.

    The SQL comes from ``generate_dedup_query``, which is given
    ``deduped_uid_0`` as the source id column and ``deduped_uid_1`` as the
    output id column. DLT takes the table name from this function's name.
    """
    return spark.sql(
        generate_dedup_query(
            source_table="deduped_0_per_table",
            matching_cols=["ref"],
            source_id_col="deduped_uid_0",
            output_id_col="deduped_uid_1",
        )
    )

The table that worked on the first run, then errored on the second:

@dlt.table(
    comment="UUIDs for where surname AND id NOT email match",
    # NOTE(review): "idno", "surname" and "deduped_uid_2" are all
    # high-cardinality columns (close to one partition per person/row).
    # This produces huge numbers of tiny partitions and files. Databricks
    # recommends low-cardinality partition columns, or none for small tables.
    # Consider removing partition_cols (requires a full refresh).
    partition_cols=["idno", "surname", "deduped_uid_2"],
)
def deduped_2_name_id():
    """Build the second dedup level from ``deduped_1_name_id_email``.

    Rows are matched on ``idno`` and ``surname``. Only rows where both are
    non-null are considered (see ``where_clause``). ``deduped_uid_1`` is the
    source id column and ``deduped_uid_2`` the output id column passed to
    ``generate_dedup_query``. DLT takes the table name from this function's
    name.
    """
    # NOTE(review): if generate_dedup_query does not prefix source_table with
    # "LIVE.", DLT may not infer the dependency on deduped_1_name_id_email. It
    # could then read the previously published copy instead of waiting for
    # this run's update. Verify the generated SQL.
    q: str = generate_dedup_query(
        source_table='deduped_1_name_id_email',
        matching_cols=['idno', "surname"],
        source_id_col='deduped_uid_1',
        output_id_col='deduped_uid_2',
        where_clause="WHERE idno is not null AND surname is not null",
    )
    # Leftover debugging print(q) removed; log q explicitly if it is needed.
    return spark.sql(q)

 
My only thought is that it's the values themselves causing the error, but during my debugging yesterday I tried adding a terminating column (containing a string like 'row_end'), to no effect.
I've tried storing them under different schemas, and deleting and recreating the pipeline, with no change.