
Using Great Expectations with Auto Loader

Chhaya
New Contributor III

Hi everyone,

I have implemented a data pipeline using Auto Loader: bronze --> silver --> gold.

Now, as part of this pipeline, I want to perform some data quality checks, and for that I'm using the Great Expectations library.

However, I'm stuck with the error below when trying to validate the data:

validator.expect_column_values_to_not_be_null(column="col1")
validator.expect_column_values_to_be_in_set(
    column="col2",
    value_set=[1, 6]
)

MetricResolutionError: Queries with streaming sources must be executed with writeStream.start();

It looks like Great Expectations can work only with static/batch data.

Can someone advise how I can go about getting this working?

I followed the guide below in my Databricks notebook to get started with great_expectations:

https://docs.greatexpectations.io/docs/deployment_patterns/how_to_use_great_expectations_in_databric...

Here is the Auto Loader code:

from pyspark.sql.functions import col, to_date, date_format
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
import time
 
# autoloader table and checkpoint paths
basepath = "/mnt/autoloaderdemodl/datagenerator/"
bronzeTable = basepath + "bronze/"
bronzeCheckpoint = basepath + "checkpoint/bronze/"
bronzeSchema = basepath + "schema/bronze/"
silverTable = basepath + "silver/"
silverCheckpoint = basepath + "checkpoint/silver/"
landingZoneLocation = "/mnt/autoloaderdemodl/datageneratorraw/customerdata_csv"
 
# Load CSV files with Auto Loader into the bronze table, using "rescue" as the schema evolution mode
raw_df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.schemaEvolutionMode", "rescue") \
    .option("header", "true") \
    .option("cloudFiles.schemaLocation", bronzeSchema) \
    .option("cloudFiles.inferSchema", "true") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .load(landingZoneLocation)

# Write the raw data to the bronze layer
bronze_query = raw_df.writeStream.format("delta") \
    .trigger(once=True) \
    .queryName("bronzeLoader") \
    .option("checkpointLocation", bronzeCheckpoint) \
    .option("mergeSchema", "true") \
    .outputMode("append") \
    .start(bronzeTable)

# Wait for the bronze stream to finish
bronze_query.awaitTermination()
bronze = spark.read.format("delta").load(bronzeTable)
bronze_count = bronze.count()
display(bronze)
print("Number of rows in bronze table: {}".format(bronze_count))
 
 
bronze_df = spark.readStream.format("delta").load(bronzeTable)
 
# Apply the date format transformations to the date columns
silver_df = bronze_df.withColumn("date1", to_date(col("date1"), "yyyyDDD"))\
                     .withColumn("date2", to_date(col("date2"), "yyyyDDD"))\
                     .withColumn("date3", to_date(col("date3"), "MMddyy"))
 
# Write the transformed DataFrame to the Silver layer
silver_stream = silver_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", silverCheckpoint) \
    .trigger(once=True) \
    .start(silverTable)
 
# Wait for the write stream to complete
silver_stream.awaitTermination()
# Count the number of rows in the Silver table
silver = spark.read.format("delta").load(silverTable)
display(silver)
silver_count = silver.count()
print("Number of rows in silver table: {}".format(silver_count))

PS - the customer doesn't want to use DLT yet.

6 REPLIES

Anonymous
Not applicable

@Chhaya Vishwakarma​ :

The error message points to the streaming nature of your data: queries that involve streaming sources must be started with writeStream.start(). Great Expectations is designed to work with batch/static data, which means it cannot be used directly to validate a streaming DataFrame.

One solution to this issue is to use the foreachBatch API provided by Spark Structured Streaming, which lets you apply a batch function to each micro-batch of the streaming data. Inside that function, the micro-batch is a static DataFrame, so you can apply your data quality checks with Great Expectations; a rough sketch follows.
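A minimal sketch of that pattern, assuming the legacy SparkDFDataset wrapper from great_expectations.dataset (please verify against your GE version; the function name, checkpoint path, and failure handling below are placeholders, not from your pipeline):

from great_expectations.dataset import SparkDFDataset

def validate_and_write_bronze(micro_batch_df, batch_id):
    # Each micro-batch arrives here as a static DataFrame, so GE can evaluate it
    ge_batch = SparkDFDataset(micro_batch_df)
    not_null = ge_batch.expect_column_values_to_not_be_null(column="col1")
    in_set = ge_batch.expect_column_values_to_be_in_set(column="col2", value_set=[1, 6])
    if not (not_null.success and in_set.success):
        # Decide on a policy for failed batches: log, quarantine, or raise to stop the stream
        print("Batch {} failed data quality checks".format(batch_id))
    # Write the micro-batch to the bronze Delta table
    micro_batch_df.write.format("delta").mode("append").save(bronzeTable)

bronzeGeCheckpoint = basepath + "checkpoint/bronze_ge/"  # hypothetical, separate checkpoint

raw_df.writeStream \
    .foreachBatch(validate_and_write_bronze) \
    .trigger(once=True) \
    .option("checkpointLocation", bronzeGeCheckpoint) \
    .start()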

Hope this helps you move ahead.

Chhaya
New Contributor III

Thanks for the reply. I already tried foreachBatch, but it doesn't really work. You are right, GE doesn't support streaming data out of the box.

Anonymous
Not applicable

@Chhaya Vishwakarma​ :

One alternative you can consider is to perform the data quality checks as part of your streaming pipeline, using Spark's built-in column predicates. You can use expressions such as isNotNull() or isin() to check the quality of the data before writing it to the next layer in the pipeline. For example, you can add the following to your bronze-to-silver transformation (please verify and modify the code to suit your use case):

silver_df = bronze_df.withColumn("date1", to_date(col("date1"), "yyyyDDD"))\
                     .withColumn("date2", to_date(col("date2"), "yyyyDDD"))\
                     .withColumn("date3", to_date(col("date3"), "MMddyy"))\
                     .filter(col("col1").isNotNull())\
                     .filter(col("col2").isin([1,6]))

This will filter out any rows where col1 is null or col2 is not in the set [1,6]. You can similarly add additional quality checks as needed.
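If silently dropping bad rows is too aggressive, the same predicates can also split the stream and send failures to a quarantine table instead. A minimal sketch, assuming hypothetical quarantine paths (adapt the names and checkpoints to your setup):

from pyspark.sql.functions import coalesce, lit

# Hypothetical locations for rejected records
quarantineTable = basepath + "quarantine/silver/"
quarantineCheckpoint = basepath + "checkpoint/quarantine/"

# Transformed-but-unfiltered DataFrame (same date transformations as above)
transformed_df = bronze_df.withColumn("date1", to_date(col("date1"), "yyyyDDD")) \
                          .withColumn("date2", to_date(col("date2"), "yyyyDDD")) \
                          .withColumn("date3", to_date(col("date3"), "MMddyy"))

is_valid = col("col1").isNotNull() & col("col2").isin([1, 6])

valid_df = transformed_df.filter(is_valid)  # write this to the silver table as before
# coalesce(..., False) so rows where the predicate evaluates to NULL (e.g. a null col2) are quarantined too
invalid_df = transformed_df.filter(~coalesce(is_valid, lit(False)))

invalid_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", quarantineCheckpoint) \
    .trigger(once=True) \
    .start(quarantineTable)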

Please remember to upvote the best answer that helped you! Also, if you need more follow-ups, do reply on the thread; happy to circle back again.

Chhaya
New Contributor III

Thanks @Suteja Kanuri for your recommendations. The customer is convinced to use DLT, so we are moving to that :).

Anonymous
Not applicable

@Chhaya Vishwakarma : good choice! All the best with DLT.
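For when you get there: in DLT the same two checks map naturally onto expectations declared on the table definition. A minimal sketch (the table names and expectation policies below are just examples, please adapt them to your pipeline):

import dlt
from pyspark.sql.functions import col, to_date

@dlt.table(name="silver_customers")                  # example table name
@dlt.expect("col1_not_null", "col1 IS NOT NULL")     # record violations, keep the rows
@dlt.expect_or_drop("col2_in_set", "col2 IN (1, 6)") # drop rows that fail the check
def silver_customers():
    return (
        dlt.read_stream("bronze_customers")          # example upstream table
            .withColumn("date1", to_date(col("date1"), "yyyyDDD"))
            .withColumn("date2", to_date(col("date2"), "yyyyDDD"))
            .withColumn("date3", to_date(col("date3"), "MMddyy"))
    )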

Anonymous
Not applicable

Hi @Chhaya Vishwakarma​ 

Thank you for your question! To assist you better, please take a moment to review the answer and let me know if it best fits your needs.

Please help us select the best solution by clicking on "Select As Best" if it does.

Your feedback will help us ensure that we are providing the best possible service to you.

Thank you!
