03-09-2023 12:05 AM
Hi everyone,
I have implemented a data pipeline using Auto Loader (bronze --> silver --> gold).
Now, as part of this, I want to perform some data quality checks, and for that I'm using the Great Expectations library.
However, I'm stuck with the error below when trying to validate the data:
validator.expect_column_values_to_not_be_null(column="col1")

validator.expect_column_values_to_be_in_set(
    column="col2",
    value_set=[1, 6]
)
MetricResolutionError: Queries with streaming sources must be executed with writeStream.start();
It looks like Great Expectations can only work with static/batch data.
Can someone advise how I can go about getting this to work?
I followed the steps below in my Databricks notebook to get started with great_expectations.
Here is the Auto Loader code:
from pyspark.sql.functions import col, to_date, date_format
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType
import time
# autoloader table and checkpoint paths
basepath = "/mnt/autoloaderdemodl/datagenerator/"
bronzeTable = basepath + "bronze/"
bronzeCheckpoint = basepath + "checkpoint/bronze/"
bronzeSchema = basepath + "schema/bronze/"
silverTable = basepath + "silver/"
silverCheckpoint = basepath + "checkpoint/silver/"
landingZoneLocation = "/mnt/autoloaderdemodl/datageneratorraw/customerdata_csv"
# Load data from the CSV file using Auto Loader to bronze table using rescue as schema evolution option
raw_df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.schemaEvolutionMode", "rescue") \
    .option("header", True) \
    .option("cloudFiles.schemaLocation", bronzeSchema) \
    .option("cloudFiles.inferSchema", "true") \
    .option("cloudFiles.inferColumnTypes", True) \
    .load(landingZoneLocation)
# Write raw data to the bronze layer
bronze_query = raw_df.writeStream.format("delta") \
    .trigger(once=True) \
    .queryName("bronzeLoader") \
    .option("checkpointLocation", bronzeCheckpoint) \
    .option("mergeSchema", "true") \
    .outputMode("append") \
    .start(bronzeTable)
# Wait for the bronze stream (trigger once) to finish
bronze_query.awaitTermination()
bronze = spark.read.format("delta").load(bronzeTable)
bronze_count = bronze.count()
display(bronze)
print("Number of rows in bronze table: {}".format(bronze_count))
bronze_df = spark.readStream.format("delta").load(bronzeTable)
# Transform the date columns from their source formats to DateType
silver_df = bronze_df.withColumn("date1", to_date(col("date1"), "yyyyDDD")) \
    .withColumn("date2", to_date(col("date2"), "yyyyDDD")) \
    .withColumn("date3", to_date(col("date3"), "MMddyy"))
# Write the transformed DataFrame to the Silver layer
silver_stream = silver_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", silverCheckpoint) \
    .trigger(once=True) \
    .start(silverTable)
# Wait for the write stream to complete
silver_stream.awaitTermination()
# Count the number of rows in the Silver table
silver = spark.read.format("delta").load(silverTable)
display(silver)
silver_count = silver.count()
print("Number of rows in silver table: {}".format(silver_count))
PS - the customer doesn't want to use DLT yet.
03-16-2023 01:05 AM
@Chhaya Vishwakarma:
The error message indicates that the issue comes from the streaming nature of your data: queries against a streaming source must be executed with writeStream.start(). Great Expectations is designed to work with batch/static data, which means it cannot be used directly to validate a streaming source.
One way around this is the foreachBatch API in Spark Structured Streaming, which lets you apply a batch function to each micro-batch of the stream. Within that function you can run your data quality checks with Great Expectations, since each micro-batch arrives as a static DataFrame.
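In case it's useful as a starting point, here is a rough sketch of what that could look like (not tested end to end). It reuses silver_df, silverCheckpoint and silverTable from your code above, uses the older SparkDFDataset wrapper from Great Expectations, and simply fails the batch when a check fails; adapt all of that to your own GE setup and error-handling preference.

from great_expectations.dataset import SparkDFDataset

def validate_and_write(batch_df, batch_id):
    # Each micro-batch arrives here as a static DataFrame, so GE can work with it
    ge_df = SparkDFDataset(batch_df)
    results = [
        ge_df.expect_column_values_to_not_be_null("col1"),
        ge_df.expect_column_values_to_be_in_set("col2", [1, 6]),
    ]
    if all(r.success for r in results):
        # Checks passed: write the micro-batch to the silver table
        batch_df.write.format("delta").mode("append").save(silverTable)
    else:
        # Checks failed: raising stops the stream; you could log or quarantine instead
        raise ValueError(f"Data quality checks failed for batch {batch_id}")

silver_stream = silver_df.writeStream \
    .foreachBatch(validate_and_write) \
    .option("checkpointLocation", silverCheckpoint) \
    .trigger(once=True) \
    .start()

silver_stream.awaitTermination()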
Hope this helps you move ahead,
03-16-2023 10:36 PM
Thanks for the reply. I already tried foreachBatch and it doesn't really work for me. You are right, GE doesn't support streaming data out of the box.
03-17-2023 08:19 AM
@Chhaya Vishwakarma:
One alternative you can consider is to perform the data quality checks as part of your streaming pipeline using Spark's built-in column predicates. You can use expressions such as isNotNull() or isin() to check the quality of the data before writing it to the next layer of the pipeline. For example, you can add the following to your bronze-to-silver transformation (please verify and modify the code to suit your use case):
silver_df = bronze_df.withColumn("date1", to_date(col("date1"), "yyyyDDD")) \
    .withColumn("date2", to_date(col("date2"), "yyyyDDD")) \
    .withColumn("date3", to_date(col("date3"), "MMddyy")) \
    .filter(col("col1").isNotNull()) \
    .filter(col("col2").isin([1, 6]))
This will filter out any rows where col1 is null or col2 is not in the set [1, 6]. You can add further quality checks in the same way.
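If you would rather keep the rejected rows for investigation instead of silently dropping them, one variation is to split the transformed DataFrame (here silver_df means the version without the two .filter() calls) on the same condition and send the failing rows to a separate Delta location. This is only an illustrative sketch; the quarantine paths below are made-up names under your existing basepath.

from pyspark.sql.functions import coalesce, col, lit

# Rows must have a non-null col1 and col2 in {1, 6}
is_valid = col("col1").isNotNull() & col("col2").isin([1, 6])

good_df = silver_df.filter(is_valid)
# coalesce(..., False) so rows where the check evaluates to null also land in quarantine
bad_df = silver_df.filter(~coalesce(is_valid, lit(False)))

# Write the failing rows to a quarantine table (hypothetical paths)
bad_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", basepath + "checkpoint/quarantine/") \
    .trigger(once=True) \
    .start(basepath + "quarantine/")

You would then write good_df to the silver table in place of silver_df in your existing writeStream.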
Please remember to upvote the answer that helped you! Also, if you need any follow-ups, reply on the thread and I'm happy to circle back.
03-23-2023 01:01 AM
Thanks @Suteja Kanuri for your recommendations. The customer is now convinced to use DLT, so we are moving to that :)
04-01-2023 09:26 PM
@Chhaya Vishwakarma: Good choice! All the best with DLT.
03-31-2023 05:18 PM
Hi @Chhaya Vishwakarma,
Thank you for your question! To assist you better, please take a moment to review the answer and let me know if it best fits your needs.
Please help us select the best solution by clicking on "Select As Best" if it does.
Your feedback will help us ensure that we are providing the best possible service to you.
Thank you!