<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Handling the Chaos: Data Quality Strategies with PySpark Ingestion in Community Articles</title>
    <link>https://community.databricks.com/t5/community-articles/handling-the-chaos-data-quality-strategies-with-pyspark/m-p/139249#M784</link>
    <description>&lt;P&gt;&lt;FONT size="4"&gt;&lt;STRONG&gt;Tips and Techniques for Ingesting Large JSON files with PySpark&lt;/STRONG&gt;&lt;/FONT&gt;&lt;/P&gt;&lt;H2 id="4edd"&gt;&lt;FONT size="5"&gt;Introduction&lt;/FONT&gt;&lt;/H2&gt;&lt;P class=""&gt;Suppose you’ve ever struggled or grappled with consuming massive JSON files with PySpark. In that case, you are aware that insufficient data can always creep in and silently disrupt your ETL pipeline or pollute your data for downstream consumption.&lt;/P&gt;&lt;P class=""&gt;In this blog post, I’ll walk you through three practical approaches to maintaining data quality during ingestion. We’ll cover how to drop records that don’t match your defined schema, validate that some numeric values are sensible (not just existent), and clean out any duplicate entries before they cause trouble downstream.&lt;/P&gt;&lt;P class=""&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="Screenshot 2025-11-16 at 10.42.50 AM.png" style="width: 999px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/21738iCCC35A05C08D7FEA/image-size/large?v=v2&amp;amp;px=999" role="button" title="Screenshot 2025-11-16 at 10.42.50 AM.png" alt="Screenshot 2025-11-16 at 10.42.50 AM.png" /&gt;&lt;/span&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;&amp;nbsp;&lt;STRONG&gt;Figure 1.&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;Overview of data quality strategies&lt;/SPAN&gt;&lt;/P&gt;&lt;P class=""&gt;These strategies seem simple yet effective techniques that can save you headaches when working with JSON files. While these three approaches are by no means exhaustive, other aspects of data quality, such as timeliness and consistency, will depend on data sources (real-time streaming, operational databases, etc.). 
External tools such as&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;A class="" href="https://greatexpectations.io/" target="_blank" rel="noopener ugc nofollow"&gt;Great Expectations&lt;/A&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;can also help with these data validation issues.&lt;/P&gt;&lt;P class=""&gt;For the past few weeks, I have been experimenting with Claude Code, Cursor, and Goose as coding assistants, working in&lt;SPAN&gt;&amp;nbsp;a&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;&lt;EM&gt;generation and validation cycle&lt;/EM&gt;&lt;/STRONG&gt;: I instruct, interact, validate, and iterate with the AI coding agent until we reach the best approach. Some of the code examples, test cases, and documentation were generated by Cursor and then revised and validated through constant interaction, keeping the AI “on a tight leash.”&lt;/P&gt;&lt;P class=""&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="Screenshot 2025-11-16 at 10.45.18 AM.png" style="width: 917px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/21739i3CC01A99BE3EE27C/image-size/large?v=v2&amp;amp;px=999" role="button" title="Screenshot 2025-11-16 at 10.45.18 AM.png" alt="Screenshot 2025-11-16 at 10.45.18 AM.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P class=""&gt;&lt;STRONG&gt;Figure 2&lt;/STRONG&gt;. Cycle of partial autonomy (source: Andrej Karpathy)&lt;/P&gt;&lt;P class=""&gt;&lt;SPAN&gt;If you have not used a coding agent such as Cursor in this mode of cooperative partial autonomy, give it a try &lt;span class="lia-unicode-emoji" title=":robot_face:"&gt;🤖&lt;/span&gt; &lt;span class="lia-unicode-emoji" title=":smiling_face_with_sunglasses:"&gt;😎&lt;/span&gt;. 
You can explore the results of this collaboration in the&amp;nbsp;&lt;/SPAN&gt;&lt;A class="" href="https://github.com/dmatrix/spark-misc/blob/main/src/py/blogs/README.md" target="_blank" rel="noopener ugc nofollow"&gt;GitHub repository&lt;/A&gt;.&lt;/P&gt;&lt;P class=""&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="Screenshot 2025-11-16 at 10.47.12 AM.png" style="width: 848px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/21740i0E8A74E936C42940/image-size/large?v=v2&amp;amp;px=999" role="button" title="Screenshot 2025-11-16 at 10.47.12 AM.png" alt="Screenshot 2025-11-16 at 10.47.12 AM.png" /&gt;&lt;/span&gt;&lt;BR /&gt;&lt;STRONG&gt;Figure 3.&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;PySpark files that implement the recommended strategies&lt;/SPAN&gt;&lt;/P&gt;&lt;H2 id="efbc"&gt;&lt;FONT size="5"&gt;Data Ingestion with Schema Enforcement&lt;/FONT&gt;&lt;/H2&gt;&lt;P class=""&gt;The first approach is schema enforcement. When working with large JSON files, one of the most critical steps is ensuring your data conforms to an expected schema before it pollutes your downstream processes. Schema enforcement with the PySpark 4.0 DataFrame API lets you catch malformed data early and handle it gracefully.&lt;/P&gt;&lt;P class=""&gt;The key is being proactive rather than reactive. 
Instead of discovering schema mismatches after your ETL job has been running for hours, you can catch these issues upfront and decide how to handle them.&lt;/P&gt;&lt;H3 id="ce36"&gt;&lt;FONT size="5"&gt;Example 1: Strict Schema Enforcement with Error Handling&lt;/FONT&gt;&lt;/H3&gt;&lt;P class=""&gt;Here’s how you can enforce a strict schema and capture any records that don’t comply. This strategy is a two-step process: first read with FAILFAST mode, which raises an error on the first malformed record, and fall back to PERMISSIVE mode only if that read fails.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col

# Define schema once at module level for efficiency
TRANSACTION_SCHEMA = StructType([
    StructField("user_id", StringType(), nullable=False),
    StructField("transaction_amount", DoubleType(), nullable=False),
    StructField("transaction_date", TimestampType(), nullable=True),
    StructField("merchant_category", StringType(), nullable=True),
    StructField("customer_tier", StringType(), nullable=True)
])

def smart_json_load(spark, file_path):
    """
    Smart JSON loading: try strict first, fall back to permissive.
    Best of both worlds:
    - Fast processing for clean data (FAILFAST)
    - Graceful recovery for messy data (PERMISSIVE)
    Returns (good_df, bad_df); bad_df is None when FAILFAST succeeds.
    """

    print(f"📋 Smart loading {file_path}...")

    # Step 1: Try FAILFAST for clean data
    try:
        print("🎯 Attempting FAILFAST mode...")
        df = spark.read.option("mode", "FAILFAST").schema(TRANSACTION_SCHEMA).json(file_path)

        # Force a full evaluation of every column so FAILFAST sees all parsing errors.
        # The "noop" sink (Spark 3.0+) reads everything without pulling the data onto
        # the driver, which df.collect() would do for the whole (possibly massive) file.
        df.write.format("noop").mode("overwrite").save()
        count = df.count()
        print(f"✅ FAILFAST succeeded: {count} clean records")
        return df, None

    except Exception:
        print("⚠️  FAILFAST failed, switching to PERMISSIVE mode...")

        # Step 2: Fall back to PERMISSIVE. Build a new schema instead of calling
        # StructType.add(), which mutates the shared TRANSACTION_SCHEMA in place.
        schema_with_corrupt = StructType(
            TRANSACTION_SCHEMA.fields + [StructField("_corrupt_record", StringType(), True)]
        )
        df = spark.read.option("mode", "PERMISSIVE").schema(schema_with_corrupt).json(file_path)

        # Cache first: since Spark 2.3, queries on raw JSON that reference only the
        # internal corrupt record column are disallowed; caching the parsed result avoids this
        df = df.cache()
        total_records = df.count()

        # Separate good from bad. Spark generally treats file-source schemas as nullable,
        # so nullable=False is not enforced on read; check user_id explicitly.
        good_df = df.filter(col("_corrupt_record").isNull() &amp;amp; col("user_id").isNotNull()).drop("_corrupt_record")
        bad_df = df.filter(col("_corrupt_record").isNotNull() | col("user_id").isNull())
        good_count = good_df.count()
        bad_count = bad_df.count()
        print(f"🔄 PERMISSIVE recovery: {good_count} good, {bad_count} bad out of {total_records} records")
        return good_df, bad_df

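# Alternative sketch (not part of the original example): if you only need to DROP
# records that do not match the schema and never need to inspect them, DROPMALFORMED
# mode discards unparseable rows on read. The function name and its optional
# schema parameter are illustrative assumptions.
def load_dropping_malformed(spark, file_path, schema=None):
    """Read JSON with DROPMALFORMED: malformed rows are silently discarded."""
    if schema is None:
        schema = TRANSACTION_SCHEMA
    return spark.read.option("mode", "DROPMALFORMED").schema(schema).json(file_path)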

if __name__ == "__main__":
    spark = SparkSession.builder.appName("SmartJSONLoader").master("local[*]").getOrCreate()

    # Test with different data files
    test_files = [
        "data/clean_transactions.json",
        "data/mixed_transactions.json",
        "data/invalid_transactions.json"
    ]

    for file_path in test_files:
        print(f"\n{'='*60}")
        print(f"🧪 TESTING: {file_path}")
        print(f"{'='*60}")
        good_data, bad_data = smart_json_load(spark, file_path)

        if bad_data is None:
            print("🎯 Perfect! Clean data processed with FAILFAST")
        else:
            print("🛡️  Recovered data with PERMISSIVE fallback")
   spark.stop()&lt;/LI-CODE&gt;&lt;P class=""&gt;&lt;FONT size="5"&gt;Example 2: Permissive Mode with Corrupt Record Tracking&lt;/FONT&gt;&lt;/P&gt;&lt;P class=""&gt;Sometimes you want to ingest what you can and track what fails. This example demonstrates&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;PERMISSIVE&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/STRONG&gt;mode in PySpark 4.0, which enables the ingestion of data that can be parsed while tracking problematic records for later analysis and investigation. Ideal for exploratory data analysis and gradual improvement of data quality.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col

# Define schema once at module level for efficiency
PERMISSIVE_SCHEMA = StructType([
    StructField("user_id", StringType(), nullable=True),
    StructField("transaction_amount", DoubleType(), nullable=True),
    StructField("transaction_date", TimestampType(), nullable=True),
    StructField("merchant_category", StringType(), nullable=True),
    StructField("customer_tier", StringType(), nullable=True),
    StructField("_corrupt_record", StringType(), nullable=True)  # Captures bad records
])

def load_with_permissive_mode(spark, file_path):
    """
    Example 2: Permissive Mode with Corrupt Record Tracking

    Uses PERMISSIVE mode, which does not fail on malformed records: their raw text
    is captured in the _corrupt_record column instead. Suited to messy data where
    you want to save what you can.
    """

    print(f"📋 Loading {file_path} with permissive mode...")

    # PERMISSIVE mode keeps malformed records and captures their raw text.
    # Caching the parsed result avoids re-reading the file for every count below.
    df = spark.read.option("mode", "PERMISSIVE").schema(PERMISSIVE_SCHEMA).json(file_path).cache()

    total_records = df.count()

    # Separate good from bad records (a missing user_id also counts as bad)
    good_records = df.filter(col("_corrupt_record").isNull() &amp;amp; col("user_id").isNotNull())
    bad_records = df.filter(col("_corrupt_record").isNotNull() | col("user_id").isNull())

    good_count = good_records.count()
    bad_count = bad_records.count()
    success_rate = (good_count / total_records * 100) if total_records &amp;gt; 0 else 0

    print(f"📊 Results: {good_count}/{total_records} good records ({success_rate:.1f}% success)")

    if good_count &amp;gt; 0:
        print("✅ Sample good records:")
        good_records.drop("_corrupt_record").show(2, truncate=False)

    if bad_count &amp;gt; 0:
        print("❌ Sample bad records:")
        bad_records.select("_corrupt_record").show(2, truncate=False)

    return good_records, bad_records

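# Hypothetical helper (not part of the original example): persist the rejected rows
# so they can be reviewed later instead of only being printed. The default
# quarantine path is an assumption; point it at your own storage location.
def save_bad_records(bad_records, output_path="data/quarantine/transactions"):
    """Append rejected rows, including their raw _corrupt_record text, as JSON."""
    bad_records.write.mode("append").json(output_path)
    return output_path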

if __name__ == "__main__":
    spark = SparkSession.builder.appName("PermissiveMode").master("local[*]").getOrCreate()

    # Test with different data files
    test_files = [
        "data/clean_transactions.json",
        "data/mixed_transactions.json",
        "data/invalid_transactions.json"
    ]

    for file_path in test_files:
        print(f"\n{'='*60}")
        print(f"🧪 TESTING: {file_path}")
        print(f"{'='*60}")

        good_data, bad_data = load_with_permissive_mode(spark, file_path)
        print(f"✅ PERMISSIVE load completed: captured {good_data.count()} good, {bad_data.count()} bad records")

   spark.stop()&lt;/LI-CODE&gt;&lt;P class=""&gt;&amp;nbsp;&lt;SPAN&gt;The beauty of this approach is that you can continue processing with the clean data while simultaneously capturing the problematic records for later analysis and review. This provides visibility into data quality issues without interrupting your entire pipeline. Code and test runs for the above examples are in this&amp;nbsp;&lt;/SPAN&gt;&lt;A class="" href="https://github.com/dmatrix/spark-misc/tree/main/src/py/blogs" target="_blank" rel="noopener ugc nofollow"&gt;GitHub Repo&lt;/A&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;/P&gt;&lt;H2 id="27ef"&gt;&lt;FONT size="5"&gt;Validating Data for Accuracy&lt;/FONT&gt;&lt;/H2&gt;&lt;P class=""&gt;The second approach is validating for data accuracy. Schema enforcement gets you part of the way there, but just because a field exists and has the correct data type doesn’t mean the data makes business sense. In this scenario, data validation ensures your transaction amounts fall within realistic ranges and your categorical values are from your approved business domains.&lt;/P&gt;&lt;P class=""&gt;Think of this as your business logic validation layer. You’re not just checking that a&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;transaction_amount&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/EM&gt;field is a double; you’re checking that it’s a reasonable amount for a financial transaction. 
You’re not just verifying that&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;merchant_category&lt;/EM&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;is a string; you’re ensuring it’s one of your supported merchant types, such as “grocery”, “restaurant”, or “retail”, and not “currency”, which your payment system doesn’t handle.&lt;/P&gt;&lt;P class=""&gt;Similarly, you’re not just confirming that&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;customer_tier&lt;/EM&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;contains text; you’re validating that it matches your actual tier system (“bronze”, “silver”, “gold”, “platinum”) rather than made-up values like “diamond” that don’t exist in your business model. This validation layer catches data that’s technically valid but business-invalid:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;span class="lia-unicode-emoji" title=":cross_mark:"&gt;❌&lt;/span&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;transaction_amount&lt;/EM&gt;: -10.50 → Negative transactions don’t make sense&lt;/LI&gt;&lt;LI&gt;&lt;span class="lia-unicode-emoji" title=":cross_mark:"&gt;❌&lt;/span&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;transaction_amount&lt;/EM&gt;: 15000.00 → Exceeds your transaction limit&lt;/LI&gt;&lt;LI&gt;&lt;span class="lia-unicode-emoji" title=":cross_mark:"&gt;❌&lt;/span&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;merchant_category&lt;/EM&gt;: “currency” → Not in your supported merchant types&lt;/LI&gt;&lt;LI&gt;&lt;span class="lia-unicode-emoji" title=":cross_mark:"&gt;❌&lt;/span&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;customer_tier&lt;/EM&gt;: “diamond” → Not in your tier system&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;Schema validation ensures data structure integrity; business logic validation ensures that the data makes sense within your business context.&lt;/P&gt;&lt;H3 id="e99f"&gt;&lt;FONT size="5"&gt;Example 1: Range and Business Logic Validation&lt;/FONT&gt;&lt;/H3&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import StructType, StructField, StringType, DoubleType


def validate_transaction_business_rules(df):
    """
    Business logic validation: goes beyond schema validation to check that the data
    makes business sense (value ranges, allowed categories, and business constraints).
    Each row is tagged with the first rule it violates, or "valid".
    """
    # Define business rules
    VALID_MERCHANTS = ["grocery", "restaurant", "gas_station", "retail", "entertainment", "other"]
    VALID_TIERS = ["bronze", "silver", "gold", "platinum"]
    MIN_AMOUNT = 0.01
    MAX_AMOUNT = 10000.0

    # Apply validation rules. Nulls are checked first: a comparison or isin() on a null
    # yields null, which when() treats as false, so a row with missing values would
    # otherwise fall through to "valid".
    validated_df = df.withColumn(
        "validation_status",
        when(
            col("transaction_amount").isNull()
            | col("merchant_category").isNull()
            | col("customer_tier").isNull(),
            "missing_required_field",
        )
        .when(col("transaction_amount") &amp;lt; MIN_AMOUNT, "negative_or_zero_amount")
        .when(col("transaction_amount") &amp;gt; MAX_AMOUNT, "excessive_amount")
        .when(~col("merchant_category").isin(VALID_MERCHANTS), "invalid_merchant_category")
        .when(~col("customer_tier").isin(VALID_TIERS), "invalid_customer_tier")
        .otherwise("valid")
    )

    # Separate valid from invalid data
    valid_df = validated_df.filter(col("validation_status") == "valid").drop("validation_status")
    invalid_df = validated_df.filter(col("validation_status") != "valid")

    # Show validation summary
    total = validated_df.count()
    valid_count = valid_df.count()
    invalid_count = invalid_df.count()
    success_rate = (valid_count / total * 100) if total &amp;gt; 0 else 0

    print(f"📊 Business Logic Validation: {valid_count}/{total} valid ({success_rate:.1f}%)")

    if invalid_count &amp;gt; 0:
        print("🔍 Validation issues:")
        validated_df.groupBy("validation_status").count().show()

    return valid_df, invalid_df

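# Hypothetical helper (not part of the original example): turn the invalid rows into
# a small {validation_status: count} dictionary. Logging this on every run is one
# simple way to monitor validation metrics over time. Only one row per status is
# collected to the driver, so this stays cheap even for large inputs.
def validation_issue_counts(invalid_df):
    """Return a dict mapping each validation_status to its number of rows."""
    rows = invalid_df.groupBy("validation_status").count().collect()
    return {row["validation_status"]: row["count"] for row in rows}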

if __name__ == "__main__":
    # Test the validation function
    spark = SparkSession.builder.appName("BusinessLogicValidation").master("local[*]").getOrCreate()

    # Create test data with various business rule violations
    test_data = [
        # Valid records
        ("user_001", 45.99, "grocery", "gold"),
        ("user_002", 125.50, "restaurant", "silver"),
        ("user_003", 89.99, "retail", "bronze"),

        # Invalid records
        ("user_004", -10.50, "grocery", "gold"),           # Negative amount
        ("user_005", 15000.00, "restaurant", "silver"),    # Excessive amount
        ("user_006", 50.00, "currency", "gold"),           # Invalid merchant
        ("user_007", 75.25, "grocery", "diamond"),         # Invalid tier
        ("user_008", 0.00, "gas_station", "platinum"),     # Zero amount
    ]

    schema = StructType([
        StructField("user_id", StringType(), False),
        StructField("transaction_amount", DoubleType(), False),
        StructField("merchant_category", StringType(), False),
        StructField("customer_tier", StringType(), False)
    ])

    test_df = spark.createDataFrame(test_data, schema)

    print("🧪 Testing Business Logic Validation")
    print("=" * 50)

    # Apply validation
    valid_data, invalid_data = validate_transaction_business_rules(test_df)

    # Verify results
    valid_count, invalid_count = valid_data.count(), invalid_data.count()
    if valid_count == 3 and invalid_count == 5:
        print("✅ Test PASSED: 3 valid, 5 invalid records as expected")
    else:
        print(f"❌ Test FAILED: Expected 3 valid, 5 invalid. Got {valid_count} valid, {invalid_count} invalid")
   spark.stop()&lt;/LI-CODE&gt;&lt;P class=""&gt;The code example to validate business logic and data validation, and its relevant test run for the above example, is in this&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;A class="" href="https://github.com/dmatrix/spark-misc/tree/main/src/py/blogs" target="_blank" rel="noopener ugc nofollow"&gt;GitHub Repo&lt;/A&gt;.&lt;/P&gt;&lt;P class=""&gt;In addition to schema and business logic validation and data consistency, another common source of data chaos is data duplication.&lt;/P&gt;&lt;H2 id="846e"&gt;Dropping Data Duplicates&lt;/H2&gt;&lt;P class=""&gt;The final approach is to consider duplicates. Duplicate records can skew your analytics and reporting. They can inflate metrics, create incorrect aggregations, and lead to poor business decisions. PySpark DataFrame API provides powerful deduplication capabilities that go beyond simple row-by-row comparisons.&lt;/P&gt;&lt;P class=""&gt;The key insight here is that duplicates aren’t always exact row matches. Sometimes, you need to define business logic around what constitutes a duplicate — perhaps it’s the same transaction ID, or the same customer making the same purchase within a specific time window.&lt;/P&gt;&lt;P class=""&gt;Let’s examine the code for both scenarios: simple exact matches and business logic that dictates duplicates.&lt;/P&gt;&lt;H3 id="9bf9"&gt;Example 1: Simple Deduplication with Performance Optimization&lt;/H3&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col, row_number, desc, when, lag, unix_timestamp, abs as spark_abs
from pyspark.sql.window import Window
from datetime import datetime

def create_sample_data(spark):
    """Create sample data with various duplicate patterns"""
    data = [
        # Exact duplicates
        ("TXN001", "USER123", 99.99, datetime(2024, 1, 15, 10, 30, 0), "RETAIL", "GOLD"),
        ("TXN001", "USER123", 99.99, datetime(2024, 1, 15, 10, 30, 0), "RETAIL", "GOLD"),
        
        # Same transaction_id, different details (system error)
        ("TXN002", "USER456", 150.00, datetime(2024, 1, 15, 11, 0, 0), "GROCERY", "SILVER"),
        ("TXN002", "USER789", 200.00, datetime(2024, 1, 15, 11, 5, 0), "RETAIL", "BRONZE"),
        
        # Same user + amount within 30 seconds (double-click)
        ("TXN003", "USER123", 49.99, datetime(2024, 1, 15, 12, 0, 0), "ONLINE", "GOLD"),
        ("TXN004", "USER123", 49.99, datetime(2024, 1, 15, 12, 0, 30), "ONLINE", "GOLD"),
        
        # Unique records
        ("TXN005", "USER456", 75.50, datetime(2024, 1, 15, 13, 0, 0), "RESTAURANT", "SILVER"),
        ("TXN006", "USER789", 120.00, datetime(2024, 1, 15, 14, 0, 0), "GAS", "BRONZE"),
    ]
    
    schema = StructType([
        StructField("transaction_id", StringType(), False),
        StructField("user_id", StringType(), False),
        StructField("transaction_amount", DoubleType(), False),
        StructField("transaction_date", TimestampType(), False),
        StructField("merchant_category", StringType(), True),
        StructField("customer_tier", StringType(), True)
    ])
    
    return spark.createDataFrame(data, schema)

def exact_deduplication(df):
    """Remove exact duplicate rows - fastest method"""
    return df.distinct()

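# Hypothetical helper (not part of the original example): deduplicate on a key only.
def key_based_deduplication(df, keys=("transaction_id",)):
    """Keep one row per key.

    dropDuplicates() on a subset of columns is the shortest way to do this, but it
    does not guarantee WHICH row survives for each key. When that choice matters,
    use a row_number() window instead, as business_logic_deduplication does.
    """
    return df.dropDuplicates(list(keys))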
def business_logic_deduplication(df):
    """Remove duplicates based on business rules"""
    
    # Rule 1: Same transaction_id = duplicate (keep first occurrence)
    window_txn = Window.partitionBy("transaction_id").orderBy("transaction_date")
    df = df.withColumn("txn_rank", row_number().over(window_txn)) \
           .filter(col("txn_rank") == 1) \
           .drop("txn_rank")
    
    # Rule 2: Same user + amount within 60 seconds = double-click (keep first)
    window_user = Window.partitionBy("user_id", "transaction_amount").orderBy("transaction_date")
    
    df = df.withColumn("prev_time", lag("transaction_date").over(window_user)) \
           .withColumn("time_diff", 
                      when(col("prev_time").isNull(), 999999)
                      .otherwise(spark_abs(unix_timestamp("transaction_date") - 
                                         unix_timestamp("prev_time")))) \
           .filter(col("time_diff") &amp;gt; 60) \
           .drop("prev_time", "time_diff")
    
    return df

def priority_deduplication(df):
    """Keep the best record when duplicates exist"""
    
    # Priority: GOLD &amp;gt; SILVER &amp;gt; BRONZE
    df_with_priority = df.withColumn("priority",
                                   when(col("customer_tier") == "GOLD", 3)
                                   .when(col("customer_tier") == "SILVER", 2)
                                   .when(col("customer_tier") == "BRONZE", 1)
                                   .otherwise(0))
    
    window_priority = Window.partitionBy("transaction_id") \
                           .orderBy(desc("priority"), desc("transaction_date"))
    
    return df_with_priority.withColumn("rank", row_number().over(window_priority)) \
                          .filter(col("rank") == 1) \
                          .drop("priority", "rank")

if __name__ == "__main__":
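    # Initialize Spark. Adaptive Query Execution (AQE) is already on by default
    # since Spark 3.2; the config below just makes that optimization explicit.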
    spark = SparkSession.builder \
        .appName("DeduplicationExample") \
        .master("local[*]") \
        .config("spark.sql.adaptive.enabled", "true") \
        .getOrCreate()
    
    spark.sparkContext.setLogLevel("ERROR")  # Reduce verbose output
    
    # Create sample data
    df = create_sample_data(spark)
    print(f"Original records: {df.count()}")
    
    # Method 1: Exact deduplication (fastest for exact matches)
    exact_result = exact_deduplication(df)
    print(f"After exact deduplication: {exact_result.count()}")
    
    # Method 2: Business logic deduplication
    business_result = business_logic_deduplication(df)
    print(f"After business logic deduplication: {business_result.count()}")
    
    # Method 3: Priority-based deduplication
    priority_result = priority_deduplication(df)
    print(f"After priority deduplication: {priority_result.count()}")
    
    # Show final results
    print("\nFinal deduplicated data:")
    business_result.show(truncate=False)
    
    spark.stop()&lt;/LI-CODE&gt;&lt;H3 id="14c8"&gt;Example 2: Advanced Deduplication with Custom Logic&lt;/H3&gt;&lt;P class=""&gt;On occasions, you need more sophisticated deduplication logic — perhaps keeping the record with the highest value, the most recent timestamp, using multi-tier priority ranking, or aggregating instead of removing entries. Some of this custom logic will be dependent on your business use case.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, desc, when, max as spark_max, sum as spark_sum, count
from pyspark.sql.window import Window
from datetime import datetime

def create_sample_data(spark):
    """Sample customer data with duplicates"""
    data = [
        ("CUST001", "John Smith", 50.00, datetime(2024, 1, 15, 10, 0, 0), "Basic", "ACTIVE"),
        ("CUST001", "John Smith", 150.00, datetime(2024, 1, 15, 11, 0, 0), "Premium", "ACTIVE"),
        ("CUST002", "Jane Doe", 100.00, datetime(2024, 1, 15, 9, 0, 0), "Standard", "SUSPENDED"),
        ("CUST002", "Jane Doe", 100.00, datetime(2024, 1, 15, 11, 0, 0), "Standard", "ACTIVE"),
        ("CUST003", "Bob Wilson", 220.00, datetime(2024, 1, 15, 8, 0, 0), "Basic", "ACTIVE"),
        ("CUST003", "Bob Wilson", 180.00, datetime(2024, 1, 15, 10, 0, 0), "Premium", "ACTIVE"),
    ]
    columns = ["customer_id", "name", "amount", "timestamp", "tier", "status"]
    return spark.createDataFrame(data, columns)

def keep_highest_value(df):
    """Keep record with highest amount per customer"""
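    # timestamp breaks ties (CUST002 has two 100.00 rows) so the kept row is deterministic
    window = Window.partitionBy("customer_id").orderBy(desc("amount"), desc("timestamp"))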
    return df.withColumn("rank", row_number().over(window)) \
             .filter(col("rank") == 1).drop("rank")

def keep_most_recent(df):
    """Keep most recent record per customer"""
    window = Window.partitionBy("customer_id").orderBy(desc("timestamp"))
    return df.withColumn("rank", row_number().over(window)) \
             .filter(col("rank") == 1).drop("rank")

def multi_criteria_best(df):
    """Advanced: Keep best record using business priority"""
    df_priority = df.withColumn("tier_score", 
                               when(col("tier") == "Premium", 3)
                               .when(col("tier") == "Standard", 2)
                               .otherwise(1)) \
                    .withColumn("status_score", 
                               when(col("status") == "ACTIVE", 1).otherwise(0))
    
    window = Window.partitionBy("customer_id") \
                  .orderBy(desc("tier_score"), desc("status_score"), 
                          desc("amount"), desc("timestamp"))
    
    return df_priority.withColumn("rank", row_number().over(window)) \
                     .filter(col("rank") == 1) \
                     .drop("tier_score", "status_score", "rank")

def aggregate_duplicates(df):
    """Alternative: Aggregate instead of removing duplicates"""
    return df.groupBy("customer_id", "name") \
             .agg(spark_max("amount").alias("max_amount"),
                  spark_sum("amount").alias("total_amount"),
                  count("*").alias("record_count"),
                  spark_max("timestamp").alias("latest_update"))

if __name__ == "__main__":
    spark = SparkSession.builder.appName("AdvancedDedup").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    
    df = create_sample_data(spark)
    print(f"Original: {df.count()} records")
    
    # Method 1: Keep highest value
    highest = keep_highest_value(df)
    print(f"Highest value: {highest.count()} records")
    highest.select("customer_id", "name", "amount", "tier").show()
    
    # Method 2: Keep most recent
    recent = keep_most_recent(df)
    print(f"Most recent: {recent.count()} records")
    recent.select("customer_id", "name", "timestamp", "status").show()
    
    # Method 3: Multi-criteria priority
    best = multi_criteria_best(df)
    print(f"Multi-criteria best: {best.count()} records")
    best.show()
    
    # Method 4: Aggregate instead of remove
    agg = aggregate_duplicates(df)
    print(f"Aggregated: {agg.count()} records")
    agg.show()
    
    spark.stop()&lt;/LI-CODE&gt;&lt;P&gt;&lt;SPAN&gt;For a comprehensive set of deduplication strategies with custom logic, refer to the&amp;nbsp;&lt;/SPAN&gt;&lt;A class="" href="https://github.com/dmatrix/spark-misc/blob/main/src/py/blogs/README.md" target="_blank" rel="noopener ugc nofollow"&gt;GitHub repository&lt;/A&gt;&lt;SPAN&gt;. These approaches give you control over which records to keep when duplicates are found, rather than arbitrarily keeping the first occurrence.&lt;/SPAN&gt;&lt;/P&gt;&lt;H2 id="6439"&gt;Conclusion&lt;/H2&gt;&lt;P class=""&gt;To recap, data quality issues in large JSON ingestion pipelines are inevitable, but they don’t have to be catastrophic. By implementing these three strategies (schema enforcement, data validation, and custom deduplication), you can identify and address problems early and handle them with greater ease. They are by no means exhaustive, but they are commonly used and often sufficient.&lt;/P&gt;&lt;P class=""&gt;The key is to be proactive rather than reactive. Set up your quality checks at ingestion time, not after your data has already propagated through your entire data lake. These PySpark techniques give you the tools to build robust data quality pipelines that can handle the complexities of real-world data.&lt;/P&gt;&lt;P class=""&gt;Remember that data quality is an ongoing process, not a one-time fix. 
Monitor your validation metrics over time and continuously refine your validation rules as your understanding of the data evolves.&lt;/P&gt;&lt;P class=""&gt;Your downstream consumers will thank you for the clean, reliable data, not least for decreasing the frequency of your 3 AM pager duty calls.&lt;/P&gt;&lt;H2 id="5480"&gt;&lt;FONT size="5"&gt;Resources and References&lt;/FONT&gt;&lt;/H2&gt;&lt;OL class=""&gt;&lt;LI&gt;Apache Spark™ 4.0 Documentation Overview — The main documentation hub for Apache Spark™ 4.0&lt;BR /&gt;&lt;A class="" href="https://spark.apache.org/docs/latest/" target="_blank" rel="noopener ugc nofollow"&gt;https://spark.apache.org/docs/latest/&lt;/A&gt;&lt;/LI&gt;&lt;LI&gt;PySpark 4.0 API Documentation — Official Python API documentation for Apache Spark™ 4.0&lt;BR /&gt;&lt;A class="" href="https://spark.apache.org/docs/latest/api/python/index.html" target="_blank" rel="noopener ugc nofollow"&gt;https://spark.apache.org/docs/latest/api/python/index.html&lt;/A&gt;&lt;/LI&gt;&lt;LI&gt;Delta Lake Schema Enforcement and Evolution — Comprehensive guide on schema enforcement mechanisms in Delta Lake&lt;BR /&gt;&lt;A class="" href="https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html" target="_blank" rel="noopener ugc nofollow"&gt;https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html&lt;/A&gt;&lt;/LI&gt;&lt;LI&gt;Databricks Schema Enforcement Documentation — Best practices for data quality through schema enforcement&lt;BR /&gt;&lt;A class="" href="https://docs.databricks.com/aws/en/tables/schema-enforcement" target="_blank" rel="noopener ugc nofollow"&gt;https://docs.databricks.com/aws/en/tables/schema-enforcement&lt;/A&gt;&lt;/LI&gt;&lt;LI&gt;Introducing Apache Spark™ 4.0 — Overview of key features and improvements in Spark 4.0&lt;BR /&gt;&lt;A class="" href="https://www.databricks.com/blog/introducing-apache-spark-40" target="_blank" rel="noopener ugc 
nofollow"&gt;https://www.databricks.com/blog/introducing-apache-spark-40&lt;/A&gt;&lt;/LI&gt;&lt;LI&gt;Great Expectations — Data validation and documentation tool for data quality management&lt;BR /&gt;&lt;A class="" href="https://greatexpectations.io/" target="_blank" rel="noopener ugc nofollow"&gt;https://greatexpectations.io/&lt;/A&gt;&lt;/LI&gt;&lt;/OL&gt;</description>
    <pubDate>Sun, 16 Nov 2025 19:21:45 GMT</pubDate>
    <dc:creator>dmatrixjsd</dc:creator>
    <dc:date>2025-11-16T19:21:45Z</dc:date>
    <item>
      <title>Handling the Chaos: Data Quality Strategies with PySpark Ingestion</title>
      <link>https://community.databricks.com/t5/community-articles/handling-the-chaos-data-quality-strategies-with-pyspark/m-p/139249#M784</link>
      <description>&lt;P&gt;&lt;FONT size="4"&gt;&lt;STRONG&gt;Tips and Techniques for Ingesting Large JSON files with PySpark&lt;/STRONG&gt;&lt;/FONT&gt;&lt;/P&gt;&lt;H2 id="4edd"&gt;&lt;FONT size="5"&gt;Introduction&lt;/FONT&gt;&lt;/H2&gt;&lt;P class=""&gt;If you’ve ever struggled to ingest massive JSON files with PySpark, you know that bad data can creep in, silently disrupt your ETL pipeline, and pollute the data your downstream consumers rely on.&lt;/P&gt;&lt;P class=""&gt;In this blog post, I’ll walk you through three practical approaches to maintaining data quality during ingestion. We’ll cover how to drop records that don’t match your defined schema, validate that values such as transaction amounts and categories are sensible (not just present), and clean out duplicate entries before they cause trouble downstream.&lt;/P&gt;&lt;P class=""&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="Screenshot 2025-11-16 at 10.42.50 AM.png" style="width: 999px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/21738iCCC35A05C08D7FEA/image-size/large?v=v2&amp;amp;px=999" role="button" title="Screenshot 2025-11-16 at 10.42.50 AM.png" alt="Screenshot 2025-11-16 at 10.42.50 AM.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P class=""&gt;&lt;STRONG&gt;Figure 1.&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;Overview of data quality strategies&lt;/SPAN&gt;&lt;/P&gt;&lt;P class=""&gt;These are simple yet effective techniques that can save you headaches when working with JSON files. The three approaches are by no means exhaustive: other aspects of data quality, such as timeliness and consistency, depend on the data source (real-time streaming, operational databases, etc.). 
Other external tools, such as&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;A class="" href="https://greatexpectations.io/" target="_blank" rel="noopener ugc nofollow"&gt;Great Expectations&lt;/A&gt;,&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;can also help address these data validation issues.&lt;/P&gt;&lt;P class=""&gt;For the past few weeks, I have been experimenting with Claude Code, Cursor, and Goose as coding assistants in a&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;&lt;EM&gt;generation and validation cycle&lt;/EM&gt;&lt;/STRONG&gt;, where I instruct the AI coding agent, review and validate its output, and iterate until we reach the best approach. Some of the code examples, test cases, and documentation were generated with Cursor under my constant, interactive revision and validation, keeping the AI “on a tight leash.”&lt;/P&gt;&lt;P class=""&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="Screenshot 2025-11-16 at 10.45.18 AM.png" style="width: 917px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/21739i3CC01A99BE3EE27C/image-size/large?v=v2&amp;amp;px=999" role="button" title="Screenshot 2025-11-16 at 10.45.18 AM.png" alt="Screenshot 2025-11-16 at 10.45.18 AM.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P class=""&gt;&lt;STRONG&gt;Figure 2.&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;Cycle of partial autonomy (source: Andrej Karpathy)&lt;/SPAN&gt;&lt;/P&gt;&lt;P class=""&gt;&lt;SPAN&gt;If you have not used the Cursor coding agent in this style of cooperative partial autonomy, you ought to try it. &lt;span class="lia-unicode-emoji" title=":robot_face:"&gt;🤖&lt;/span&gt; &lt;span class="lia-unicode-emoji" title=":smiling_face_with_sunglasses:"&gt;😎&lt;/span&gt; 
You can explore the resulting implementations of these data quality strategies in the&amp;nbsp;&lt;/SPAN&gt;&lt;A class="" href="https://github.com/dmatrix/spark-misc/blob/main/src/py/blogs/README.md" target="_blank" rel="noopener ugc nofollow"&gt;GitHub repository&lt;/A&gt;.&lt;/P&gt;&lt;P class=""&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="Screenshot 2025-11-16 at 10.47.12 AM.png" style="width: 848px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/21740i0E8A74E936C42940/image-size/large?v=v2&amp;amp;px=999" role="button" title="Screenshot 2025-11-16 at 10.47.12 AM.png" alt="Screenshot 2025-11-16 at 10.47.12 AM.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P class=""&gt;&lt;STRONG&gt;Figure 3.&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;PySpark files that implement the recommended strategies&lt;/SPAN&gt;&lt;/P&gt;&lt;H2 id="efbc"&gt;&lt;FONT size="5"&gt;Data Ingestion with Schema Enforcement&lt;/FONT&gt;&lt;/H2&gt;&lt;P class=""&gt;The first approach is schema enforcement. When working with large JSON files, one of the most critical steps is ensuring your data conforms to an expected schema before it pollutes your downstream processes. Schema enforcement with the PySpark 4.0 DataFrame API lets you catch malformed data early and handle it gracefully.&lt;/P&gt;&lt;P class=""&gt;The key is being proactive rather than reactive. 
Instead of discovering schema mismatches after your ETL job has been running for hours, you can catch these issues upfront and decide how to handle them.&lt;/P&gt;&lt;H3 id="ce36"&gt;&lt;FONT size="5"&gt;Example 1: Strict Schema Enforcement with Error Handling&lt;/FONT&gt;&lt;/H3&gt;&lt;P class=""&gt;Here’s how you can enforce a strict schema and capture any records that don’t comply. This strategy is a two-step process: first read in FAILFAST mode, which succeeds only if every record parses against the schema; if that read fails, fall back to PERMISSIVE mode and separate the good records from the bad ones.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col

# Define schema once at module level for efficiency
TRANSACTION_SCHEMA = StructType([
    StructField("user_id", StringType(), nullable=False),
    StructField("transaction_amount", DoubleType(), nullable=False),
    StructField("transaction_date", TimestampType(), nullable=True),
    StructField("merchant_category", StringType(), nullable=True),
    StructField("customer_tier", StringType(), nullable=True)
])

def smart_json_load(spark, file_path):
   """
   Smart JSON loading: try strict mode first, fall back to permissive.
   Best of both worlds:
   - Fast processing for clean data (FAILFAST)
   - Graceful recovery for messy data (PERMISSIVE)
   """
  
   print(f"📋 Smart loading {file_path}...")
  
   # Step 1: Try FAILFAST for clean data
   try:
       print("🎯 Attempting FAILFAST mode...")
       df = spark.read.option("mode", "FAILFAST").schema(TRANSACTION_SCHEMA).json(file_path)

       # count() alone may not parse every column, so force a full parse of
       # all records first. The "noop" sink runs the whole read and discards
       # the output, avoiding df.collect(), which would pull a large file onto
       # the driver. Any malformed record raises here and triggers the fallback.
       df.write.format("noop").mode("overwrite").save()
       count = df.count()
       print(f"✅ FAILFAST succeeded: {count} clean records")
       return df, None
      
   except Exception:
       print("⚠️  FAILFAST failed, switching to PERMISSIVE mode...")
      
       # Step 2: Fallback to PERMISSIVE
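       # Copy the schema and append the corrupt-record column. StructType.add()
       # mutates in place, so calling it on TRANSACTION_SCHEMA would grow the
       # shared module-level schema on every fallback.
       schema_with_corrupt = StructType(
           TRANSACTION_SCHEMA.fields + [StructField("_corrupt_record", StringType(), True)]
       )
       df = spark.read.option("mode", "PERMISSIVE").schema(schema_with_corrupt).json(file_path)
      
       # Cache the parsed result: since Spark 2.3, queries on raw JSON files that
       # reference only the internal _corrupt_record column are disallowed unless
       # the parsed data is cached or saved first.
       df = df.cache()
       total_records = df.count()
      
       # Separate good from bad
       good_df = df.filter(col("_corrupt_record").isNull() &amp;amp; col("user_id").isNotNull()).drop("_corrupt_record")
       bad_df = df.filter(col("_corrupt_record").isNotNull() | col("user_id").isNull())
       good_count = good_df.count()
       bad_count = bad_df.count()
       print(f"🔄 PERMISSIVE recovery: {good_count} good, {bad_count} bad out of {total_records} records")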
       return good_df, bad_df

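# --- Sketch: validated reader options (hypothetical helper, not part of the
# original script). Spark's JSON reader has three parse modes: PERMISSIVE,
# DROPMALFORMED (silently drops rows that don't parse), and FAILFAST. When it
# does not recognize a mode string, Spark typically logs a warning and falls
# back to PERMISSIVE, so a typo such as "FAILFST" quietly disables strict
# checking. This helper rejects unknown modes up front. columnNameOfCorruptRecord
# is a real reader option for renaming _corrupt_record; to capture raw text,
# the schema must then contain a nullable StringType field with that name.
JSON_PARSE_MODES = ("PERMISSIVE", "DROPMALFORMED", "FAILFAST")


def json_reader_options(mode, corrupt_column=None):
    """Return a dict for spark.read.options(**opts), validating the mode name."""
    normalized = mode.upper()
    if normalized not in JSON_PARSE_MODES:
        raise ValueError(f"Unknown parse mode {mode!r}; expected one of {JSON_PARSE_MODES}")
    options = {"mode": normalized}
    if corrupt_column is not None:
        # Only PERMISSIVE mode keeps malformed rows, so only it fills this column.
        if normalized != "PERMISSIVE":
            raise ValueError("corrupt_column is only meaningful in PERMISSIVE mode")
        options["columnNameOfCorruptRecord"] = corrupt_column
    return options

# Example (assumes a schema that includes a "_bad_json" StringType field):
#   opts = json_reader_options("permissive", corrupt_column="_bad_json")
#   df = spark.read.options(**opts).schema(schema).json(file_path)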

if __name__ == "__main__":
   spark = SparkSession.builder.appName("SmartJSONLoader").master("local[*]").getOrCreate()
  
   # Test with different data files
   test_files = [
       "data/clean_transactions.json",
       "data/mixed_transactions.json",
       "data/invalid_transactions.json"
   ]
  
   for file_path in test_files:
       print(f"\n{'='*60}")
       print(f"🧪 TESTING: {file_path}")
       print(f"{'='*60}")
       good_data, bad_data = smart_json_load(spark, file_path)
      
       if bad_data is None:
           print(f"🎯 Perfect! Clean data processed with FAILFAST")
       else:
           print(f"🛡️  Recovered data with PERMISSIVE fallback")
   spark.stop()&lt;/LI-CODE&gt;&lt;H3&gt;&lt;FONT size="5"&gt;Example 2: Permissive Mode with Corrupt Record Tracking&lt;/FONT&gt;&lt;/H3&gt;&lt;P class=""&gt;Sometimes you want to ingest what you can and track what fails. This example demonstrates&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;STRONG&gt;PERMISSIVE&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/STRONG&gt;mode in PySpark 4.0, which keeps every record, setting fields it cannot parse to null and storing the raw text of malformed records in a&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;_corrupt_record&lt;/EM&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;column for later analysis and investigation. This makes it a good fit for exploratory data analysis and for improving data quality gradually.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col

# Define schema once at module level for efficiency
PERMISSIVE_SCHEMA = StructType([
    StructField("user_id", StringType(), nullable=True),
    StructField("transaction_amount", DoubleType(), nullable=True),
    StructField("transaction_date", TimestampType(), nullable=True),
    StructField("merchant_category", StringType(), nullable=True),
    StructField("customer_tier", StringType(), nullable=True),
    StructField("_corrupt_record", StringType(), nullable=True)  # Captures bad records
])

def load_with_permissive_mode(spark, file_path):
   """
   Example 2: Permissive Mode with Corrupt Record Tracking
  
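   Uses PERMISSIVE mode: malformed records don't fail the read; their raw text
   is captured in the _corrupt_record column instead.
   Perfect for messy data where you want to save what you can.
   """
  
   print(f"📋 Loading {file_path} with permissive mode...")
  
   # PERMISSIVE mode: parse what we can, capture the rest in _corrupt_record.
   # Cache the parsed result so the counts and filters below all work from the
   # same parsed rows; caching also avoids Spark's restriction on querying raw
   # JSON files by the _corrupt_record column alone.
   df = spark.read.option("mode", "PERMISSIVE").schema(PERMISSIVE_SCHEMA).json(file_path).cache()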
  
   total_records = df.count()
  
   # Separate good from bad records
   good_records = df.filter(col("_corrupt_record").isNull() &amp;amp; col("user_id").isNotNull())
   bad_records = df.filter(col("_corrupt_record").isNotNull() | col("user_id").isNull())
  
   good_count = good_records.count()
   bad_count = bad_records.count()
   success_rate = (good_count / total_records * 100) if total_records &amp;gt; 0 else 0
  
   print(f"📊 Results: {good_count}/{total_records} good records ({success_rate:.1f}% success)")
  
   if good_count &amp;gt; 0:
       print("✅ Sample good records:")
       good_records.drop("_corrupt_record").show(2, truncate=False)
  
   if bad_count &amp;gt; 0:
       print("❌ Sample bad records:")
       bad_records.select("_corrupt_record").show(2, truncate=False)
  
   return good_records, bad_records

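# --- Sketch: per-run quality metrics (hypothetical helper, not part of the
# original script). Turning the counts above into one small record per file
# makes it easy to log them, or append them to a table, and watch data quality
# over time. The good/bad filters in load_with_permissive_mode are exact
# complements, so the two counts should add up to the total; a mismatch
# points at a bug in the filters or at the input changing between reads.
def quality_metrics(file_path, total_records, good_count, bad_count):
    """Summarize one ingestion run as a plain dict."""
    if good_count + bad_count != total_records:
        raise ValueError(
            f"good ({good_count}) + bad ({bad_count}) != total ({total_records})"
        )
    success_rate = round(good_count / total_records * 100, 1) if total_records else 0.0
    return {
        "file_path": file_path,
        "total_records": total_records,
        "good_records": good_count,
        "bad_records": bad_count,
        "success_rate_pct": success_rate,
    }

# Example: quality_metrics("example.json", 10, 8, 2) returns a dict whose
# "success_rate_pct" is 80.0.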

if __name__ == "__main__":
   spark = SparkSession.builder.appName("PermissiveMode").master("local[*]").getOrCreate()
  
   # Test with different data files
   test_files = [
       "data/clean_transactions.json",
       "data/mixed_transactions.json",
       "data/invalid_transactions.json"
   ]
  
   for file_path in test_files:
       print(f"\n{'='*60}")
       print(f"🧪 TESTING: {file_path}")
       print(f"{'='*60}")
      
       good_data, bad_data = load_with_permissive_mode(spark, file_path)
       print(f"✅ PERMISSIVE always succeeds - captured {good_data.count()} good, {bad_data.count()} bad records")
  
   spark.stop()&lt;/LI-CODE&gt;&lt;P class=""&gt;&lt;SPAN&gt;The beauty of this approach is that you can continue processing the clean data while capturing the problematic records for later analysis and review. This gives you visibility into data quality issues without interrupting your entire pipeline. Code and test runs for the above examples are in this&amp;nbsp;&lt;/SPAN&gt;&lt;A class="" href="https://github.com/dmatrix/spark-misc/tree/main/src/py/blogs" target="_blank" rel="noopener ugc nofollow"&gt;GitHub Repo&lt;/A&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;/P&gt;&lt;H2 id="27ef"&gt;&lt;FONT size="5"&gt;Validating Data for Accuracy&lt;/FONT&gt;&lt;/H2&gt;&lt;P class=""&gt;The second approach is validating data for accuracy. Schema enforcement gets you part of the way there, but just because a field exists and has the correct data type doesn’t mean the data makes business sense. Here, data validation ensures that your transaction amounts fall within realistic ranges and that your categorical values come from approved business domains.&lt;/P&gt;&lt;P class=""&gt;Think of this as your business logic validation layer. You’re not just checking that a&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;transaction_amount&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/EM&gt;field is a double; you’re checking that it’s a reasonable amount for a financial transaction. 
You’re not just verifying that&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;merchant_category&lt;/EM&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;is a string; you’re ensuring it’s one of your supported merchant types like “grocery”, “restaurant”, or “retail”, not “currency”, which your payment system doesn’t handle.&lt;/P&gt;&lt;P class=""&gt;Similarly, you’re not just confirming that&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;customer_tier&lt;/EM&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;contains text data; you’re validating that it matches your actual tier system (“bronze”, “silver”, “gold”, “platinum”) rather than made-up values like “diamond” that don’t exist in your business model. This validation layer catches data that’s technically valid but business-invalid:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;span class="lia-unicode-emoji" title=":cross_mark:"&gt;❌&lt;/span&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;transaction_amount&lt;/EM&gt;:&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;-10.50 → Negative transactions don’t make sense&lt;/LI&gt;&lt;LI&gt;&lt;span class="lia-unicode-emoji" title=":cross_mark:"&gt;❌&lt;/span&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;transaction_amount&lt;/EM&gt;: 15000.00 → Exceeds your transaction limit&lt;/LI&gt;&lt;LI&gt;&lt;span class="lia-unicode-emoji" title=":cross_mark:"&gt;❌&lt;/span&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;merchant_category&lt;/EM&gt;: “currency” → Not in your supported merchant types&lt;/LI&gt;&lt;LI&gt;&lt;span class="lia-unicode-emoji" title=":cross_mark:"&gt;❌&lt;/span&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;EM&gt;customer_tier&lt;/EM&gt;: “diamond” → Not in your tier system&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;Schema validation ensures data structure integrity; business logic validation ensures that the data makes sense within your business context.&lt;/P&gt;&lt;H3 id="e99f"&gt;&lt;FONT size="5"&gt;Example 1: Range and Business Logic Validation&lt;/FONT&gt;&lt;/H3&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import StructType, StructField, StringType, DoubleType


def validate_transaction_business_rules(df):
   """
   Business logic validation: checks that data makes business sense, going
   beyond schema validation to check ranges, categories, and business constraints.
   """
   # Define business rules
   VALID_MERCHANTS = ["grocery", "restaurant", "gas_station", "retail", "entertainment", "other"]
   VALID_TIERS = ["bronze", "silver", "gold", "platinum"]
   MIN_AMOUNT = 0.01
   MAX_AMOUNT = 10000.0
  
   # Apply validation rules
   validated_df = df.withColumn(
       "validation_status",
       when(col("transaction_amount") &amp;lt; MIN_AMOUNT, "negative_or_zero_amount")
       .when(col("transaction_amount") &amp;gt; MAX_AMOUNT, "excessive_amount")
       .when(~col("merchant_category").isin(VALID_MERCHANTS), "invalid_merchant_category")
       .when(~col("customer_tier").isin(VALID_TIERS), "invalid_customer_tier")
       .otherwise("valid")
   )
  
   # Separate valid from invalid data
   valid_df = validated_df.filter(col("validation_status") == "valid").drop("validation_status")
   invalid_df = validated_df.filter(col("validation_status") != "valid")
  
   # Show validation summary
   total = validated_df.count()
   valid_count = valid_df.count()
   invalid_count = invalid_df.count()
   success_rate = (valid_count / total * 100) if total &amp;gt; 0 else 0
  
   print(f"📊 Business Logic Validation: {valid_count}/{total} valid ({success_rate:.1f}%)")
  
   if invalid_count &amp;gt; 0:
       print("🔍 Validation issues:")
       validated_df.groupBy("validation_status").count().show()
  
   return valid_df, invalid_df

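# --- Sketch: the same rules as plain Python (hypothetical helper, not part of
# the original script). It checks the rules in the same order as the when()
# chain, so it can act as an oracle when unit-testing the Spark version, e.g.
# [expected_validation_status(*row[1:]) for row in test_data].
# One deliberate difference: missing values get their own status. Spark's
# when() treats a null condition as false, so without explicit isNull()
# branches a null amount, merchant_category, or customer_tier can fall
# through the range and isin() checks and end up labeled "valid".
# "missing_value" is a hypothetical status name.
RULE_MERCHANTS = {"grocery", "restaurant", "gas_station", "retail", "entertainment", "other"}
RULE_TIERS = {"bronze", "silver", "gold", "platinum"}
RULE_MIN_AMOUNT = 0.01
RULE_MAX_AMOUNT = 10000.0


def expected_validation_status(amount, merchant_category, customer_tier):
    """Return the status the business rules should assign to one record."""
    if amount is None or merchant_category is None or customer_tier is None:
        return "missing_value"
    if RULE_MIN_AMOUNT > amount:
        return "negative_or_zero_amount"
    if amount > RULE_MAX_AMOUNT:
        return "excessive_amount"
    if merchant_category not in RULE_MERCHANTS:
        return "invalid_merchant_category"
    if customer_tier not in RULE_TIERS:
        return "invalid_customer_tier"
    return "valid"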

if __name__ == "__main__":
   # Test the validation function
   spark = SparkSession.builder.appName("BusinessLogicValidation").master("local[*]").getOrCreate()
  
   # Create test data with various business rule violations
   test_data = [
       # Valid records
       ("user_001", 45.99, "grocery", "gold"),
       ("user_002", 125.50, "restaurant", "silver"),
       ("user_003", 89.99, "retail", "bronze"),
      
       # Invalid records
       ("user_004", -10.50, "grocery", "gold"),           # Negative amount
       ("user_005", 15000.00, "restaurant", "silver"),    # Excessive amount
       ("user_006", 50.00, "currency", "gold"),           # Invalid merchant
       ("user_007", 75.25, "grocery", "diamond"),         # Invalid tier
       ("user_008", 0.00, "gas_station", "platinum"),     # Zero amount
   ]
  
   schema = StructType([
       StructField("user_id", StringType(), False),
       StructField("transaction_amount", DoubleType(), False),
       StructField("merchant_category", StringType(), False),
       StructField("customer_tier", StringType(), False)
   ])
  
   test_df = spark.createDataFrame(test_data, schema)
  
   print("🧪 Testing Business Logic Validation")
   print("=" * 50)
  
   # Apply validation
   valid_data, invalid_data = validate_transaction_business_rules(test_df)
  
   # Verify results
   if valid_data.count() == 3 and invalid_data.count() == 5:
       print("✅ Test PASSED: 3 valid, 5 invalid records as expected")
   else:
       print(f"❌ Test FAILED: Expected 3 valid, 5 invalid. Got {valid_data.count()} valid, {invalid_data.count()} invalid")
   spark.stop()&lt;/LI-CODE&gt;&lt;P class=""&gt;The business logic validation code and its test run are in this&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;A class="" href="https://github.com/dmatrix/spark-misc/tree/main/src/py/blogs" target="_blank" rel="noopener ugc nofollow"&gt;GitHub Repo&lt;/A&gt;.&lt;/P&gt;&lt;P class=""&gt;Beyond schema and business rule violations, another common source of data chaos is duplication.&lt;/P&gt;&lt;H2 id="846e"&gt;Dropping Data Duplicates&lt;/H2&gt;&lt;P class=""&gt;The final approach deals with duplicates. Duplicate records can skew your analytics and reporting: they inflate metrics, produce incorrect aggregations, and can lead to poor business decisions. The PySpark DataFrame API provides deduplication tools that go beyond exact row matching, from distinct() to window functions such as row_number() and lag().&lt;/P&gt;&lt;P class=""&gt;The key insight here is that duplicates aren’t always exact row matches. Sometimes you need to define business logic for what constitutes a duplicate: perhaps the same transaction ID, or the same customer making the same purchase within a short time window.&lt;/P&gt;&lt;P class=""&gt;Let’s examine the code for both scenarios: exact matches and duplicates defined by business logic.&lt;/P&gt;&lt;H3 id="9bf9"&gt;Example 1: Simple Deduplication with Performance Optimization&lt;/H3&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col, row_number, desc, when, lag, unix_timestamp, abs as spark_abs
from pyspark.sql.window import Window
from datetime import datetime

def create_sample_data(spark):
    """Create sample data with various duplicate patterns"""
    data = [
        # Exact duplicates
        ("TXN001", "USER123", 99.99, datetime(2024, 1, 15, 10, 30, 0), "RETAIL", "GOLD"),
        ("TXN001", "USER123", 99.99, datetime(2024, 1, 15, 10, 30, 0), "RETAIL", "GOLD"),
        
        # Same transaction_id, different details (system error)
        ("TXN002", "USER456", 150.00, datetime(2024, 1, 15, 11, 0, 0), "GROCERY", "SILVER"),
        ("TXN002", "USER789", 200.00, datetime(2024, 1, 15, 11, 5, 0), "RETAIL", "BRONZE"),
        
        # Same user + amount within 30 seconds (double-click)
        ("TXN003", "USER123", 49.99, datetime(2024, 1, 15, 12, 0, 0), "ONLINE", "GOLD"),
        ("TXN004", "USER123", 49.99, datetime(2024, 1, 15, 12, 0, 30), "ONLINE", "GOLD"),
        
        # Unique records
        ("TXN005", "USER456", 75.50, datetime(2024, 1, 15, 13, 0, 0), "RESTAURANT", "SILVER"),
        ("TXN006", "USER789", 120.00, datetime(2024, 1, 15, 14, 0, 0), "GAS", "BRONZE"),
    ]
    
    schema = StructType([
        StructField("transaction_id", StringType(), False),
        StructField("user_id", StringType(), False),
        StructField("transaction_amount", DoubleType(), False),
        StructField("transaction_date", TimestampType(), False),
        StructField("merchant_category", StringType(), True),
        StructField("customer_tier", StringType(), True)
    ])
    
    return spark.createDataFrame(data, schema)

def exact_deduplication(df):
    """Remove exact duplicate rows - fastest method"""
    return df.distinct()

def business_logic_deduplication(df):
    """Remove duplicates based on business rules"""
    
    # Rule 1: Same transaction_id = duplicate (keep first occurrence)
    window_txn = Window.partitionBy("transaction_id").orderBy("transaction_date")
    df = df.withColumn("txn_rank", row_number().over(window_txn)) \
           .filter(col("txn_rank") == 1) \
           .drop("txn_rank")
    
    # Rule 2: Same user + amount within 60 seconds = double-click (keep first)
    window_user = Window.partitionBy("user_id", "transaction_amount").orderBy("transaction_date")
    
    df = df.withColumn("prev_time", lag("transaction_date").over(window_user)) \
           .withColumn("time_diff", 
                      when(col("prev_time").isNull(), 999999)
                      .otherwise(spark_abs(unix_timestamp("transaction_date") - 
                                         unix_timestamp("prev_time")))) \
           .filter(col("time_diff") &amp;gt; 60) \
           .drop("prev_time", "time_diff")
    
    return df

def priority_deduplication(df):
    """Keep the best record when duplicates exist"""
    
    # Priority: GOLD &amp;gt; SILVER &amp;gt; BRONZE
    df_with_priority = df.withColumn("priority",
                                   when(col("customer_tier") == "GOLD", 3)
                                   .when(col("customer_tier") == "SILVER", 2)
                                   .when(col("customer_tier") == "BRONZE", 1)
                                   .otherwise(0))
    
    window_priority = Window.partitionBy("transaction_id") \
                           .orderBy(desc("priority"), desc("transaction_date"))
    
    return df_with_priority.withColumn("rank", row_number().over(window_priority)) \
                          .filter(col("rank") == 1) \
                          .drop("priority", "rank")

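# --- Sketch: a plain-Python model of Rule 2 on its own (hypothetical helper,
# not part of the original script), handy as a unit-test oracle. Like the
# lag()-based rule above, each event is compared with the previous event for
# the same (user_id, transaction_amount), even if that previous event was
# itself dropped. A burst of clicks 50 seconds apart therefore collapses to
# its first click, however long the burst lasts. Timestamps are epoch seconds.
def expected_double_click_survivors(events, window_seconds=60):
    """events: iterable of (transaction_id, user_id, amount, epoch_seconds)."""
    last_seen = {}
    kept = []
    for txn_id, user_id, amount, ts in sorted(events, key=lambda e: e[3]):
        key = (user_id, amount)
        previous = last_seen.get(key)
        last_seen[key] = ts
        # Mirrors filter(time_diff > 60): keep only if the gap exceeds the window.
        if previous is None or ts - previous > window_seconds:
            kept.append(txn_id)
    return kept
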
if __name__ == "__main__":
    # Initialize Spark. Adaptive Query Execution is already enabled by default
    # since Spark 3.2; the config below just makes that explicit.
    spark = SparkSession.builder \
        .appName("DeduplicationExample") \
        .master("local[*]") \
        .config("spark.sql.adaptive.enabled", "true") \
        .getOrCreate()
    
    spark.sparkContext.setLogLevel("ERROR")  # Reduce verbose output
    
    # Create sample data
    df = create_sample_data(spark)
    print(f"Original records: {df.count()}")
    
    # Method 1: Exact deduplication (fastest for exact matches)
    exact_result = exact_deduplication(df)
    print(f"After exact deduplication: {exact_result.count()}")
    
    # Method 2: Business logic deduplication
    business_result = business_logic_deduplication(df)
    print(f"After business logic deduplication: {business_result.count()}")
    
    # Method 3: Priority-based deduplication
    priority_result = priority_deduplication(df)
    print(f"After priority deduplication: {priority_result.count()}")
    
    # Show final results
    print("\nFinal deduplicated data:")
    business_result.show(truncate=False)
    
    spark.stop()&lt;/LI-CODE&gt;&lt;H3 id="14c8"&gt;Example 2: Advanced Deduplication with Custom Logic&lt;/H3&gt;&lt;P class=""&gt;Sometimes you need more sophisticated deduplication logic: keeping the record with the highest value or the most recent timestamp, ranking records by several business priorities, or aggregating duplicates instead of removing them. Which rule is right depends on your business use case.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, desc, when, max as spark_max, sum as spark_sum, count
from pyspark.sql.window import Window
from datetime import datetime

def create_sample_data(spark):
    """Sample customer data with duplicates"""
    data = [
        ("CUST001", "John Smith", 50.00, datetime(2024, 1, 15, 10, 0, 0), "Basic", "ACTIVE"),
        ("CUST001", "John Smith", 150.00, datetime(2024, 1, 15, 11, 0, 0), "Premium", "ACTIVE"),
        ("CUST002", "Jane Doe", 100.00, datetime(2024, 1, 15, 9, 0, 0), "Standard", "SUSPENDED"),
        ("CUST002", "Jane Doe", 100.00, datetime(2024, 1, 15, 11, 0, 0), "Standard", "ACTIVE"),
        ("CUST003", "Bob Wilson", 220.00, datetime(2024, 1, 15, 8, 0, 0), "Basic", "ACTIVE"),
        ("CUST003", "Bob Wilson", 180.00, datetime(2024, 1, 15, 10, 0, 0), "Premium", "ACTIVE"),
    ]
    columns = ["customer_id", "name", "amount", "timestamp", "tier", "status"]
    return spark.createDataFrame(data, columns)

def keep_highest_value(df):
    """Keep record with highest amount per customer"""
    window = Window.partitionBy("customer_id").orderBy(desc("amount"))
    return df.withColumn("rank", row_number().over(window)) \
             .filter(col("rank") == 1).drop("rank")

def keep_most_recent(df):
    """Keep most recent record per customer"""
    window = Window.partitionBy("customer_id").orderBy(desc("timestamp"))
    return df.withColumn("rank", row_number().over(window)) \
             .filter(col("rank") == 1).drop("rank")

def multi_criteria_best(df):
    """Advanced: Keep best record using business priority"""
    df_priority = df.withColumn("tier_score", 
                               when(col("tier") == "Premium", 3)
                               .when(col("tier") == "Standard", 2)
                               .otherwise(1)) \
                    .withColumn("status_score", 
                               when(col("status") == "ACTIVE", 1).otherwise(0))
    
    window = Window.partitionBy("customer_id") \
                  .orderBy(desc("tier_score"), desc("status_score"), 
                          desc("amount"), desc("timestamp"))
    
    return df_priority.withColumn("rank", row_number().over(window)) \
                     .filter(col("rank") == 1) \
                     .drop("tier_score", "status_score", "rank")

def aggregate_duplicates(df):
    """Alternative: Aggregate instead of removing duplicates"""
    return df.groupBy("customer_id", "name") \
             .agg(spark_max("amount").alias("max_amount"),
                  spark_sum("amount").alias("total_amount"),
                  count("*").alias("record_count"),
                  spark_max("timestamp").alias("latest_update"))

if __name__ == "__main__":
    spark = SparkSession.builder.appName("AdvancedDedup").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    
    df = create_sample_data(spark)
    print(f"Original: {df.count()} records")
    
    # Method 1: Keep highest value
    highest = keep_highest_value(df)
    print(f"Highest value: {highest.count()} records")
    highest.select("customer_id", "name", "amount", "tier").show()
    
    # Method 2: Keep most recent
    recent = keep_most_recent(df)
    print(f"Most recent: {recent.count()} records")
    recent.select("customer_id", "name", "timestamp", "status").show()
    
    # Method 3: Multi-criteria priority
    best = multi_criteria_best(df)
    print(f"Multi-criteria best: {best.count()} records")
    best.show()
    
    # Method 4: Aggregate instead of remove
    agg = aggregate_duplicates(df)
    print(f"Aggregated: {agg.count()} records")
    agg.show()
    
    spark.stop()&lt;/LI-CODE&gt;&lt;P&gt;&lt;SPAN&gt;For a more comprehensive set of deduplication strategies with custom logic, see the&amp;nbsp;&lt;/SPAN&gt;&lt;A class="" href="https://github.com/dmatrix/spark-misc/blob/main/src/py/blogs/README.md" target="_blank" rel="noopener ugc nofollow"&gt;GitHub repository&lt;/A&gt;&lt;SPAN&gt;. These approaches let you decide which record to keep when duplicates are found. They do not simply keep the first occurrence arbitrarily.&lt;/SPAN&gt;&lt;/P&gt;&lt;H2 id="6439"&gt;Conclusion&lt;/H2&gt;&lt;P class=""&gt;To recap, data quality issues in large JSON ingestion pipelines are inevitable, but they don’t have to be catastrophic. You can implement three strategies: schema enforcement, data validation, and custom deduplication. Together they let you find and fix problems early, while they are still easy to handle. These strategies are by no means exhaustive, but they are widely used and sufficient for many ingestion pipelines.&lt;/P&gt;&lt;P class=""&gt;The key is to be proactive rather than reactive. Set up your quality checks at ingestion time, not after your data has already propagated through your entire data lake. These PySpark techniques give you the tools to build robust data quality pipelines that can handle the messiness of real-world data.&lt;/P&gt;&lt;P class=""&gt;Remember that data quality is an ongoing process, not a one-time fix. 
Monitor your validation metrics over time and continuously refine your validation rules as your understanding of the data evolves.&lt;/P&gt;&lt;P class=""&gt;Your downstream consumers will thank you for clean, reliable data, and you will see fewer 3 AM pager alerts.&lt;/P&gt;&lt;H2 id="5480"&gt;&lt;FONT size="5"&gt;Resources and References&lt;/FONT&gt;&lt;/H2&gt;&lt;OL class=""&gt;&lt;LI&gt;Apache Spark™ 4.0 Documentation Overview: the main documentation hub for Apache Spark™ 4.0&lt;BR /&gt;&lt;A class="" href="https://spark.apache.org/docs/latest/" target="_blank" rel="noopener ugc nofollow"&gt;https://spark.apache.org/docs/latest/&lt;/A&gt;&lt;/LI&gt;&lt;LI&gt;PySpark 4.0 API Documentation: official Python API documentation for Apache Spark™ 4.0&lt;BR /&gt;&lt;A class="" href="https://spark.apache.org/docs/latest/api/python/index.html" target="_blank" rel="noopener ugc nofollow"&gt;https://spark.apache.org/docs/latest/api/python/index.html&lt;/A&gt;&lt;/LI&gt;&lt;LI&gt;Delta Lake Schema Enforcement and Evolution: a comprehensive guide to schema enforcement mechanisms in Delta Lake&lt;BR /&gt;&lt;A class="" href="https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html" target="_blank" rel="noopener ugc nofollow"&gt;https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html&lt;/A&gt;&lt;/LI&gt;&lt;LI&gt;Databricks Schema Enforcement Documentation: best practices for data quality through schema enforcement&lt;BR /&gt;&lt;A class="" href="https://docs.databricks.com/aws/en/tables/schema-enforcement" target="_blank" rel="noopener ugc nofollow"&gt;https://docs.databricks.com/aws/en/tables/schema-enforcement&lt;/A&gt;&lt;/LI&gt;&lt;LI&gt;Introducing Apache Spark™ 4.0: an overview of key features and improvements in Spark 4.0&lt;BR /&gt;&lt;A class="" href="https://www.databricks.com/blog/introducing-apache-spark-40" target="_blank" rel="noopener ugc nofollow"&gt;https://www.databricks.com/blog/introducing-apache-spark-40&lt;/A&gt;&lt;/LI&gt;
&lt;LI&gt;Great Expectations: a data validation and documentation tool for data quality management&lt;BR /&gt;&lt;A class="" href="https://greatexpectations.io/" target="_blank" rel="noopener ugc nofollow"&gt;https://greatexpectations.io/&lt;/A&gt;&lt;/LI&gt;&lt;/OL&gt;</description>
      <pubDate>Sun, 16 Nov 2025 19:21:45 GMT</pubDate>
      <guid>https://community.databricks.com/t5/community-articles/handling-the-chaos-data-quality-strategies-with-pyspark/m-p/139249#M784</guid>
      <dc:creator>dmatrixjsd</dc:creator>
      <dc:date>2025-11-16T19:21:45Z</dc:date>
    </item>
  </channel>
</rss>

