08-30-2023 07:37 PM
UPDATE:
See the code below (for some reason I couldn't post it yesterday).
I decided to revert to an earlier version of the DLT pipeline, which defined each table one by one rather than programmatically and had run without issue. That version ran fine.
I added a few extra tables and ran it before signing off last night, then started the job that updates the source tables.
Today I ran the DLT pipeline again (after that job had added to the source tables), and the second of the three tables/steps I added failed with the same error.
```python
from dataclasses import dataclass
from typing import List, Optional

import dlt  # available inside a Databricks DLT pipeline; `spark` is provided by the runtime


@dataclass
class IdTable:
    schema_name: str
    table_name: str
    email_col_name: str
    idno_col_name: Optional[str] = None
    surname_col_name: Optional[str] = None
    table_user_id_col: Optional[str] = None


def generate_raw_table(source_path: str, target_name_base: str):
    # Streaming table that mirrors the bronze source as-is
    @dlt.table(
        name=f"{target_name_base}_raw",
        comment=f"Raw records from bronze layer for {source_path}",
    )
    def create_live_raw_table():
        return spark.readStream.table(source_path)


def generate_cleaned_bronze_table(target_name_base: str, id_table: IdTable):
    # Streaming table built on top of the matching *_raw table
    @dlt.table(
        name=f"{target_name_base}_clean",
        comment=f"Cleaned records from raw bronze views for {target_name_base}_raw",
        # partition_cols=["insert_time", "ref", "table_uid"],
    )
    def create_live_clean_table():
        df_raw = spark.readStream.table(f"LIVE.{target_name_base}_raw")
        df_clean = df_raw.withColumn(
            # ...
        )
        return df_clean


id_tables: List[IdTable] = [
    IdTable(
        schema_name="source_name",
        table_name="acc_user_account",
        idno_col_name="identification_nr",
        surname_col_name="surname",
        email_col_name="email_address",
        table_user_id_col="user_id",
    ),
]

# One factory call per source table, so each generated table gets its own names
for table in id_tables:
    source_path: str = f"bronze_raw.{table.schema_name}.{table.table_name}"
    target_name_base: str = f"{table.schema_name}_{table.table_name}"
    generate_raw_table(source_path=source_path, target_name_base=target_name_base)
    generate_cleaned_bronze_table(target_name_base=target_name_base, id_table=table)
```

The downstream code, which worked on both runs:
```python
@dlt.table(
    comment="UUIDs for where name AND id AND email match",
    partition_cols=["ref", "deduped_uid_1"],
)
def deduped_1_name_id_email():
    # generate_dedup_query is my own helper (not shown here); it returns a SQL string
    q: str = generate_dedup_query(
        source_table='deduped_0_per_table',
        matching_cols=['ref'],
        source_id_col='deduped_uid_0',
        output_id_col='deduped_uid_1'
    )
    return spark.sql(q)
```

The table that worked on the first run, then errored on the second:
```python
@dlt.table(
    comment="UUIDs for where surname AND id NOT email match",
    partition_cols=["idno", "surname", "deduped_uid_2"],
)
def deduped_2_name_id():
    q: str = generate_dedup_query(
        source_table='deduped_1_name_id_email',
        matching_cols=['idno', "surname"],
        source_id_col='deduped_uid_1',
        output_id_col='deduped_uid_2',
        where_clause="WHERE idno is not null AND surname is not null"
    )
    print(q)
    return spark.sql(q)
```
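The programmatic version at the top wraps each `@dlt.table` definition in a factory function (`generate_raw_table`, `generate_cleaned_bronze_table`) rather than decorating directly inside the `for` loop. The pure-Python sketch below illustrates why that indirection matters. It assumes a hypothetical `register` decorator standing in for `@dlt.table` (it is not the real `dlt` API): a closure defined directly in a loop reads the loop variable when it is *called*, so every registered function sees the last value, while a factory gives each closure its own binding.

```python
from typing import Callable, Dict, List

# Hypothetical stand-in for a decorator that collects table functions by name
# (loosely mimicking how @dlt.table registers definitions; NOT the dlt API).
REGISTRY: Dict[str, Callable[[], str]] = {}


def register(name: str):
    def decorator(fn: Callable[[], str]) -> Callable[[], str]:
        REGISTRY[name] = fn
        return fn
    return decorator


def define_in_loop(sources: List[str]) -> None:
    # Anti-pattern: `read` looks up `src` only when called, after the loop
    # has finished, so every registered function returns the final source.
    for src in sources:
        @register(f"{src}_loop")
        def read() -> str:
            return src


def generate_table(src: str) -> None:
    # Factory: `src` is a parameter of this call, so the closure keeps its own value.
    @register(f"{src}_factory")
    def read() -> str:
        return src


def define_with_factory(sources: List[str]) -> None:
    for src in sources:
        generate_table(src)
```

Calling `define_in_loop(["raw_a", "raw_b"])` registers two functions that both return `"raw_b"`, whereas `define_with_factory(["raw_a", "raw_b"])` registers functions returning `"raw_a"` and `"raw_b"` respectively. Because the generator functions above already follow the factory form, each generated table should read from its own source path.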
My only thought is that it's the values themselves causing the error, but during my debugging yesterday I tried adding a terminating column (containing a string like 'row_end'), to no effect.
I've tried storing the tables under different schemas and deleting and recreating the pipeline, with no change.
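The table that fails, `deduped_2_name_id`, partitions on identifier-level columns (`partition_cols=["idno", "surname", "deduped_uid_2"]`), and Hive-style partitioning writes one directory per distinct combination of those values. One hedged way to probe the hunch that the values are responsible is to measure how the rows would spread across partitions. This is a minimal sketch assuming a sample of rows can be pulled into plain Python dicts; `partition_report` is a hypothetical helper, not part of DLT. It also counts empty-string keys, since rows with `''` still pass `WHERE idno is not null AND surname is not null`.

```python
from collections import Counter
from typing import Any, Dict, Iterable, Mapping, Sequence


def partition_report(
    rows: Iterable[Mapping[str, Any]], partition_cols: Sequence[str]
) -> Dict[str, int]:
    """Summarise how rows would be distributed over Hive-style partition directories."""
    counts: Counter = Counter()
    null_or_empty = 0
    for row in rows:
        # One partition directory per distinct tuple of partition-column values
        key = tuple(row.get(col) for col in partition_cols)
        if any(value is None or value == "" for value in key):
            null_or_empty += 1
        counts[key] += 1
    return {
        "rows": sum(counts.values()),
        "partitions": len(counts),
        "rows_with_null_or_empty_key": null_or_empty,
        "max_rows_in_partition": max(counts.values(), default=0),
    }
```

A partition count close to the row count means almost every row lands in its own directory. On the full table, the distinct-partition count can be obtained in Spark with `df.select("idno", "surname", "deduped_uid_2").distinct().count()`.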