
Data Quality with PySpark and Great Expectations on Databricks

WiliamRosa
New Contributor II

Data governance is one of the most important pillars in any modern architecture. When building pipelines that process data at scale, ensuring data quality is not just a best practice; it is a critical necessity.

Tools like Great Expectations (GX) were created to fill this gap, allowing you to define automated, human-readable, and auditable validation rules.

In this article, I'll show how to use PySpark + Great Expectations on Databricks, applying programmatic validations on Spark DataFrames in a simple and reusable way.

Why Great Expectations? 

Great Expectations (GX) is an open-source framework that enables you to: 

  • Define expectations about data (e.g., non-null columns, correct data types, unique values). 
  • Create living documentation of data quality. 
  • Integrate with different data engines: Pandas, Spark, SQLAlchemy. 
  • Generate automated validation reports. 

In the context of Databricks + Spark, it's a perfect match for validating distributed DataFrames before loading them into data lakes or data warehouses.

For more details, see "https://greatexpectations.io/". 
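
Before the full function, here is a minimal sketch of a single expectation run against a Spark DataFrame with GX's fluent API. The datasource and asset names are arbitrary, and on Databricks you would typically install the library first (e.g., %pip install great_expectations):

import great_expectations as gx
from great_expectations import expectations as E

# Tiny in-memory DataFrame just for the sketch (assumes a Databricks/Spark session `spark`)
sample_df = spark.createDataFrame([(1,), (2,), (3,)], ["id"])

context = gx.get_context()                              # ephemeral GX context
ds = context.data_sources.add_spark(name="quickstart")  # in-memory Spark datasource
asset = ds.add_dataframe_asset(name="quickstart_asset")
batch = asset.add_batch_definition_whole_dataframe("all_rows").get_batch(
    batch_parameters={"dataframe": sample_df}
)

result = batch.validate(E.ExpectColumnValuesToNotBeNull(column="id"))
print(result.success)  # True if no nulls were found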

Generic Validation Function 

Below is a generic function I wrote to encapsulate the main validations. It takes: 

  • A Spark DataFrame. 
  • An expected schema (as a dictionary). 
  • Optional parameters, such as expected row count, column order validation, and string-length checks.

import great_expectations as gx
from pyspark.sql import DataFrame

def validate_with_gx(
    df: DataFrame,
    schema: dict,
    expected_row_count: int = None,
    check_ordered_columns: bool = True,
    enable_length_check: bool = False
) -> None:
    """
    Runs Great Expectations checks on a Spark DataFrame.
    """
    # 1) Build a transient GX context and Spark datasource
    context = gx.get_context()
    ds = context.data_sources.add_spark(name="spark_in_memory")
    asset = ds.add_dataframe_asset(name="df_asset")
    batch_def = asset.add_batch_definition_whole_dataframe("df_batch")
    batch = batch_def.get_batch(batch_parameters={"dataframe": df})

    # 2) Run expectations per schema
    from great_expectations import expectations as E
    results = []
    ordered_cols = []
    for col, props in schema.items():
        ordered_cols.append(col)

        if props.get("unique", False):
            results.append(batch.validate(E.ExpectColumnValuesToBeUnique(column=col)))
        if props.get("nullable", True) is False:
            results.append(batch.validate(E.ExpectColumnValuesToNotBeNull(column=col)))

        dtype = props.get("dtype")
        if dtype:
            results.append(batch.validate(E.ExpectColumnValuesToBeOfType(column=col, type_=dtype)))

        if enable_length_check:
            size = props.get("size")
            if size is not None:
                results.append(
                    batch.validate(
                        E.ExpectColumnValueLengthsToBeBetween(
                            column=col, min_value=None, max_value=int(size), strict_max=True
                        )
                    )
                )

    # 3) Table-level expectations
    if check_ordered_columns:
        results.append(batch.validate(E.ExpectTableColumnsToMatchOrderedList(column_list=ordered_cols)))
    if expected_row_count is not None:
        results.append(batch.validate(E.ExpectTableRowCountToEqual(value=int(expected_row_count))))

    # 4) Summarize results
    total = len(results)
    successes = sum(1 for r in results if getattr(r, "success", False))
    failures = total - successes

    print(f"[DQ] Expectations run: {total} | Passed: {successes} | Failed: {failures}")
    if failures > 0:
        for r in results:
            if not getattr(r, "success", False):
                cfg = getattr(r, "expectation_config", None)
                etype = getattr(cfg, "type", "unknown") if cfg else "unknown"
                kwargs = getattr(cfg, "kwargs", {}) if cfg else {}
                print(f"[DQ][FAIL] {etype} {kwargs}")
        raise Exception("Data Quality validation failed.")
    else:
        print("[DQ] All checks passed โœ”๏ธ")

Usage Example 

We define an expected schema as a dictionary: 

expected_schema = {
    "id":         {"size": None, "dtype": "IntegerType",  "unique": True,  "nullable": False},
    "name":       {"size": 255,  "dtype": "StringType",   "unique": False, "nullable": False},
    "created_at": {"size": None, "dtype": "TimestampType","unique": False, "nullable": False},
}

Create a test DataFrame: 

demo_df = spark.createDataFrame(
    [(1, "Alice", "2024-01-01 10:00:00"),
     (2, "Bob",   "2024-01-01 11:00:00")],
    ["id", "name", "created_at"]
).selectExpr(
    "CAST(id AS INT) as id",
    "CAST(name AS STRING) as name",
    "to_timestamp(created_at) as created_at"
)

expected_rows = demo_df.count()

validate_with_gx(
    df=demo_df,
    schema=expected_schema,
    expected_row_count=expected_rows,
    check_ordered_columns=True,
    enable_length_check=False
)

Expected Output 

If all expectations are met: 

[DQ] Expectations run: 9 | Passed: 9 | Failed: 0
[DQ] All checks passed ✔️

If there are failures, the log will show details about the unmet expectation (e.g., null values or wrong data type). 
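
For illustration only (the exact expectation names and kwargs printed depend on the GX version and the batch), a failed not-null check on name would look roughly like this before the exception is raised:

[DQ] Expectations run: 9 | Passed: 8 | Failed: 1
[DQ][FAIL] expect_column_values_to_not_be_null {'column': 'name'}
Exception: Data Quality validation failed.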

 When to Use on Databricks? 

This type of validation is ideal for: 

  • ETL/ELT pipelines → validating intermediate tables before saving them to Delta Lake (see the sketch after this list).
  • Data mesh → enforcing data contracts across domains.
  • Governance → producing evidence of data quality for audits.
  • Monitoring → detecting schema breaks and anomalies early.
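
As a sketch of the ETL/ELT case (the table names are placeholders; validate_with_gx and expected_schema are the ones defined above), the write is only reached when every expectation passes, because the function raises on failure:

# Placeholder table names; validate_with_gx raises if any expectation fails,
# so the Delta write below only runs when the data meets the contract.
silver_df = spark.table("bronze.orders")

validate_with_gx(
    df=silver_df,
    schema=expected_schema,
    check_ordered_columns=True,
)

silver_df.write.format("delta").mode("overwrite").saveAsTable("silver.orders")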

Complete Expectations Reference 

For the full catalog of expectations available in Great Expectations, consult the official index at "https://greatexpectations.io/expectations/". 

Summary List of Common Expectations 

Below is a concise list of frequently used expectations (names as used in the current GX API). Use them as building blocks for your data contracts; a short example combining a few of them follows the list:

  • ExpectTableRowCountToEqual – Enforces an exact number of rows.
  • ExpectTableRowCountToBeBetween – Enforces a row count range.
  • ExpectTableColumnsToMatchOrderedList – Ensures the table has exactly these columns in this order.
  • ExpectTableColumnCountToBeBetween – Enforces a range of column count.
  • ExpectColumnValuesToNotBeNull – Disallows nulls in a column.
  • ExpectColumnValuesToBeUnique – Enforces uniqueness on a column.
  • ExpectMulticolumnValuesToBeUnique – Enforces uniqueness across multiple columns (composite key).
  • ExpectColumnValuesToBeInSet – Values must be in a given whitelist.
  • ExpectColumnDistinctValuesToBeInSet – All distinct values are from a given set.
  • ExpectColumnValuesToBeBetween – Numeric or datetime values fall within [min, max].
  • ExpectColumnValueLengthsToBeBetween – String length bounds (often used with VARCHAR-like limits).
  • ExpectColumnValuesToMatchRegex – String values match a regular expression.
  • ExpectColumnValuesToMatchRegexList – String values match at least one regex from a list.
  • ExpectColumnValuesToBeOfType – Column has an expected data type (e.g., "StringType").
  • ExpectColumnValuesToBeDateutilParseable – Values can be parsed as dates.
  • ExpectColumnValuesToBeInTypeList – Type belongs to an allowed set.
  • ExpectColumnMedianToBeBetween / ExpectColumnMeanToBeBetween – Distribution sanity checks.
  • ExpectColumnQuantileValuesToBeBetween – Quantiles fall within ranges.
  • ExpectColumnProportionOfUniqueValuesToBeBetween – Cardinality sanity check.
  • ExpectColumnPairValuesToBeInSet – Validates allowed combinations across two columns.
  • ExpectColumnValuesToBeUniqueWithinRecord – No duplicate values within a row (useful for wide tables with repeating fields).
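
As a small, hedged illustration of combining a few of these expectations (the column names are hypothetical, and `batch` is obtained the same way as inside validate_with_gx above):

from great_expectations import expectations as E

# Hypothetical columns on a hypothetical batch; each call returns a result with .success
checks = [
    E.ExpectColumnValuesToBeInSet(column="status", value_set=["NEW", "PAID", "CANCELLED"]),
    E.ExpectColumnValuesToMatchRegex(column="email", regex=r"^[^@\s]+@[^@\s]+\.[^@\s]+$"),
    E.ExpectColumnValuesToBeBetween(column="amount", min_value=0, max_value=100000),
]
for exp in checks:
    print(type(exp).__name__, batch.validate(exp).success)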

Conclusion 

Integrating PySpark + Great Expectations within Databricks is a powerful way to boost data reliability. 

With just a few lines of code, we can: 

  • Validate schemas, columns, and types. 
  • Ensure quality before persisting to the data lake. 
  • Automate checks across critical pipelines. 
  • Run data quality checks across batches of multiple tables, not just individual DataFrames. 
  • Persist validation results into Delta tables, making them available for monitoring and visualization through dashboards such as Power BI or Databricks SQL (a minimal sketch of this follows below).
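
For the last point, one possible approach (a sketch only, not part of the function above; the table name is a placeholder, and you would need to return `results` from validate_with_gx or collect it separately) is to flatten each result into a row and append it to a Delta table:

from datetime import datetime, timezone

# `results` is the list of validation results built inside validate_with_gx;
# return it from the function (or rebuild it) before running this sketch.
rows = []
for r in results:
    cfg = getattr(r, "expectation_config", None)
    rows.append({
        "run_ts": datetime.now(timezone.utc).isoformat(),
        "expectation": getattr(cfg, "type", "unknown") if cfg else "unknown",
        "kwargs": str(getattr(cfg, "kwargs", {}) if cfg else {}),
        "success": bool(getattr(r, "success", False)),
    })

# Placeholder table name; query it from Databricks SQL or Power BI for monitoring dashboards.
spark.createDataFrame(rows).write.format("delta").mode("append").saveAsTable("dq.validation_results")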
Wiliam Rosa
Data Engineer | Machine Learning Engineer
LinkedIn: linkedin.com/in/wiliamrosa
3 REPLIES

BR_DatabricksAI
Contributor III

Hello @WiliamRosa: Thanks for sharing the nice article on DQ. I would like to hear from your end what other alternative options exist if we don't want to use external libraries.

BR

WiliamRosa
New Contributor II

Hi @BR_DatabricksAI 

Thank you very much for the question. If you can't access external libraries, there is the option of using Databricks' own DQ features. There's a really cool post about it; I'll leave the link below:

https://www.databricks.com/discover/pages/data-quality-management

Wiliam Rosa
Data Engineer | Machine Learning Engineer
LinkedIn: linkedin.com/in/wiliamrosa

BR_DatabricksAI
Contributor III

@WiliamRosa: Thanks for sharing the link. I will explore it.

BR