cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Trying to figure out what is causing non-null values in my bronze tables to be returned as NULL in silver tables.

190809
Contributor

I have a process which loads data from json to a bronze table. It then adds a couple of columns and creates a silver table. But the silver table has NULL values where there were values in the bronze tables. Process as follows:

def load_to_silver(source, table):

   file_format = 'json'

   context_path = f'/mnt/company-data-landing/{source}/{table}/'

# Schema evolution set to rescue - new schema breaking columns will be stored in rescued rather than failing the pipeline run.

# backfillInterval ensures that any missed files are eventually processed (weekly).

   cloudfile = {

   'cloudFiles.schemaEvolutionMode': 'rescue',

   'cloudFiles.schemaLocation': f'/mnt/ecologi-data-landing/{source}/{table}/schema/',

   'checkpointLocation': f'/mnt/ecologi-data-landing/{source}/{table}/checkpoints/',

   'schemaInference.sampleSize.numFiles': '200',

   'cloudFiles.format': file_format,

   'columnNameOfCorruptRecord': '_corrupt_record',

   'triggerOnce': 'true',

   'cloudFiles.backfillInterval': '1 week'

   }

   # Create a materialised staging table to house returned records.

   @dlt.table(name=f'bronze_{source}_{table}',

              comment=f'{source}_{table} processed from landing, to bronze streaming table. Data appended, with no changes.',

              )

   def bronze_billing_object():

       df = spark.readStream.format("cloudFiles") \

           .options(**cloudfile) \

           .load(context_path) \

           .withColumn("load_date", lit(datetime.datetime.now())) \

           .withColumn("source_file", input_file_name())

       return df

   dlt.create_streaming_live_table(name = f'silver_staging_{source}_{table}',

                           comment = f'{source}_{table} processed from bronze streaming table, to silver streaming table/ view. \

                                   Data merged rather than appended.',

   )

   if f'{table}'=='AddonsOnSubcription':

       dlt.apply_changes(

           target = f'silver_staging_{source}_{table}',

           source = f'bronze_{source}_{table}',

           keys = ['subscriptionId'],

           sequence_by = col('createdAt'),

           stored_as_scd_type = 1

           )

   elif f'{table}'=='Renewal':

       dlt.apply_changes(

           target = f'silver_staging_{source}_{table}',

           source = f'bronze_{source}_{table}',

           keys = ['invoiceId'],

           sequence_by = col('createdAt'),

           stored_as_scd_type = 1

           )

   else:

       dlt.apply_changes(

           target = f'silver_staging_{source}_{table}',

           source = f'bronze_{source}_{table}',

           keys = ['id'],

           sequence_by = col('createdAt'),

           stored_as_scd_type = 1

           )

   @dlt.table(name=f'silver_{source}_{table}',

              comment=f'{source}_{table} processed from silver staging streaming table/view, to silver non staging, batch live table. \

              Dtypes applied. Data copied from prev.',

              )

   def silver_billing_object():

       df1 = dlt.read(f'silver_staging_{source}_{table}')

       df2 = df1.withColumn("createdAt",col("createdAt").cast("timestamp"))

       return (df2)

for table in table_list:

   load_to_silver(notebook_context, table)

1 REPLY 1

Anonymous
Not applicable

@Rachel Cunninghamโ€‹ :

One possible reason for this issue could be a data type mismatch between the bronze and silver tables. It is possible that the column in the bronze table has a non-null value, but the data type of that column is different from the data type of the corresponding column in the silver table, which could result in a NULL value when the data is loaded into the silver table.

You can check the data types of the columns in both the bronze and silver tables using the

printSchema() method. Also, you may want to check if there are any transformations or modifications applied to the data between the bronze and silver tables that could cause this issue.

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group