Tips and Techniques for Ingesting Large JSON files with PySpark
If you’ve ever grappled with consuming massive JSON files with PySpark, you know that bad data can creep in and silently disrupt your ETL pipeline or pollute your data for downstream consumers.
In this blog post, I’ll walk you through three practical approaches to maintaining data quality during ingestion. We’ll cover how to drop records that don’t match your defined schema, validate that some numeric values are sensible (not just existent), and clean out any duplicate entries before they cause trouble downstream.
Figure 1. Overview of data quality strategies
These are simple yet effective techniques that can save you headaches when working with JSON files. The three approaches are by no means exhaustive: other aspects of data quality, such as timeliness and consistency, depend on your data sources (real-time streaming, operational databases, etc.). External tools such as Great Expectations can also help with data validation.
For the past few weeks, I have been experimenting with Claude Code, Cursor, and Goose as my coding assistants in a generation-and-validation cycle: I instruct, interact, validate, and iterate with the AI coding agent to reach the best approach. Some of the code examples, test cases, and documentation here were generated with Cursor under my constant revision and validation, keeping the AI “on a tight leash.”
Figure 2. Cycle of partial autonomy (source: Andrej Karpathy)
If you have not used the Cursor coding agent in this manner of cooperative partial autonomy, you ought to try it 🤖 😎. You can explore the cooperative results of those data quality strategies in the GitHub repository.
Figure 3. PySpark files that implement the recommended strategy
Consider schema enforcement. When working with large JSON files, one of the most critical steps is ensuring your data conforms to an expected schema before it pollutes your downstream processes. Schema enforcement in the PySpark 4.0 DataFrame API lets you catch malformed data early and handle it gracefully.
The key is being proactive rather than reactive. Instead of discovering schema mismatches after your ETL job has been running for hours, you can catch these issues upfront and decide how to handle them.
Here’s how you can enforce a strict schema and capture any records that don’t comply. This strategy is a two-step process: FAILFAST for clean data; fallback to PERMISSIVE mode.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col

# Define schema once at module level for efficiency
TRANSACTION_SCHEMA = StructType([
    StructField("user_id", StringType(), nullable=False),
    StructField("transaction_amount", DoubleType(), nullable=False),
    StructField("transaction_date", TimestampType(), nullable=True),
    StructField("merchant_category", StringType(), nullable=True),
    StructField("customer_tier", StringType(), nullable=True)
])

def smart_json_load(spark, file_path):
    """
    Smart JSON Loading: Try strict first, fallback to permissive.
    Best of both worlds:
    - Fast processing for clean data (FAILFAST)
    - Graceful recovery for messy data (PERMISSIVE)
    """
    print(f"📋 Smart loading {file_path}...")
    # Step 1: Try FAILFAST for clean data
    try:
        print("🎯 Attempting FAILFAST mode...")
        df = spark.read.option("mode", "FAILFAST").schema(TRANSACTION_SCHEMA).json(file_path)
        count = df.count()
        # Force full evaluation to catch parsing errors in ALL records.
        # The "noop" sink parses every row without pulling the data onto the
        # driver, which collect() would do (risky for large files).
        df.write.format("noop").mode("overwrite").save()
        print(f"✅ FAILFAST succeeded: {count} clean records")
        return df, None
    except Exception:
        print("⚠️ FAILFAST failed, switching to PERMISSIVE mode...")

    # Step 2: Fallback to PERMISSIVE
    # Build a new StructType instead of calling TRANSACTION_SCHEMA.add(),
    # which would mutate the shared module-level schema in place
    schema_with_corrupt = StructType(
        TRANSACTION_SCHEMA.fields + [StructField("_corrupt_record", StringType(), True)]
    )
    df = spark.read.option("mode", "PERMISSIVE").schema(schema_with_corrupt).json(file_path)
    # Cache to avoid Spark 4.0 corrupt record query restrictions
    df = df.cache()
    total_records = df.count()  # also materializes the cache

    # Separate good from bad
    good_df = df.filter(col("_corrupt_record").isNull() & col("user_id").isNotNull()).drop("_corrupt_record")
    bad_df = df.filter(col("_corrupt_record").isNotNull() | col("user_id").isNull())
    good_count = good_df.count()
    bad_count = bad_df.count()
    print(f"🔄 PERMISSIVE recovery: {good_count} good, {bad_count} bad records")
    return good_df, bad_df

if __name__ == "__main__":
    spark = SparkSession.builder.appName("SmartJSONLoader").master("local[*]").getOrCreate()
    # Test with different data files
    test_files = [
        "data/clean_transactions.json",
        "data/mixed_transactions.json",
        "data/invalid_transactions.json"
    ]
    for file_path in test_files:
        print(f"\n{'='*60}")
        print(f"🧪 TESTING: {file_path}")
        print(f"{'='*60}")
        good_data, bad_data = smart_json_load(spark, file_path)
        if bad_data is None:
            print(f"🎯 Perfect! Clean data processed with FAILFAST")
        else:
            print(f"🛡️ Recovered data with PERMISSIVE fallback")
    spark.stop()

Example 2: Permissive Mode with Corrupt Record Tracking
Sometimes you want to ingest what you can and track what fails. This example demonstrates PERMISSIVE mode in PySpark 4.0, which ingests every record that can be parsed while tracking problematic records for later analysis and investigation. It is ideal for exploratory data analysis and gradual improvement of data quality.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col

# Define schema once at module level for efficiency
PERMISSIVE_SCHEMA = StructType([
    StructField("user_id", StringType(), nullable=True),
    StructField("transaction_amount", DoubleType(), nullable=True),
    StructField("transaction_date", TimestampType(), nullable=True),
    StructField("merchant_category", StringType(), nullable=True),
    StructField("customer_tier", StringType(), nullable=True),
    StructField("_corrupt_record", StringType(), nullable=True)  # Captures bad records
])

def load_with_permissive_mode(spark, file_path):
    """
    Example 2: Permissive Mode with Corrupt Record Tracking
    Uses PERMISSIVE mode - never fails, captures bad records in _corrupt_record column.
    Perfect for messy data where you want to save what you can.
    """
    print(f"📋 Loading {file_path} with permissive mode...")
    # PERMISSIVE mode - never fails, captures corruption
    df = spark.read.option("mode", "PERMISSIVE").schema(PERMISSIVE_SCHEMA).json(file_path)
    total_records = df.count()

    # Separate good from bad records
    good_records = df.filter(col("_corrupt_record").isNull() & col("user_id").isNotNull())
    bad_records = df.filter(col("_corrupt_record").isNotNull() | col("user_id").isNull())
    good_count = good_records.count()
    bad_count = bad_records.count()
    success_rate = (good_count / total_records * 100) if total_records > 0 else 0
    print(f"📊 Results: {good_count}/{total_records} good records ({success_rate:.1f}% success)")

    if good_count > 0:
        print("✅ Sample good records:")
        good_records.drop("_corrupt_record").show(2, truncate=False)
    if bad_count > 0:
        print("❌ Sample bad records:")
        bad_records.select("_corrupt_record").show(2, truncate=False)
    return good_records, bad_records

if __name__ == "__main__":
    spark = SparkSession.builder.appName("PermissiveMode").master("local[*]").getOrCreate()
    # Test with different data files
    test_files = [
        "data/clean_transactions.json",
        "data/mixed_transactions.json",
        "data/invalid_transactions.json"
    ]
    for file_path in test_files:
        print(f"\n{'='*60}")
        print(f"🧪 TESTING: {file_path}")
        print(f"{'='*60}")
        good_data, bad_data = load_with_permissive_mode(spark, file_path)
        print(f"✅ PERMISSIVE always succeeds - captured {good_data.count()} good, {bad_data.count()} bad records")
    spark.stop()

The beauty of this approach is that you can continue processing with the clean data while simultaneously capturing the problematic records for later analysis and review. This provides visibility into data quality issues without interrupting your entire pipeline. Code and test runs for the above examples are in this GitHub Repo.
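The examples above read files such as data/mixed_transactions.json, whose contents are not shown here. As a rough sketch of how you might produce a small test file of that kind, the snippet below writes a JSON Lines file (one JSON object per line, the layout Spark's JSON reader expects by default) that mixes well-formed records with a wrong-typed value and a truncated line. The function names write_mixed_sample and count_parseable and the sample values are hypothetical, not taken from the repository.

```python
import json
from pathlib import Path


def write_mixed_sample(path):
    """Write a small JSON Lines file mixing good and bad rows; return the line count."""
    good = [
        {"user_id": "user_001", "transaction_amount": 45.99,
         "transaction_date": "2024-01-15T10:30:00",
         "merchant_category": "grocery", "customer_tier": "gold"},
        {"user_id": "user_002", "transaction_amount": 125.50,
         "transaction_date": "2024-01-15T11:00:00",
         "merchant_category": "restaurant", "customer_tier": "silver"},
    ]
    bad_lines = [
        # Valid JSON, but the amount is a string rather than a number
        '{"user_id": "user_003", "transaction_amount": "not_a_number"}',
        # Truncated line: not valid JSON at all
        '{"user_id": "user_004", "transaction_amount": 12.5',
    ]
    lines = [json.dumps(record) for record in good] + bad_lines
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text("\n".join(lines) + "\n", encoding="utf-8")
    return len(lines)


def count_parseable(path):
    """Count the lines that at least parse as JSON (a sanity check, not schema validation)."""
    parseable = 0
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        try:
            json.loads(line)
            parseable += 1
        except json.JSONDecodeError:
            pass
    return parseable
```

Calling write_mixed_sample("data/mixed_transactions.json") would give the loaders above something to chew on; note that a line can parse as JSON and still violate the schema, which is exactly the case schema enforcement is meant to catch.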
The second approach is validating for data accuracy. Schema enforcement gets you part of the way there, but just because a field exists and has the correct data type doesn’t mean the data makes business sense. In this scenario, data validation ensures your transaction amounts fall within realistic ranges and your categorical values are from your approved business domains.
Think of this as your business logic validation layer. You’re not just checking that a transaction_amount field is a double; you’re checking that it’s a reasonable amount for a financial transaction. You’re not just verifying that merchant_category is a string; you’re ensuring it’s one of your supported merchant types like “grocery”, “restaurant”, or “retail” — not “currency”, which your payment system doesn’t handle.
Similarly, you’re not just confirming that customer_tier contains text data; you’re validating it matches your actual tier system (“bronze”, “silver”, “gold”, “platinum”) rather than made-up values like “diamond” that don’t exist in your business model. This validation layer catches data that’s technically valid but business-invalid:
Schema validation ensures data structure integrity; business logic validation ensures that the data makes sense within your business context.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

def validate_transaction_business_rules(df):
    """
    Business Logic Validation - ensuring data makes business sense and it goes beyond schema validation to check ranges, categories, and business constraints.
    """
    # Define business rules
    VALID_MERCHANTS = ["grocery", "restaurant", "gas_station", "retail", "entertainment", "other"]
    VALID_TIERS = ["bronze", "silver", "gold", "platinum"]
    MIN_AMOUNT = 0.01
    MAX_AMOUNT = 10000.0

    # Apply validation rules
    validated_df = df.withColumn(
        "validation_status",
        when(col("transaction_amount") < MIN_AMOUNT, "negative_or_zero_amount")
        .when(col("transaction_amount") > MAX_AMOUNT, "excessive_amount")
        .when(~col("merchant_category").isin(VALID_MERCHANTS), "invalid_merchant_category")
        .when(~col("customer_tier").isin(VALID_TIERS), "invalid_customer_tier")
        .otherwise("valid")
    )

    # Separate valid from invalid data
    valid_df = validated_df.filter(col("validation_status") == "valid").drop("validation_status")
    invalid_df = validated_df.filter(col("validation_status") != "valid")

    # Show validation summary
    total = validated_df.count()
    valid_count = valid_df.count()
    invalid_count = invalid_df.count()
    success_rate = (valid_count / total * 100) if total > 0 else 0
    print(f"📊 Business Logic Validation: {valid_count}/{total} valid ({success_rate:.1f}%)")
    if invalid_count > 0:
        print("🔍 Validation issues:")
        validated_df.groupBy("validation_status").count().show()
    return valid_df, invalid_df

if __name__ == "__main__":
    # Test the validation function
    spark = SparkSession.builder.appName("BusinessLogicValidation").master("local[*]").getOrCreate()
    # Create test data with various business rule violations
    test_data = [
        # Valid records
        ("user_001", 45.99, "grocery", "gold"),
        ("user_002", 125.50, "restaurant", "silver"),
        ("user_003", 89.99, "retail", "bronze"),
        # Invalid records
        ("user_004", -10.50, "grocery", "gold"),  # Negative amount
        ("user_005", 15000.00, "restaurant", "silver"),  # Excessive amount
        ("user_006", 50.00, "currency", "gold"),  # Invalid merchant
        ("user_007", 75.25, "grocery", "diamond"),  # Invalid tier
        ("user_008", 0.00, "gas_station", "platinum"),  # Zero amount
    ]
    schema = StructType([
        StructField("user_id", StringType(), False),
        StructField("transaction_amount", DoubleType(), False),
        StructField("merchant_category", StringType(), False),
        StructField("customer_tier", StringType(), False)
    ])
    test_df = spark.createDataFrame(test_data, schema)
    print("🧪 Testing Business Logic Validation")
    print("=" * 50)
    # Apply validation
    valid_data, invalid_data = validate_transaction_business_rules(test_df)
    # Verify results
    if valid_data.count() == 3 and invalid_data.count() == 5:
        print("✅ Test PASSED: 3 valid, 5 invalid records as expected")
    else:
        print(f"❌ Test FAILED: Expected 3 valid, 5 invalid. Got {valid_data.count()} valid, {invalid_data.count()} invalid")
    spark.stop()

The code for this business logic validation example, along with its test run, is in this GitHub Repo.
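Two properties of the when chain above are worth keeping in mind: it reports only the first rule a record breaks, and because comparisons against NULL evaluate to NULL in Spark SQL, a record with a missing amount or category can fall through to "valid". As a minimal plain-Python sketch of the same rule set, handy for unit-testing the rules without starting a Spark session, the function below returns every violated rule and treats missing values as violations. The name find_violations and the "missing_amount" label are assumptions made for illustration; the thresholds and allowed values mirror the example above.

```python
# Rule constants mirrored from the PySpark example above
VALID_MERCHANTS = {"grocery", "restaurant", "gas_station", "retail", "entertainment", "other"}
VALID_TIERS = {"bronze", "silver", "gold", "platinum"}
MIN_AMOUNT = 0.01
MAX_AMOUNT = 10000.0


def find_violations(record):
    """Return every business rule a record (a dict) breaks; an empty list means valid."""
    violations = []

    amount = record.get("transaction_amount")
    if amount is None:
        # Missing values are flagged explicitly instead of slipping through
        violations.append("missing_amount")
    elif amount < MIN_AMOUNT:
        violations.append("negative_or_zero_amount")
    elif amount > MAX_AMOUNT:
        violations.append("excessive_amount")

    # None is not a member of either set, so missing values fail these checks too
    if record.get("merchant_category") not in VALID_MERCHANTS:
        violations.append("invalid_merchant_category")
    if record.get("customer_tier") not in VALID_TIERS:
        violations.append("invalid_customer_tier")

    return violations
```

If you want the same behavior inside Spark, one option is to add explicit isNull() checks ahead of the comparisons in the when chain.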
Beyond schema and business logic validation, another common source of data chaos is duplication.
The final approach is to consider duplicates. Duplicate records can skew your analytics and reporting. They can inflate metrics, create incorrect aggregations, and lead to poor business decisions. The PySpark DataFrame API provides powerful deduplication capabilities that go beyond simple row-by-row comparisons.
The key insight here is that duplicates aren’t always exact row matches. Sometimes, you need to define business logic around what constitutes a duplicate — perhaps it’s the same transaction ID, or the same customer making the same purchase within a specific time window.
Let’s examine the code for both scenarios: simple exact matches and business logic that dictates duplicates.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col, row_number, desc, when, lag, unix_timestamp, abs as spark_abs
from pyspark.sql.window import Window
from datetime import datetime

def create_sample_data(spark):
    """Create sample data with various duplicate patterns"""
    data = [
        # Exact duplicates
        ("TXN001", "USER123", 99.99, datetime(2024, 1, 15, 10, 30, 0), "RETAIL", "GOLD"),
        ("TXN001", "USER123", 99.99, datetime(2024, 1, 15, 10, 30, 0), "RETAIL", "GOLD"),
        # Same transaction_id, different details (system error)
        ("TXN002", "USER456", 150.00, datetime(2024, 1, 15, 11, 0, 0), "GROCERY", "SILVER"),
        ("TXN002", "USER789", 200.00, datetime(2024, 1, 15, 11, 5, 0), "RETAIL", "BRONZE"),
        # Same user + amount within 30 seconds (double-click)
        ("TXN003", "USER123", 49.99, datetime(2024, 1, 15, 12, 0, 0), "ONLINE", "GOLD"),
        ("TXN004", "USER123", 49.99, datetime(2024, 1, 15, 12, 0, 30), "ONLINE", "GOLD"),
        # Unique records
        ("TXN005", "USER456", 75.50, datetime(2024, 1, 15, 13, 0, 0), "RESTAURANT", "SILVER"),
        ("TXN006", "USER789", 120.00, datetime(2024, 1, 15, 14, 0, 0), "GAS", "BRONZE"),
    ]
    schema = StructType([
        StructField("transaction_id", StringType(), False),
        StructField("user_id", StringType(), False),
        StructField("transaction_amount", DoubleType(), False),
        StructField("transaction_date", TimestampType(), False),
        StructField("merchant_category", StringType(), True),
        StructField("customer_tier", StringType(), True)
    ])
    return spark.createDataFrame(data, schema)

def exact_deduplication(df):
    """Remove exact duplicate rows - fastest method"""
    return df.distinct()

def business_logic_deduplication(df):
    """Remove duplicates based on business rules"""
    # Rule 1: Same transaction_id = duplicate (keep first occurrence)
    window_txn = Window.partitionBy("transaction_id").orderBy("transaction_date")
    df = df.withColumn("txn_rank", row_number().over(window_txn)) \
        .filter(col("txn_rank") == 1) \
        .drop("txn_rank")
    # Rule 2: Same user + amount within 60 seconds = double-click (keep first)
    window_user = Window.partitionBy("user_id", "transaction_amount").orderBy("transaction_date")
    df = df.withColumn("prev_time", lag("transaction_date").over(window_user)) \
        .withColumn("time_diff",
                    when(col("prev_time").isNull(), 999999)
                    .otherwise(spark_abs(unix_timestamp("transaction_date") -
                                         unix_timestamp("prev_time")))) \
        .filter(col("time_diff") > 60) \
        .drop("prev_time", "time_diff")
    return df

def priority_deduplication(df):
    """Keep the best record when duplicates exist"""
    # Priority: GOLD > SILVER > BRONZE
    df_with_priority = df.withColumn("priority",
                                     when(col("customer_tier") == "GOLD", 3)
                                     .when(col("customer_tier") == "SILVER", 2)
                                     .when(col("customer_tier") == "BRONZE", 1)
                                     .otherwise(0))
    window_priority = Window.partitionBy("transaction_id") \
        .orderBy(desc("priority"), desc("transaction_date"))
    return df_with_priority.withColumn("rank", row_number().over(window_priority)) \
        .filter(col("rank") == 1) \
        .drop("priority", "rank")

if __name__ == "__main__":
    # Initialize Spark with performance optimizations
    spark = SparkSession.builder \
        .appName("DeduplicationExample") \
        .master("local[*]") \
        .config("spark.sql.adaptive.enabled", "true") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")  # Reduce verbose output
    # Create sample data
    df = create_sample_data(spark)
    print(f"Original records: {df.count()}")
    # Method 1: Exact deduplication (fastest for exact matches)
    exact_result = exact_deduplication(df)
    print(f"After exact deduplication: {exact_result.count()}")
    # Method 2: Business logic deduplication
    business_result = business_logic_deduplication(df)
    print(f"After business logic deduplication: {business_result.count()}")
    # Method 3: Priority-based deduplication
    priority_result = priority_deduplication(df)
    print(f"After priority deduplication: {priority_result.count()}")
    # Show final results
    print("\nFinal deduplicated data:")
    business_result.show(truncate=False)
    spark.stop()

On occasion, you need more sophisticated deduplication logic, such as keeping the record with the highest value or the most recent timestamp, using multi-tier priority ranking, or aggregating instead of removing entries. Some of this custom logic will depend on your business use case.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, desc, when, max as spark_max, sum as spark_sum, count
from pyspark.sql.window import Window
from datetime import datetime

def create_sample_data(spark):
    """Sample customer data with duplicates"""
    data = [
        ("CUST001", "John Smith", 50.00, datetime(2024, 1, 15, 10, 0, 0), "Basic", "ACTIVE"),
        ("CUST001", "John Smith", 150.00, datetime(2024, 1, 15, 11, 0, 0), "Premium", "ACTIVE"),
        ("CUST002", "Jane Doe", 100.00, datetime(2024, 1, 15, 9, 0, 0), "Standard", "SUSPENDED"),
        ("CUST002", "Jane Doe", 100.00, datetime(2024, 1, 15, 11, 0, 0), "Standard", "ACTIVE"),
        ("CUST003", "Bob Wilson", 220.00, datetime(2024, 1, 15, 8, 0, 0), "Basic", "ACTIVE"),
        ("CUST003", "Bob Wilson", 180.00, datetime(2024, 1, 15, 10, 0, 0), "Premium", "ACTIVE"),
    ]
    columns = ["customer_id", "name", "amount", "timestamp", "tier", "status"]
    return spark.createDataFrame(data, columns)

def keep_highest_value(df):
    """Keep record with highest amount per customer"""
    window = Window.partitionBy("customer_id").orderBy(desc("amount"))
    return df.withColumn("rank", row_number().over(window)) \
        .filter(col("rank") == 1).drop("rank")

def keep_most_recent(df):
    """Keep most recent record per customer"""
    window = Window.partitionBy("customer_id").orderBy(desc("timestamp"))
    return df.withColumn("rank", row_number().over(window)) \
        .filter(col("rank") == 1).drop("rank")

def multi_criteria_best(df):
    """Advanced: Keep best record using business priority"""
    df_priority = df.withColumn("tier_score",
                                when(col("tier") == "Premium", 3)
                                .when(col("tier") == "Standard", 2)
                                .otherwise(1)) \
        .withColumn("status_score",
                    when(col("status") == "ACTIVE", 1).otherwise(0))
    window = Window.partitionBy("customer_id") \
        .orderBy(desc("tier_score"), desc("status_score"),
                 desc("amount"), desc("timestamp"))
    return df_priority.withColumn("rank", row_number().over(window)) \
        .filter(col("rank") == 1) \
        .drop("tier_score", "status_score", "rank")

def aggregate_duplicates(df):
    """Alternative: Aggregate instead of removing duplicates"""
    return df.groupBy("customer_id", "name") \
        .agg(spark_max("amount").alias("max_amount"),
             spark_sum("amount").alias("total_amount"),
             count("*").alias("record_count"),
             spark_max("timestamp").alias("latest_update"))

if __name__ == "__main__":
    spark = SparkSession.builder.appName("AdvancedDedup").master("local[*]").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    df = create_sample_data(spark)
    print(f"Original: {df.count()} records")
    # Method 1: Keep highest value
    highest = keep_highest_value(df)
    print(f"Highest value: {highest.count()} records")
    highest.select("customer_id", "name", "amount", "tier").show()
    # Method 2: Keep most recent
    recent = keep_most_recent(df)
    print(f"Most recent: {recent.count()} records")
    recent.select("customer_id", "name", "timestamp", "status").show()
    # Method 3: Multi-criteria priority
    best = multi_criteria_best(df)
    print(f"Multi-criteria best: {best.count()} records")
    best.show()
    # Method 4: Aggregate instead of remove
    agg = aggregate_duplicates(df)
    print(f"Aggregated: {agg.count()} records")
    agg.show()
    spark.stop()

These approaches give you control over which records to keep when duplicates are found, rather than just arbitrarily keeping the first occurrence. For a comprehensive set of deduplication strategies with custom logic, including more advanced versions, refer to the GitHub Repository.
To recap, data quality issues in large JSON ingestion pipelines are inevitable, but they don’t have to be catastrophic. By implementing these three strategies (schema enforcement, data validation, and custom deduplication), you can identify problems early and handle them with far less effort. They are by no means exhaustive, but they are commonly used and cover many everyday ingestion problems.
The key is to be proactive rather than reactive. Set up your quality checks at ingestion time, not after your data has already propagated through your entire data lake. These PySpark techniques provide you with the tools to build robust data quality pipelines that can handle the complexities of real-world data.
Remember that data quality is an ongoing process, not a one-time fix. Monitor your validation metrics over time and continuously refine your validation rules as your understanding of the data evolves.
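As a rough illustration of what monitoring validation metrics over time could look like, the sketch below appends one JSON line per ingestion run to a log file and flags runs whose success rate drops below a threshold. Everything here is an assumption for illustration: the function name record_quality_metrics, the log layout, and the 95% default threshold are hypothetical and are not part of the examples above or the repository. The valid and total counts would come from the counts already computed in the validation functions.

```python
import json
from datetime import datetime, timezone


def record_quality_metrics(log_path, source, total, valid, min_success_rate=95.0):
    """Append one run's validation metrics to a JSON Lines log and return the entry."""
    success_rate = (valid / total * 100) if total > 0 else 0.0
    entry = {
        "recorded_at": datetime.now(timezone.utc).isoformat(),
        "source": source,
        "total_records": total,
        "valid_records": valid,
        "success_rate": round(success_rate, 1),
        # Hypothetical alerting rule: flag runs that fall under the threshold
        "below_threshold": success_rate < min_success_rate,
    }
    with open(log_path, "a", encoding="utf-8") as log_file:
        log_file.write(json.dumps(entry) + "\n")
    return entry
```

A log like this can be loaded back with spark.read.json or any plotting tool to watch the trend, and the below_threshold flag gives you a simple hook for alerting.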
Your downstream consumers will thank you for the clean, reliable data, and you will thank yourself for fewer 3 AM pager-duty calls.