I have a process which loads data from JSON files into a bronze table. It then adds a couple of columns and creates a silver table. However, the silver table has NULL values in columns where the bronze table has values. The process is as follows:
def load_to_silver(source, table):
    """Build the bronze -> silver DLT pipeline for one source table.

    Uses Auto Loader to ingest JSON files from the landing zone into an
    append-only bronze streaming table, merges changes (SCD type 1) into a
    silver staging streaming table using a per-table merge key, then
    materialises a silver live table with dtypes applied.

    Args:
        source: source system name, used in paths and table names.
        table: table name within the source, used in paths, table names,
            and merge-key selection.
    """
    file_format = 'json'
    # NOTE(review): the landing mount ('company-data-landing') differs from the
    # schema/checkpoint mount below ('ecologi-data-landing') — confirm this is
    # intentional and not a copy/paste slip.
    context_path = f'/mnt/company-data-landing/{source}/{table}/'
    # Schema evolution set to rescue - new schema breaking columns will be
    # stored in rescued rather than failing the pipeline run.
    # NOTE(review): rescue mode is the likely cause of NULLs appearing in
    # silver - columns added after initial schema inference are captured in
    # _rescued_data, leaving the typed column NULL. If new columns should
    # surface as real columns, consider 'addNewColumns' (plus a full refresh).
    # backfillInterval ensures that any missed files are eventually processed (weekly).
    cloudfile = {
        'cloudFiles.schemaEvolutionMode': 'rescue',
        'cloudFiles.schemaLocation': f'/mnt/ecologi-data-landing/{source}/{table}/schema/',
        'checkpointLocation': f'/mnt/ecologi-data-landing/{source}/{table}/checkpoints/',
        # NOTE(review): the documented Auto Loader option is
        # 'cloudFiles.schemaInference.sampleSize.numFiles' - verify this
        # un-prefixed key is actually picked up.
        'schemaInference.sampleSize.numFiles': '200',
        'cloudFiles.format': file_format,
        'columnNameOfCorruptRecord': '_corrupt_record',
        'triggerOnce': 'true',
        'cloudFiles.backfillInterval': '1 week',
    }

    # Bronze: append-only ingest of raw landing files, with audit columns
    # recording when each row was loaded and which file it came from.
    @dlt.table(name=f'bronze_{source}_{table}',
               comment=f'{source}_{table} processed from landing, to bronze streaming table. Data appended, with no changes.',
               )
    def bronze_billing_object():
        return (spark.readStream.format("cloudFiles")
                .options(**cloudfile)
                .load(context_path)
                .withColumn("load_date", lit(datetime.datetime.now()))
                .withColumn("source_file", input_file_name()))

    # Silver staging: declared explicitly so it can be the target of the
    # apply_changes merge below.
    dlt.create_streaming_live_table(
        name=f'silver_staging_{source}_{table}',
        comment=f'{source}_{table} processed from bronze streaming table, to silver streaming table/ view. \
Data merged rather than appended.',
    )

    # Per-table merge keys; any table not listed here merges on 'id'.
    # NOTE(review): 'AddonsOnSubcription' looks like a typo for
    # 'AddonsOnSubscription' - if the real table name is spelled correctly,
    # that table silently falls through to the default 'id' key. Confirm
    # against the actual table names before changing.
    merge_keys = {
        'AddonsOnSubcription': ['subscriptionId'],
        'Renewal': ['invoiceId'],
    }
    dlt.apply_changes(
        target=f'silver_staging_{source}_{table}',
        source=f'bronze_{source}_{table}',
        keys=merge_keys.get(table, ['id']),
        sequence_by=col('createdAt'),
        stored_as_scd_type=1,
    )

    # Silver: batch live table on top of the merged staging data, with
    # createdAt cast from its inferred type to a proper timestamp.
    @dlt.table(name=f'silver_{source}_{table}',
               comment=f'{source}_{table} processed from silver staging streaming table/view, to silver non staging, batch live table. \
Dtypes applied. Data copied from prev.',
               )
    def silver_billing_object():
        df = dlt.read(f'silver_staging_{source}_{table}')
        return df.withColumn("createdAt", col("createdAt").cast("timestamp"))
# Build the bronze -> silver pipeline for every configured table.
for tbl in table_list:
    load_to_silver(notebook_context, tbl)