07-29-2025 09:44 AM
Hi @Raj_DB
The slowdown you're seeing most likely comes from several bottlenecks in your pipeline: an unpartitioned JDBC read, a query that is hard to split, and the write configuration.
Here's an optimization strategy that addresses each one:
1. Implement JDBC Partitioning (Critical)
# Option A: Numeric partitioning (if you have a numeric key)
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", oracle_query) \
    .option("user", jdbc_username) \
    .option("password", jdbc_password) \
    .option("driver", driver_class) \
    .option("fetchsize", "50000") \
    .option("partitionColumn", "numeric_id_column") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000000") \
    .option("numPartitions", "32") \
    .load()
# Option B: Date-based partitioning (recommended for your use case)
# numPartitions = 24 assumes about 2 years * 12 months of data; set
# lowerBound/upperBound to the actual first and last dates you are loading.
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", oracle_query) \
    .option("user", jdbc_username) \
    .option("password", jdbc_password) \
    .option("driver", driver_class) \
    .option("fetchsize", "50000") \
    .option("partitionColumn", "date_col") \
    .option("lowerBound", "2024-01-01") \
    .option("upperBound", "2024-12-31") \
    .option("numPartitions", "24") \
    .load()
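To make the effect of these options concrete, here is a rough sketch of how Spark turns partitionColumn, lowerBound, upperBound and numPartitions into one WHERE clause per partition. It is a simplified approximation written for illustration (the function name is hypothetical, and Spark's real stride calculation differs in small details between versions). The point to take away is that the bounds only decide where the splits fall; they do not filter rows, so values outside the bounds all land in the first or last partition.

```python
def jdbc_partition_predicates(column, lower, upper, num_partitions):
    """Approximate the per-partition WHERE clauses derived from the bounds.

    Simplified illustration only, not Spark's exact implementation.
    """
    if num_partitions <= 1 or lower >= upper:
        return [None]  # a single query with no partition filter

    # Integer stride between consecutive partition boundaries.
    stride = upper // num_partitions - lower // num_partitions
    predicates = []
    current = lower
    for i in range(num_partitions):
        lower_clause = f"{column} >= {current}" if i > 0 else None
        current += stride
        upper_clause = f"{column} < {current}" if i < num_partitions - 1 else None
        if lower_clause and upper_clause:
            predicates.append(f"{lower_clause} AND {upper_clause}")
        elif upper_clause:
            # First partition is open-ended below and also picks up NULLs.
            predicates.append(f"{upper_clause} OR {column} IS NULL")
        else:
            # Last partition is open-ended above.
            predicates.append(lower_clause)
    return predicates


# Same values as Option A above.
preds = jdbc_partition_predicates("numeric_id_column", 1, 1_000_000_000, 32)
for p in preds[:2] + preds[-1:]:
    print(p)
```

If most of your keys sit in a narrow part of the 1 to 1,000,000,000 range, a few partitions will do nearly all the work, so it is worth checking the real MIN and MAX of the column first.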
2. Optimize Oracle Query Structure
Modify your Oracle query to be more partition-friendly:
oracle_query = """(
    SELECT /*+ parallel(50) */ DISTINCT
        t1.column1, ..., t2.column28,
        t1.date_col,
        EXTRACT(YEAR FROM t1.date_col) AS year_partition,
        EXTRACT(MONTH FROM t1.date_col) AS month_partition,
        -- Add a numeric column for partitioning if available.
        -- Caution: ROW_NUMBER() gives every row a unique value, so DISTINCT
        -- no longer removes duplicates; prefer an existing numeric key.
        ROW_NUMBER() OVER (ORDER BY t1.date_col) AS row_id
    FROM schema.table1 t1
    LEFT JOIN schema.table2 t2
        ON t1.key_col = t2.key_col
        AND t1.date_col = t2.date_col
        AND t2.date_col >= DATE '2024-01-01'
    WHERE t1.date_col >= DATE '2024-01-01'
        AND t1.date_col < DATE '2027-01-01'  -- Use range for partitioning
)"""
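The date filter in this query and the JDBC bounds from step 1 should describe the same range, otherwise the partitions end up uneven. One way to keep them in sync is to derive both from a single start and end date. The helper below is a hypothetical sketch (its name and the two-year range in the example are assumptions, chosen only to match the "2 years * 12 months" figure). Spark splits the range into equal-width strides, so the partitions only roughly line up with calendar months.

```python
from datetime import date, timedelta


def month_partition_settings(start: date, end: date, column: str = "date_col"):
    """Build a WHERE clause and JDBC options for the half-open range [start, end).

    Uses one partition per calendar month touched by the range.
    """
    if end <= start:
        raise ValueError("end must be after start")

    last_day = end - timedelta(days=1)  # last date actually included
    months = (last_day.year - start.year) * 12 + (last_day.month - start.month) + 1

    where_clause = (
        f"{column} >= DATE '{start.isoformat()}' "
        f"AND {column} < DATE '{end.isoformat()}'"
    )
    jdbc_options = {
        "partitionColumn": column,
        "lowerBound": start.isoformat(),
        "upperBound": end.isoformat(),
        "numPartitions": str(months),
    }
    return where_clause, jdbc_options


# Hypothetical two-year range.
where_clause, jdbc_options = month_partition_settings(date(2024, 1, 1), date(2026, 1, 1))
print(where_clause)
print(jdbc_options)
```

The returned dictionary can be passed to the reader with `.options(**jdbc_options)`, and the WHERE clause substituted into the query string.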
3. Optimize Write Strategy
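# Optimized write for 980M records
# 256 MB input splits; this setting applies when Spark reads files
spark.conf.set("spark.sql.files.maxPartitionBytes", "268435456")
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
# Parquet row group size is the Parquet/Hadoop setting parquet.block.size
# (there is no spark.sql.parquet.block.size key)
spark.conf.set("parquet.block.size", "268435456")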
# Parentheses allow inline comments; a comment after a trailing backslash is a syntax error.
# Assumes final_df has "year" and "month" columns.
(
    final_df.coalesce(24)  # Reduce from 100 partitions
    .sortWithinPartitions("date_col")  # Improve compression ratio
    .write
    .partitionBy("year", "month")
    .option("maxRecordsPerFile", "20000000")  # Larger files
    .mode("overwrite")
    .format("parquet")
    .option("compression", "zstd")
    .save(destination_path)
)
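As a sanity check on these numbers, here is a small back-of-the-envelope calculation of how many files each year/month directory could end up with. It assumes the 980M rows are spread evenly over 24 month directories (an assumption; your real distribution will differ) and uses a hypothetical helper name. Each write task produces its own file for every directory it holds rows for, so the count depends on how the rows are distributed across the 24 tasks.

```python
import math


def estimate_files_per_directory(total_rows, num_dirs, num_tasks, max_records_per_file):
    """Return (best_case, worst_case) file counts per output directory.

    Assumes rows are spread evenly across directories.
    """
    rows_per_dir = total_rows / num_dirs

    # Best case: each directory's rows sit in one task, split only by maxRecordsPerFile.
    best_case = math.ceil(rows_per_dir / max_records_per_file)

    # Worst case: every task holds a slice of every directory and writes its own file(s).
    rows_per_task_per_dir = rows_per_dir / num_tasks
    worst_case = num_tasks * math.ceil(rows_per_task_per_dir / max_records_per_file)
    return best_case, worst_case


best_case, worst_case = estimate_files_per_directory(
    total_rows=980_000_000,
    num_dirs=24,             # assumed: 2 years * 12 months
    num_tasks=24,            # coalesce(24)
    max_records_per_file=20_000_000,
)
print(f"files per year/month directory: between {best_case} and {worst_case}")
```

If you land near the worst case, repartitioning by the year and month columns before the write (instead of coalesce) is one option to consider, at the cost of a shuffle.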