Handling the Chaos: Data Quality Strategies with PySpark Ingestion

dmatrixjsd

Tips and Techniques for Ingesting Large JSON files with PySpark

Introduction

If you’ve ever struggled with ingesting massive JSON files with PySpark, you know that bad data can quietly creep in, silently disrupt your ETL pipeline, and pollute the data your downstream consumers rely on.

In this blog post, I’ll walk you through three practical approaches to maintaining data quality during ingestion. We’ll cover how to drop records that don’t match your defined schema, validate that numeric values are sensible (not just present), and clean out duplicate entries before they cause trouble downstream.

Figure 1. Overview of data quality strategies

These are simple yet effective techniques that can save you headaches when working with JSON files. The three approaches are by no means exhaustive; other aspects of data quality, such as timeliness and consistency, depend on your data sources (real-time streaming, operational databases, etc.), and external tools such as Great Expectations can also help address data validation issues.

For the past few weeks, I have been experimenting with Claude Code, Cursor, and Goose as coding assistants in a generation-and-validation cycle: I instruct, interact, validate, and iterate with the AI coding agent until we converge on the best approach. Some of the code examples, test cases, and documentation were generated with Cursor, with constant interactive revision and validation on my part, keeping the AI “on a tight leash.”

Figure 2. Cycle of partial autonomy (source: Andrej Karpathy)

If you have not used the Cursor coding agent in this manner of cooperative partial autonomy, you ought to try it. 🤖 😎 You can explore the cooperative results of these data quality strategies in the GitHub repository.

Figure 3. PySpark files that implement the recommended strategy

   

Data Ingestion with Schema Enforcement

First, consider schema enforcement. When working with large JSON files, one of the most critical steps is ensuring your data conforms to an expected schema before it pollutes your downstream processes. Schema enforcement in the PySpark 4.0 DataFrame API gives you the power to catch malformed data early and handle it gracefully.

The key is being proactive rather than reactive. Instead of discovering schema mismatches after your ETL job has been running for hours, you can catch these issues upfront and decide how to handle them.
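Before the full example, here is a minimal sketch of the three parser modes the JSON reader supports (the file path is one of the sample files used later; the two-field schema is abbreviated for illustration). FAILFAST aborts as soon as a malformed record is processed, DROPMALFORMED silently discards records that don’t match the schema, and PERMISSIVE keeps everything, nulling fields it cannot parse.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.appName("JSONReadModes").master("local[*]").getOrCreate()

# Abbreviated schema for illustration only
schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("transaction_amount", DoubleType(), True),
])

# FAILFAST: raise an exception when a malformed record is encountered
strict_df = spark.read.option("mode", "FAILFAST").schema(schema).json("data/mixed_transactions.json")

# DROPMALFORMED: silently drop records that do not conform to the schema
dropped_df = spark.read.option("mode", "DROPMALFORMED").schema(schema).json("data/mixed_transactions.json")

# PERMISSIVE (the default): keep every record, setting unparseable fields to null
lenient_df = spark.read.option("mode", "PERMISSIVE").schema(schema).json("data/mixed_transactions.json")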

Example 1: Strict Schema Enforcement with Error Handling

Here’s how you can enforce a strict schema and capture any records that don’t comply. This strategy is a two-step process: try FAILFAST for clean data, then fall back to PERMISSIVE mode if parsing fails.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col

# Define schema once at module level for efficiency
TRANSACTION_SCHEMA = StructType([
    StructField("user_id", StringType(), nullable=False),
    StructField("transaction_amount", DoubleType(), nullable=False),
    StructField("transaction_date", TimestampType(), nullable=True),
    StructField("merchant_category", StringType(), nullable=True),
    StructField("customer_tier", StringType(), nullable=True)
])

def smart_json_load(spark, file_path):
    """
    Smart JSON loading: try strict FAILFAST first, fall back to PERMISSIVE.
    Best of both worlds:
    - Fast processing for clean data (FAILFAST)
    - Graceful recovery for messy data (PERMISSIVE)
    """
    print(f"📋 Smart loading {file_path}...")

    # Step 1: Try FAILFAST for clean data
    try:
        print("🎯 Attempting FAILFAST mode...")
        df = spark.read.option("mode", "FAILFAST").schema(TRANSACTION_SCHEMA).json(file_path)
        count = df.count()

        # collect() forces full parsing of every record (count() alone can be
        # optimized to skip columns), so malformed data fails here, not downstream
        df.collect()
        print(f"✅ FAILFAST succeeded: {count} clean records")
        return df, None

    except Exception:
        print("⚠️  FAILFAST failed, switching to PERMISSIVE mode...")

        # Step 2: Fall back to PERMISSIVE with an extra _corrupt_record column
        # (build a new StructType so the shared TRANSACTION_SCHEMA is not mutated)
        schema_with_corrupt = StructType(
            TRANSACTION_SCHEMA.fields + [StructField("_corrupt_record", StringType(), True)]
        )
        df = spark.read.option("mode", "PERMISSIVE").schema(schema_with_corrupt).json(file_path)

        # Cache to avoid Spark 4.0 restrictions on querying the corrupt record column
        df = df.cache()

        # Separate good records from bad ones
        good_df = df.filter(col("_corrupt_record").isNull() & col("user_id").isNotNull()).drop("_corrupt_record")
        bad_df = df.filter(col("_corrupt_record").isNotNull() | col("user_id").isNull())
        good_count = good_df.count()
        bad_count = bad_df.count()
        print(f"🔄 PERMISSIVE recovery: {good_count} good, {bad_count} bad records")
        return good_df, bad_df


if __name__ == "__main__":
    spark = SparkSession.builder.appName("SmartJSONLoader").master("local[*]").getOrCreate()

    # Test with different data files
    test_files = [
        "data/clean_transactions.json",
        "data/mixed_transactions.json",
        "data/invalid_transactions.json"
    ]

    for file_path in test_files:
        print(f"\n{'='*60}")
        print(f"🧪 TESTING: {file_path}")
        print(f"{'='*60}")
        good_data, bad_data = smart_json_load(spark, file_path)

        if bad_data is None:
            print("🎯 Perfect! Clean data processed with FAILFAST")
        else:
            print("🛡️  Recovered data with PERMISSIVE fallback")

    spark.stop()

Example 2: Permissive Mode with Corrupt Record Tracking

Sometimes you want to ingest what you can and track what fails. This example demonstrates PERMISSIVE mode in PySpark 4.0, which ingests every record that can be parsed while tracking problematic records for later analysis and investigation. It’s ideal for exploratory data analysis and gradual improvement of data quality.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col

# Define schema once at module level for efficiency
PERMISSIVE_SCHEMA = StructType([
    StructField("user_id", StringType(), nullable=True),
    StructField("transaction_amount", DoubleType(), nullable=True),
    StructField("transaction_date", TimestampType(), nullable=True),
    StructField("merchant_category", StringType(), nullable=True),
    StructField("customer_tier", StringType(), nullable=True),
    StructField("_corrupt_record", StringType(), nullable=True)  # Captures bad records
])

def load_with_permissive_mode(spark, file_path):
   """
   Example 2: Permissive Mode with Corrupt Record Tracking
  
   Uses PERMISSIVE mode - never fails, captures bad records in _corrupt_record column.
   Perfect for messy data where you want to save what you can.
   """
  
   print(f"📋 Loading {file_path} with permissive mode...")
  
   # PERMISSIVE mode - never fails, captures corruption
   df = spark.read.option("mode", "PERMISSIVE").schema(PERMISSIVE_SCHEMA).json(file_path)
  
   total_records = df.count()
  
   # Separate good from bad records
   good_records = df.filter(col("_corrupt_record").isNull() & col("user_id").isNotNull())
   bad_records = df.filter(col("_corrupt_record").isNotNull() | col("user_id").isNull())
  
   good_count = good_records.count()
   bad_count = bad_records.count()
   success_rate = (good_count / total_records * 100) if total_records > 0 else 0
  
   print(f"📊 Results: {good_count}/{total_records} good records ({success_rate:.1f}% success)")
  
   if good_count > 0:
       print("✅ Sample good records:")
       good_records.drop("_corrupt_record").show(2, truncate=False)
  
   if bad_count > 0:
       print("❌ Sample bad records:")
       bad_records.select("_corrupt_record").show(2, truncate=False)
  
   return good_records, bad_records


if __name__ == "__main__":
   spark = SparkSession.builder.appName("PermissiveMode").master("local[*]").getOrCreate()
  
   # Test with different data files
   test_files = [
       "data/clean_transactions.json",
       "data/mixed_transactions.json",
       "data/invalid_transactions.json"
   ]
  
   for file_path in test_files:
       print(f"\n{'='*60}")
       print(f"🧪 TESTING: {file_path}")
       print(f"{'='*60}")
      
       good_data, bad_data = load_with_permissive_mode(spark, file_path)
       print(f"✅ PERMISSIVE always succeeds - captured {good_data.count()} good, {bad_data.count()} bad records")
  
   spark.stop()

The beauty of this approach is that you can continue processing with the clean data while simultaneously capturing the problematic records for later analysis and review. This provides visibility into data quality issues without interrupting your entire pipeline. Code and test runs for the above examples are in this GitHub Repo.
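A natural follow-up, sketched below, is to persist those problematic records somewhere they can be triaged without blocking the rest of the pipeline. The quarantine path is an illustrative assumption; the DataFrames come from the permissive-mode loader above.

# Sketch: quarantine bad records for later triage (the output path is illustrative)
good_records, bad_records = load_with_permissive_mode(spark, "data/mixed_transactions.json")

if bad_records.count() > 0:
    # Persist the full bad rows, including the raw _corrupt_record payload, for inspection
    bad_records.write.mode("append").json("data/quarantine/mixed_transactions")

# Downstream processing continues with only the clean rows
clean_df = good_records.drop("_corrupt_record")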

Validating Data for Accuracy

The second approach is validating data for accuracy. Schema enforcement gets you part of the way there, but just because a field exists and has the correct data type doesn’t mean the data makes business sense. Here, data validation ensures your transaction amounts fall within realistic ranges and your categorical values come from your approved business domains.

Think of this as your business logic validation layer. You’re not just checking that a transaction_amount field is a double; you’re checking that it’s a reasonable amount for a financial transaction. You’re not just verifying that merchant_category is a string; you’re ensuring it’s one of your supported merchant types like “grocery”, “restaurant”, or “retail” — not “currency”, which your payment system doesn’t handle.

Similarly, you’re not just confirming that customer_tier contains text data; you’re validating it matches your actual tier system (“bronze”, “silver”, “gold”, “platinum”) rather than made-up values like “diamond” that don’t exist in your business model. This validation layer catches data that’s technically valid but business-invalid:

  •  transaction_amount: -10.50 → Negative transactions don’t make sense
  •  transaction_amount: 15000.00 → Exceeds your transaction limits
  •  merchant_category: “currency” → Not in your supported merchant types
  •  customer_tier: “diamond” → Not in your tier system

Schema validation ensures data structure integrity; business logic validation ensures that the data makes sense within your business context.

Example 1: Range and Business Logic Validation

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import StructType, StructField, StringType, DoubleType


def validate_transaction_business_rules(df):
   """
   Business Logic Validation - ensuring data makes business sense and it goes beyond schema validation to check ranges, categories, and business constraints.
   """
 # Define business rules
   VALID_MERCHANTS = ["grocery", "restaurant", "gas_station", "retail", "entertainment", "other"]
   VALID_TIERS = ["bronze", "silver", "gold", "platinum"]
   MIN_AMOUNT = 0.01
   MAX_AMOUNT = 10000.0
  
   # Apply validation rules
   validated_df = df.withColumn(
       "validation_status",
       when(col("transaction_amount") < MIN_AMOUNT, "negative_or_zero_amount")
       .when(col("transaction_amount") > MAX_AMOUNT, "excessive_amount")
       .when(~col("merchant_category").isin(VALID_MERCHANTS), "invalid_merchant_category")
       .when(~col("customer_tier").isin(VALID_TIERS), "invalid_customer_tier")
       .otherwise("valid")
   )
  
   # Separate valid from invalid data
   valid_df = validated_df.filter(col("validation_status") == "valid").drop("validation_status")
   invalid_df = validated_df.filter(col("validation_status") != "valid")
  
   # Show validation summary
   total = validated_df.count()
   valid_count = valid_df.count()
   invalid_count = invalid_df.count()
   success_rate = (valid_count / total * 100) if total > 0 else 0
  
   print(f"📊 Business Logic Validation: {valid_count}/{total} valid ({success_rate:.1f}%)")
  
   if invalid_count > 0:
       print("🔍 Validation issues:")
       validated_df.groupBy("validation_status").count().show()
  
   return valid_df, invalid_df


if __name__ == "__main__":
   # Test the validation function
   spark = SparkSession.builder.appName("BusinessLogicValidation").master("local[*]").getOrCreate()
  
   # Create test data with various business rule violations
   test_data = [
       # Valid records
       ("user_001", 45.99, "grocery", "gold"),
       ("user_002", 125.50, "restaurant", "silver"),
       ("user_003", 89.99, "retail", "bronze"),
      
       # Invalid records
       ("user_004", -10.50, "grocery", "gold"),           # Negative amount
       ("user_005", 15000.00, "restaurant", "silver"),    # Excessive amount
       ("user_006", 50.00, "currency", "gold"),           # Invalid merchant
       ("user_007", 75.25, "grocery", "diamond"),         # Invalid tier
       ("user_008", 0.00, "gas_station", "platinum"),     # Zero amount
   ]
  
   schema = StructType([
       StructField("user_id", StringType(), False),
       StructField("transaction_amount", DoubleType(), False),
       StructField("merchant_category", StringType(), False),
       StructField("customer_tier", StringType(), False)
   ])
  
   test_df = spark.createDataFrame(test_data, schema)
  
   print("🧪 Testing Business Logic Validation")
   print("=" * 50)
  
   # Apply validation
   valid_data, invalid_data = validate_transaction_business_rules(test_df)
  
   # Verify results
   if valid_data.count() == 3 and invalid_data.count() == 5:
       print("✅ Test PASSED: 3 valid, 5 invalid records as expected")
   else:
       print(f"❌ Test FAILED: Expected 3 valid, 5 invalid. Got {valid_data.count()} valid, {invalid_data.count()} invalid")
   spark.stop()

The code example for business logic and data validation, along with its test run, is available in this GitHub Repo.

Beyond schema enforcement and business logic validation, another common source of data chaos is data duplication.

Dropping Data Duplicates

The final approach is to consider duplicates. Duplicate records can skew your analytics and reporting: they inflate metrics, create incorrect aggregations, and lead to poor business decisions. The PySpark DataFrame API provides powerful deduplication capabilities that go beyond simple row-by-row comparisons.

The key insight here is that duplicates aren’t always exact row matches. Sometimes, you need to define business logic around what constitutes a duplicate — perhaps it’s the same transaction ID, or the same customer making the same purchase within a specific time window.
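As a minimal sketch before the full examples (assuming df is a transactions DataFrame with a transaction_id column, like the sample data built below): distinct() handles only exact row matches, while dropDuplicates() lets you name the columns that define a duplicate.

# Exact-match deduplication: a row is dropped only if every column matches another row
exact_dedup_df = df.distinct()

# Key-based deduplication: rows sharing a transaction_id are treated as duplicates,
# keeping an arbitrary survivor; use a window with row_number() when the choice matters
keyed_dedup_df = df.dropDuplicates(["transaction_id"])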

Let’s examine the code for both scenarios: simple exact matches and business rules that define duplicates.

Example 1: Simple Deduplication with Performance Optimization

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col, row_number, desc, when, lag, unix_timestamp, abs as spark_abs
from pyspark.sql.window import Window
from datetime import datetime

def create_sample_data(spark):
    """Create sample data with various duplicate patterns"""
    data = [
        # Exact duplicates
        ("TXN001", "USER123", 99.99, datetime(2024, 1, 15, 10, 30, 0), "RETAIL", "GOLD"),
        ("TXN001", "USER123", 99.99, datetime(2024, 1, 15, 10, 30, 0), "RETAIL", "GOLD"),
        
        # Same transaction_id, different details (system error)
        ("TXN002", "USER456", 150.00, datetime(2024, 1, 15, 11, 0, 0), "GROCERY", "SILVER"),
        ("TXN002", "USER789", 200.00, datetime(2024, 1, 15, 11, 5, 0), "RETAIL", "BRONZE"),
        
        # Same user + amount within 30 seconds (double-click)
        ("TXN003", "USER123", 49.99, datetime(2024, 1, 15, 12, 0, 0), "ONLINE", "GOLD"),
        ("TXN004", "USER123", 49.99, datetime(2024, 1, 15, 12, 0, 30), "ONLINE", "GOLD"),
        
        # Unique records
        ("TXN005", "USER456", 75.50, datetime(2024, 1, 15, 13, 0, 0), "RESTAURANT", "SILVER"),
        ("TXN006", "USER789", 120.00, datetime(2024, 1, 15, 14, 0, 0), "GAS", "BRONZE"),
    ]
    
    schema = StructType([
        StructField("transaction_id", StringType(), False),
        StructField("user_id", StringType(), False),
        StructField("transaction_amount", DoubleType(), False),
        StructField("transaction_date", TimestampType(), False),
        StructField("merchant_category", StringType(), True),
        StructField("customer_tier", StringType(), True)
    ])
    
    return spark.createDataFrame(data, schema)

def exact_deduplication(df):
    """Remove exact duplicate rows - fastest method"""
    return df.distinct()

def business_logic_deduplication(df):
    """Remove duplicates based on business rules"""
    
    # Rule 1: Same transaction_id = duplicate (keep first occurrence)
    window_txn = Window.partitionBy("transaction_id").orderBy("transaction_date")
    df = df.withColumn("txn_rank", row_number().over(window_txn)) \
           .filter(col("txn_rank") == 1) \
           .drop("txn_rank")
    
    # Rule 2: Same user + amount within 60 seconds = double-click (keep first)
    window_user = Window.partitionBy("user_id", "transaction_amount").orderBy("transaction_date")
    
    df = df.withColumn("prev_time", lag("transaction_date").over(window_user)) \
           .withColumn("time_diff", 
                      when(col("prev_time").isNull(), 999999)
                      .otherwise(spark_abs(unix_timestamp("transaction_date") - 
                                         unix_timestamp("prev_time")))) \
           .filter(col("time_diff") > 60) \
           .drop("prev_time", "time_diff")
    
    return df

def priority_deduplication(df):
    """Keep the best record when duplicates exist"""
    
    # Priority: GOLD > SILVER > BRONZE
    df_with_priority = df.withColumn("priority",
                                   when(col("customer_tier") == "GOLD", 3)
                                   .when(col("customer_tier") == "SILVER", 2)
                                   .when(col("customer_tier") == "BRONZE", 1)
                                   .otherwise(0))
    
    window_priority = Window.partitionBy("transaction_id") \
                           .orderBy(desc("priority"), desc("transaction_date"))
    
    return df_with_priority.withColumn("rank", row_number().over(window_priority)) \
                          .filter(col("rank") == 1) \
                          .drop("priority", "rank")

if __name__ == "__main__":
    # Initialize Spark with performance optimizations
    spark = SparkSession.builder \
        .appName("DeduplicationExample") \
        .master("local[*]") \
        .config("spark.sql.adaptive.enabled", "true") \
        .getOrCreate()
    
    spark.sparkContext.setLogLevel("ERROR")  # Reduce verbose output
    
    # Create sample data
    df = create_sample_data(spark)
    print(f"Original records: {df.count()}")
    
    # Method 1: Exact deduplication (fastest for exact matches)
    exact_result = exact_deduplication(df)
    print(f"After exact deduplication: {exact_result.count()}")
    
    # Method 2: Business logic deduplication
    business_result = business_logic_deduplication(df)
    print(f"After business logic deduplication: {business_result.count()}")
    
    # Method 3: Priority-based deduplication
    priority_result = priority_deduplication(df)
    print(f"After priority deduplication: {priority_result.count()}")
    
    # Show final results
    print("\nFinal deduplicated data:")
    business_result.show(truncate=False)
    
    spark.stop()

Example 2: Advanced Deduplication with Custom Logic

On occasion, you need more sophisticated deduplication logic: keeping the record with the highest value, keeping the most recent timestamp, using multi-tier priority ranking, or aggregating instead of removing entries. The right custom logic depends on your business use case.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, desc, when, max as spark_max, sum as spark_sum, count
from pyspark.sql.window import Window
from datetime import datetime

def create_sample_data(spark):
    """Sample customer data with duplicates"""
    data = [
        ("CUST001", "John Smith", 50.00, datetime(2024, 1, 15, 10, 0, 0), "Basic", "ACTIVE"),
        ("CUST001", "John Smith", 150.00, datetime(2024, 1, 15, 11, 0, 0), "Premium", "ACTIVE"),
        ("CUST002", "Jane Doe", 100.00, datetime(2024, 1, 15, 9, 0, 0), "Standard", "SUSPENDED"),
        ("CUST002", "Jane Doe", 100.00, datetime(2024, 1, 15, 11, 0, 0), "Standard", "ACTIVE"),
        ("CUST003", "Bob Wilson", 220.00, datetime(2024, 1, 15, 8, 0, 0), "Basic", "ACTIVE"),
        ("CUST003", "Bob Wilson", 180.00, datetime(2024, 1, 15, 10, 0, 0), "Premium", "ACTIVE"),
    ]
    columns = ["customer_id", "name", "amount", "timestamp", "tier", "status"]
    return spark.createDataFrame(data, columns)

def keep_highest_value(df):
    """Keep record with highest amount per customer"""
    window = Window.partitionBy("customer_id").orderBy(desc("amount"))
    return df.withColumn("rank", row_number().over(window)) \
             .filter(col("rank") == 1).drop("rank")

def keep_most_recent(df):
    """Keep most recent record per customer"""
    window = Window.partitionBy("customer_id").orderBy(desc("timestamp"))
    return df.withColumn("rank", row_number().over(window)) \
             .filter(col("rank") == 1).drop("rank")

def multi_criteria_best(df):
    """Advanced: Keep best record using business priority"""
    df_priority = df.withColumn("tier_score", 
                               when(col("tier") == "Premium", 3)
                               .when(col("tier") == "Standard", 2)
                               .otherwise(1)) \
                    .withColumn("status_score", 
                               when(col("status") == "ACTIVE", 1).otherwise(0))
    
    window = Window.partitionBy("customer_id") \
                  .orderBy(desc("tier_score"), desc("status_score"), 
                          desc("amount"), desc("timestamp"))
    
    return df_priority.withColumn("rank", row_number().over(window)) \
                     .filter(col("rank") == 1) \
                     .drop("tier_score", "status_score", "rank")

def aggregate_duplicates(df):
    """Alternative: Aggregate instead of removing duplicates"""
    return df.groupBy("customer_id", "name") \
             .agg(spark_max("amount").alias("max_amount"),
                  spark_sum("amount").alias("total_amount"),
                  count("*").alias("record_count"),
                  spark_max("timestamp").alias("latest_update"))

if __name__ == "__main__":
    spark = SparkSession.builder.appName("AdvancedDedup").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    
    df = create_sample_data(spark)
    print(f"Original: {df.count()} records")
    
    # Method 1: Keep highest value
    highest = keep_highest_value(df)
    print(f"Highest value: {highest.count()} records")
    highest.select("customer_id", "name", "amount", "tier").show()
    
    # Method 2: Keep most recent
    recent = keep_most_recent(df)
    print(f"Most recent: {recent.count()} records")
    recent.select("customer_id", "name", "timestamp", "status").show()
    
    # Method 3: Multi-criteria priority
    best = multi_criteria_best(df)
    print(f"Multi-criteria best: {best.count()} records")
    best.show()
    
    # Method 4: Aggregate instead of remove
    agg = aggregate_duplicates(df)
    print(f"Aggregated: {agg.count()} records")
    agg.show()
    
    spark.stop()

For a comprehensive set of deduplication strategies with custom logic, including a more advanced version, refer to the GitHub repository. These approaches give you control over which records to keep when duplicates are found, rather than arbitrarily keeping the first occurrence.

Conclusion

To recap, data quality issues in large JSON ingestion pipelines are inevitable, but they don’t have to be catastrophic. By implementing these three strategies — schema enforcement, data validation, and custom deduplication — you can identify and address problems early, handling them with greater ease. By no means are these exhaustive, yet they are sufficient and commonly used.
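To make that recap concrete, here is a minimal end-to-end sketch that chains the three strategies. The module names and the output path are hypothetical placeholders; in practice you would import the functions from wherever the earlier examples live in your project.

from pyspark.sql import SparkSession

# Hypothetical module names -- adjust to match your own project layout
from smart_json_loader import smart_json_load
from business_validation import validate_transaction_business_rules

spark = SparkSession.builder.appName("QualityPipeline").master("local[*]").getOrCreate()

# 1. Schema enforcement: FAILFAST with a PERMISSIVE fallback
good_df, bad_df = smart_json_load(spark, "data/mixed_transactions.json")

# 2. Business logic validation: amount ranges, merchant categories, customer tiers
valid_df, invalid_df = validate_transaction_business_rules(good_df)

# 3. Deduplication: a simple key-based pass here; swap in the window-based
#    rules from the deduplication examples when the survivor choice matters
final_df = valid_df.dropDuplicates(["user_id", "transaction_amount", "transaction_date"])

# Hand clean, validated, deduplicated data to downstream consumers
final_df.write.mode("overwrite").parquet("data/curated/transactions")
spark.stop()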

The key is to be proactive rather than reactive. Set up your quality checks at ingestion time, not after your data has already propagated through your entire data lake. These PySpark techniques provide you with the tools to build robust data quality pipelines that can handle the complexities of real-world data.

Remember that data quality is an ongoing process, not a one-time fix. Monitor your validation metrics over time and continuously refine your validation rules as your understanding of the data evolves.
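For example, here is a minimal sketch of capturing per-run validation metrics so quality trends stay queryable. The column names and metrics path are illustrative assumptions; good_df and bad_df come from the ingestion step above.

from datetime import datetime, timezone

# Sketch: append one row of quality metrics per ingestion run.
# Column names and the output path are assumptions for illustration;
# bad_df is None when the FAILFAST path succeeds.
bad_count = bad_df.count() if bad_df is not None else 0
metrics_df = spark.createDataFrame(
    [(datetime.now(timezone.utc).isoformat(), "data/mixed_transactions.json", good_df.count(), bad_count)],
    ["run_ts", "source_file", "good_count", "bad_count"],
)
metrics_df.write.mode("append").parquet("data/metrics/ingestion_quality")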

Your downstream consumers will thank you for the clean, reliable data, not least because it will reduce the frequency of your 3 AM pager-duty calls.

Resources and References

  1. Apache Spark™ 4.0 Documentation Overview — The main documentation hub for Apache Spark™ 4.0
    https://spark.apache.org/docs/latest/
  2. PySpark 4.0 API Documentation — Official Python API documentation for Apache Spark™ 4.0
    https://spark.apache.org/docs/latest/api/python/index.html
  3. Delta Lake Schema Enforcement and Evolution — Comprehensive guide on schema enforcement mechanisms in Delta Lake
    https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html
  4. Databricks Schema Enforcement Documentation — Best practices for data quality through schema enforcement
    https://docs.databricks.com/aws/en/tables/schema-enforcement
  5. Introducing Apache Spark™ 4.0 — Overview of key features and improvements in Spark 4.0
    https://www.databricks.com/blog/introducing-apache-spark-40
  6. Great Expectations — Data validation and documentation tool for data quality management
    https://greatexpectations.io/