Hey everyone, im using autoloader x soda.
I'm new to both,
The idea is to ingest with quality checks in my silver table for every batch in a continuous ingestion.
I tried to configure soda as str just like the docs show, but its seems that it keeps on trying to find a configuration file.
please feel free to comment on any bad practice if you see it:
def process_batch(df, batch_id):
soda_config ="""
soda_cloud:
host: cloud.soda.io
api_key_id: x
api_key_secret: x
""" #move to enviroment
checks = """
checks for dfSilver:
- missing_count(site) = 0:
name: Ensure there are no null values in the Site column
fail: 1
warn: 0
- missing_count(meter) = 0:
name: Ensure there are no null values in the Meter column
fail: 1
warn: 0
- missing_count(device_time) = 0:
name: Ensure there are no null values in the Device Time column
fail: 1
warn: 0
- missing_count(energy_cumulative_active_value) = 0:
name: Ensure there are no null values in the Cumulative Active Energy Value column
fail: 1
warn: 0
- missing_count(energy_cumulative_active_unit) = 0:
name: Ensure there are no null values in the Cumulative Active Energy Unit column
fail: 1
warn: 0
- missing_count(power_instantaneous_active_value) = 0:
name: Ensure there are no null values in the Instantaneous Active Power Value column
fail: 1
warn: 0
- missing_count(power_instantaneous_active_unit) = 0:
name: Ensure there are no null values in the Instantaneous Active Power Unit column
fail: 1
warn: 0
- missing_count(power_average_active_value) = 0:
name: Ensure there are no null values in the Average Active Power Value column
fail: 1
warn: 0
- missing_count(power_average_active_unit) = 0:
name: Ensure there are no null values in the Average Active Power Unit column
fail: 1
warn: 0
- invalid(energy_cumulative_active_unit) in ("kWh"):
name: Ensure valid units in the Active Energy Unit column
fail: 1
warn: 0
"""
# Initialize the Soda Scan for each batch
scan = Scan()
scan.add_configuration_yaml_str(soda_config)
scan.set_scan_definition_name("Silver Ingestion DQs")
scan.add_sodacl_yaml_str(checks)
scan.set_data_source_name("dfSilver")
# Run Soda checks on the batch
scan.execute()
# Filter records with no null values (applying your filtering logic)
df_filtered = df.filter(
(~isnull(col("site"))) &
(~isnull(col("meter"))) &
(~isnull(col("device_time"))) &
(~isnull(col("data.energy.cumulative.active.value"))) &
(~isnull(col("data.energy.cumulative.active.unit"))) &
(~isnull(col("data.power.instantaneous.active.value"))) &
(~isnull(col("data.power.instantaneous.active.unit"))) &
(~isnull(col("data.power.average.active.value"))) &
(~isnull(col("data.power.average.active.unit")))
)
# Write the filtered data to the Silver table (or another target)
df_filtered.write \
.format("delta") \
.mode("append") \
.save(silver_table_path)
dfSilver.writeStream \
.foreachBatch(process_batch) \
.trigger(processingTime="1 second") \
.start()
this is the error i get: