Data governance is one of the most important pillars in any modern architecture. When building pipelines that process data at scale, ensuring data quality is not just a best practice; it is a critical necessity.
Tools like Great Expectations (GX) were created to fill this gap, allowing you to define automated, human-readable, and auditable validation rules.
In this article, I'll show how to use PySpark + Great Expectations on Databricks, applying programmatic validations on Spark DataFrames in a simple and reusable way.
Why Great Expectations?
Great Expectations (GX) is an open-source framework that enables you to:
- define validation rules (expectations) as code;
- run them automatically against your data;
- keep the results human-readable and auditable.
In the context of Databricks + Spark, it's a perfect match for validating distributed DataFrames before loading them into data lakes or data warehouses.
For more details, see https://greatexpectations.io/.
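Before diving into the reusable function, here is a minimal sketch of the flow this article relies on, assuming GX 1.x and its Fluent API; the names spark_quickstart, orders_asset, orders_batch, and the tiny DataFrame are purely illustrative:
import great_expectations as gx
from great_expectations import expectations as E

# Tiny Spark DataFrame to validate (the `spark` session is provided by Databricks)
orders_df = spark.createDataFrame([(1, "shipped"), (2, None)], ["order_id", "status"])

# Ephemeral GX context plus an in-memory Spark datasource
context = gx.get_context()
ds = context.data_sources.add_spark(name="spark_quickstart")
asset = ds.add_dataframe_asset(name="orders_asset")
batch = asset.add_batch_definition_whole_dataframe("orders_batch").get_batch(
    batch_parameters={"dataframe": orders_df}
)

# Run a single expectation directly against the batch and inspect the result
result = batch.validate(E.ExpectColumnValuesToNotBeNull(column="status"))
print(result.success)  # False here, because one status value is null
The generic function below wraps exactly this pattern so it can be reused across pipelines.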
Generic Validation Function
Below is a generic function I wrote to encapsulate the main validations. It takes:
- df: the Spark DataFrame to validate;
- schema: a dictionary describing, per column, the expected dtype and whether it must be unique, non-nullable, or length-limited (size);
- expected_row_count: an optional exact row count to enforce;
- check_ordered_columns: whether the DataFrame must contain exactly these columns, in this order;
- enable_length_check: whether to enforce the size limits declared in the schema.
import great_expectations as gx
from great_expectations import expectations as E
from pyspark.sql import DataFrame


def validate_with_gx(
    df: DataFrame,
    schema: dict,
    expected_row_count: int = None,
    check_ordered_columns: bool = True,
    enable_length_check: bool = False
) -> None:
    """
    Runs Great Expectations checks on a Spark DataFrame.
    """
    # 1) Build a transient GX context and Spark datasource
    context = gx.get_context()
    ds = context.data_sources.add_spark(name="spark_in_memory")
    asset = ds.add_dataframe_asset(name="df_asset")
    batch_def = asset.add_batch_definition_whole_dataframe("df_batch")
    batch = batch_def.get_batch(batch_parameters={"dataframe": df})

    # 2) Run expectations per schema
    results = []
    ordered_cols = []
    for col, props in schema.items():
        ordered_cols.append(col)
        if props.get("unique", False):
            results.append(batch.validate(E.ExpectColumnValuesToBeUnique(column=col)))
        if props.get("nullable", True) is False:
            results.append(batch.validate(E.ExpectColumnValuesToNotBeNull(column=col)))
        dtype = props.get("dtype")
        if dtype:
            results.append(batch.validate(E.ExpectColumnValuesToBeOfType(column=col, type_=dtype)))
        if enable_length_check:
            size = props.get("size")
            if size is not None:
                results.append(
                    batch.validate(
                        E.ExpectColumnValueLengthsToBeBetween(
                            column=col, min_value=None, max_value=int(size), strict_max=True
                        )
                    )
                )

    # 3) Table-level expectations
    if check_ordered_columns:
        results.append(batch.validate(E.ExpectTableColumnsToMatchOrderedList(column_list=ordered_cols)))
    if expected_row_count is not None:
        results.append(batch.validate(E.ExpectTableRowCountToEqual(value=int(expected_row_count))))

    # 4) Summarize results
    total = len(results)
    successes = sum(1 for r in results if getattr(r, "success", False))
    failures = total - successes
    print(f"[DQ] Expectations run: {total} | Passed: {successes} | Failed: {failures}")
    if failures > 0:
        for r in results:
            if not getattr(r, "success", False):
                cfg = getattr(r, "expectation_config", None)
                etype = getattr(cfg, "type", "unknown") if cfg else "unknown"
                kwargs = getattr(cfg, "kwargs", {}) if cfg else {}
                print(f"[DQ][FAIL] {etype} {kwargs}")
        raise Exception("Data Quality validation failed.")
    else:
        print("[DQ] All checks passed ✔️")
Usage Example
We define an expected schema as a dictionary:
expected_schema = {
    "id":         {"size": None, "dtype": "IntegerType",   "unique": True,  "nullable": False},
    "name":       {"size": 255,  "dtype": "StringType",    "unique": False, "nullable": False},
    "created_at": {"size": None, "dtype": "TimestampType", "unique": False, "nullable": False},
}
Create a test DataFrame:
demo_df = spark.createDataFrame(
    [(1, "Alice", "2024-01-01 10:00:00"),
     (2, "Bob", "2024-01-01 11:00:00")],
    ["id", "name", "created_at"]
).selectExpr(
    "CAST(id AS INT) as id",
    "CAST(name AS STRING) as name",
    "to_timestamp(created_at) as created_at"
)
expected_rows = demo_df.count()
validate_with_gx(
    df=demo_df,
    schema=expected_schema,
    expected_row_count=expected_rows,
    check_ordered_columns=True,
    enable_length_check=False
)
Expected Output
If all expectations are met:
[DQ] Expectations run: 9 | Passed: 9 | Failed: 0
[DQ] All checks passed ✔️
If there are failures, the log will show details about the unmet expectation (e.g., null values or wrong data type).
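For example, feeding the same contract a hypothetical bad DataFrame (a duplicate id and a null name) makes the function print the failing expectations and raise:
# Hypothetical bad data: id 1 is duplicated and one name is null
bad_df = spark.createDataFrame(
    [(1, "Alice", "2024-01-01 10:00:00"),
     (1, None, "2024-01-01 11:00:00")],
    ["id", "name", "created_at"]
).selectExpr(
    "CAST(id AS INT) as id",
    "CAST(name AS STRING) as name",
    "to_timestamp(created_at) as created_at"
)

# ExpectColumnValuesToBeUnique (id) and ExpectColumnValuesToNotBeNull (name) fail,
# so the [DQ][FAIL] lines are printed and an Exception is raised.
validate_with_gx(
    df=bad_df,
    schema=expected_schema,
    expected_row_count=bad_df.count(),
    check_ordered_columns=True,
    enable_length_check=False
)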
When to Use on Databricks?
This type of validation is ideal for Databricks pipelines that need to check distributed DataFrames before writing them to data lakes or data warehouses: because the function raises on failure, bad data stops the job instead of propagating downstream, as sketched below.
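As a sketch of how this can gate a write inside a Databricks job (the table names bronze.orders and silver.orders and their columns are hypothetical), the function from this article can sit between a read and a write:
# Hypothetical gate step: read bronze, validate, and only then write silver.
orders_df = spark.table("bronze.orders")

# check_ordered_columns defaults to True, so the contract must list every
# column of the table in order; adjust to match the real table.
orders_schema = {
    "order_id": {"size": None, "dtype": "IntegerType", "unique": True,  "nullable": False},
    "status":   {"size": 50,   "dtype": "StringType",  "unique": False, "nullable": False},
}

# Raises on any failed expectation, which fails the job before the write happens
validate_with_gx(df=orders_df, schema=orders_schema, enable_length_check=True)

orders_df.write.mode("overwrite").saveAsTable("silver.orders")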
Complete Expectations Reference
For the full catalog of expectations available in Great Expectations, consult the official index at https://greatexpectations.io/expectations/.
Summary List of Common Expectations
Below is a concise list of frequently used expectations (the class names exposed under great_expectations.expectations, as in the code above). Use them as building blocks for your contracts:
- ExpectColumnValuesToNotBeNull
- ExpectColumnValuesToBeUnique
- ExpectColumnValuesToBeOfType
- ExpectColumnValuesToBeBetween
- ExpectColumnValuesToBeInSet
- ExpectColumnValuesToMatchRegex
- ExpectColumnValueLengthsToBeBetween
- ExpectTableColumnsToMatchOrderedList
- ExpectTableRowCountToEqual
- ExpectTableRowCountToBeBetween
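As an illustration of a few expectations not used in the main function, here is a small self-contained sketch; the column names, value set, and regex are assumptions for the demo:
import great_expectations as gx
from great_expectations import expectations as E

# Illustrative data; in a real pipeline this would be your production DataFrame
df = spark.createDataFrame([(1, "shipped"), (2, "pending")], ["order_id", "status"])

context = gx.get_context()
batch = (
    context.data_sources.add_spark(name="spark_demo")
    .add_dataframe_asset(name="demo_asset")
    .add_batch_definition_whole_dataframe("demo_batch")
    .get_batch(batch_parameters={"dataframe": df})
)

# Any expectation from the list above can be run the same way
checks = [
    E.ExpectColumnValuesToBeInSet(column="status", value_set=["shipped", "pending", "cancelled"]),
    E.ExpectColumnValuesToBeBetween(column="order_id", min_value=1, max_value=1_000_000),
    E.ExpectColumnValuesToMatchRegex(column="status", regex=r"^[a-z]+$"),
]
for expectation in checks:
    result = batch.validate(expectation)
    print(type(expectation).__name__, "->", result.success)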
Conclusion
Integrating PySpark + Great Expectations within Databricks is a powerful way to boost data reliability.
With just a few lines of code, we can:
- describe the expected schema of a table as a simple Python dictionary;
- run column-level checks (types, uniqueness, nullability, value lengths) and table-level checks (column order, row count) on Spark DataFrames;
- fail the pipeline with a clear [DQ] log whenever the data does not meet the contract.
2 weeks ago
Hello @WiliamRosa: Thanks for sharing the nice article on DQ. I would like to hear from your end what other alternative options exist if we don't want to go for external libraries.
2 weeks ago
Thank you very much for the question. If you can't access external libraries, there's the option of using Databricks' own DQ features. There's a really cool post about it; I'll leave the link below:
https://www.databricks.com/discover/pages/data-quality-management
a week ago
@WiliamRosa: Thanks for sharing the link. I will explore.