Using great expectations with autolaoder

Chhaya
New Contributor III

Hi everyone ,

I have implemented a data pipeline using autoloader bronze-->silver-->gold .

now while I do this I want to perform some data quality checks , and for that I'm using great expectations library.

However I'm stuck with below error when trying to validate the data

validator.expect_column_values_to_not_be_null(column="col1")

validator.expect_column_values_to_be_in_set(

   column="col2",

   value_set=[1,6]

)

MetricResolutionError: Queries with streaming sources must be executed with writeStream.start();

looks like great expectations can work with only static/batch data.

Can someone advise how can I go about getting it working ?

I followed below in my databricks notebook to get started with great_expectations

https://docs.greatexpectations.io/docs/deployment_patterns/how_to_use_great_expectations_in_databric...

here is autoloader code

from pyspark.sql.functions import col, to_date, date_format
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
import time
 
# autoloader table and checkpoint paths
basepath = "/mnt/autoloaderdemodl/datagenerator/"
bronzeTable = basepath + "bronze/"
bronzeCheckpoint = basepath + "checkpoint/bronze/"
bronzeSchema = basepath + "schema/bronze/"
silverTable = basepath + "silver/"
silverCheckpoint = basepath + "checkpoint/silver/"
landingZoneLocation = "/mnt/autoloaderdemodl/datageneratorraw/customerdata_csv"
 
# Load data from the CSV file using Auto Loader to bronze table using rescue as schema evolution option
raw_df = spark.readStream.format("cloudFiles") \
            .option("cloudFiles.format", "csv") \
            .option("cloudFiles.schemaEvolutionMode", "rescue") \
            .option("Header", True) \
            .option("cloudFiles.schemaLocation", bronzeSchema) \
            .option("cloudFiles.inferSchema", "true") \
            .option("cloudFiles.inferColumnTypes", True) \
        .load(landingZoneLocation)
 
        # Write raw data to the bronze layer
bronze_df = raw_df.writeStream.format("delta") \
            .trigger(once=True) \
            .queryName("bronzeLoader") \
            .option("checkpointLocation", bronzeCheckpoint) \
            .option("mergeSchema", "true") \
            .outputMode("append") \
            .start(bronzeTable)
# Wait for the bronze stream to finish
bronze_df.awaitTermination()
bronze = spark.read.format("delta").load(bronzeTable)
bronze_count = bronze.count()
display(bronze)
print("Number of rows in bronze table: {}".format(bronze_count))
 
 
bronze_df = spark.readStream.format("delta").load(bronzeTable)
 
# Apply date format transformations to the DataFrame
# Transform the date columns
silver_df = bronze_df.withColumn("date1", to_date(col("date1"), "yyyyDDD"))\
                     .withColumn("date2", to_date(col("date2"), "yyyyDDD"))\
                     .withColumn("date3", to_date(col("date3"), "MMddyy"))
 
# Write the transformed DataFrame to the Silver layer
silver_stream  = silver_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", silverCheckpoint) \
    .trigger(once=True) \
    .start(silverTable)
 
# Wait for the write stream to complete
silver_stream.awaitTermination()
# Count the number of rows in the Silver table
silver = spark.read.format("delta").load(silverTable)
display(silver)
silver_count = silver.count()
print("Number of rows in silver table: {}".format(silver_count))

PS - customer doesn't want to use DLT yet.