07-29-2025 05:39 AM
Hi there,
I am currently working on a notebook where I pull data from an Oracle database using an Oracle SQL script over a JDBC connection. Because of the large dataset and the joins in my query, I have added the /*+ parallel(n) */ hint, and the query itself runs efficiently: it returns results in seconds for one month of data and in a few minutes for two years of data, which is fine.
However, writing the full result set (~980M records) to my ADLS (Azure Data Lake Storage) path takes excessively long (over 4 hours without completing). I have tried multiple optimization methods but have not seen any improvement, and I would deeply appreciate any insights or solutions.
For one month of data, the write takes around 10-15 minutes.
My cluster configuration:
Databricks runtime: 16.4 LTS
Worker Type: Standard_D32ds_v5 (128 GB memory, 32 cores)
Enable autoscaling: min(1) and max(8)
Driver Type: Standard_D16ds_v5 (64 GB memory and 16 cores)
Example Code:
oracle_query = """( SELECT /*+ parallel(100) */ DISTINCT
t1.column1......t2.colum28
FROM schema.table1 t1
LEFT JOIN( SELECT *+ parallel(50) */ key_col, date_col, column
FROM schema.table2
WHERE date_col >= to_date( '2024-01-01', 'yyyy-MM-dd')
) t2
ON t1.key_col = t2.key_col AND t1.date_col = t2.date_col
WHERE t1.date_col >= to_date( '2024-01-01', 'yyyy-MM-dd')
)"""
JDBC connection:
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", oracle_query) \
    .option("user", jdbc_username) \
    .option("password", jdbc_password) \
    .option("driver", driver_class) \
    .option("fetchsize", "10000000") \
    .load()
Some transformation:
from pyspark.sql.functions import year, month, current_timestamp, from_utc_timestamp

final_df = df.withColumn("Year", year("date_col")) \
    .withColumn("month", month("date_col")) \
    .withColumn("Action_timestp", from_utc_timestamp(current_timestamp(), 'America'))
Saving final dataframe to ADLS:
final_df.repartition(100, 'Year', 'month') \
    .write \
    .option("maxRecordsPerFile", 1000000) \
    .mode('overwrite') \
    .format('parquet') \
    .option('compression', 'snappy') \
    .save(destination_path)
Thanks
07-29-2025 09:44 AM
Hi @Raj_DB
The issue you're experiencing is likely due to several bottlenecks in your data pipeline.
Here's a comprehensive optimization strategy:
1. Implement JDBC Partitioning (Critical)
# Option A: Numeric partitioning (if you have a numeric key)
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", oracle_query) \
    .option("user", jdbc_username) \
    .option("password", jdbc_password) \
    .option("driver", driver_class) \
    .option("fetchsize", "50000") \
    .option("partitionColumn", "numeric_id_column") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000000") \
    .option("numPartitions", "32") \
    .load()
# Option B: Date-based partitioning (recommended for your use case)
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", oracle_query) \
    .option("user", jdbc_username) \
    .option("password", jdbc_password) \
    .option("driver", driver_class) \
    .option("fetchsize", "50000") \
    .option("partitionColumn", "date_col") \
    .option("lowerBound", "2024-01-01") \
    .option("upperBound", "2025-12-31") \
    .option("numPartitions", "24") \
    .load()  # 24 partitions ~ 2 years * 12 months
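If date_col is a TIMESTAMP in Oracle rather than a DATE, the bounds can also be supplied as full timestamp strings. A minimal variant of Option B under that assumption (the exact bound values are placeholders):
# Option B variant: timestamp-typed partition column
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", oracle_query) \
    .option("user", jdbc_username) \
    .option("password", jdbc_password) \
    .option("driver", driver_class) \
    .option("fetchsize", "50000") \
    .option("partitionColumn", "date_col") \
    .option("lowerBound", "2024-01-01 00:00:00") \
    .option("upperBound", "2025-12-31 23:59:59") \
    .option("numPartitions", "24") \
    .load()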
2. Optimize Oracle Query Structure
Modify your Oracle query to be more partition-friendly:
oracle_query = """(
SELECT /*+ parallel(50) */ DISTINCT
t1.column1, ..., t2.column28,
t1.date_col,
EXTRACT(YEAR FROM t1.date_col) as year_partition,
EXTRACT(MONTH FROM t1.date_col) as month_partition,
-- Add a numeric column for partitioning if available
ROW_NUMBER() OVER (ORDER BY t1.date_col) as row_id
FROM schema.table1 t1
LEFT JOIN schema.table2 t2
ON t1.key_col = t2.key_col
AND t1.date_col = t2.date_col
AND t2.date_col >= DATE '2024-01-01'
WHERE t1.date_col >= DATE '2024-01-01'
AND t1.date_col < DATE '2027-01-01' -- Use range for partitioning
)"""
3. Optimize Write Strategy
# Optimized write for 980M records
spark.conf.set("spark.sql.files.maxPartitionBytes", "268435456")
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
spark.conf.set("spark.sql.parquet.block.size", "268435456")

# coalesce(24) reduces the 100 partitions, sortWithinPartitions improves the
# compression ratio, and a larger maxRecordsPerFile produces fewer, bigger files.
final_df.coalesce(24) \
    .sortWithinPartitions("date_col") \
    .write \
    .partitionBy("year", "month") \
    .option("maxRecordsPerFile", "20000000") \
    .mode('overwrite') \
    .format('parquet') \
    .option('compression', 'zstd') \
    .save(destination_path)
07-29-2025 10:56 PM
Hi @lingareddy_Alva, thank you very much for all the optimization techniques.
I have tried the write technique below, but I still see the same issue.
# Optimized write for 980M records
spark.conf.set("spark.sql.files.maxPartitionBytes", "268435456")
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
spark.conf.set("spark.sql.parquet.block.size", "268435456")

# coalesce(24) reduces the 100 partitions, sortWithinPartitions improves the
# compression ratio, and a larger maxRecordsPerFile produces fewer, bigger files.
final_df.coalesce(24) \
    .sortWithinPartitions("date_col") \
    .write \
    .partitionBy("year", "month") \
    .option("maxRecordsPerFile", "20000000") \
    .mode('overwrite') \
    .format('parquet') \
    .option('compression', 'zstd') \
    .save(destination_path)
While applying upper and lower bounds on date_col, I encountered the error below. I have tried to fix it, but I keep getting the same error. My date_col is a timestamp type.
ORA-01861: literal does not match format string
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", oracle_query) \
    .option("user", jdbc_username) \
    .option("password", jdbc_password) \
    .option("driver", driver_class) \
    .option("fetchsize", "50000") \
    .option("partitionColumn", "date_col") \
    .option("lowerBound", "2025-05-01") \
    .option("upperBound", "2025-07-31") \
    .option("numPartitions", "5") \
    .load()

df.limit(5).display()  # The ORA-01861 error appears while displaying the result.
Also, I have not tried the suggested Oracle query yet because I am not sure where a few of the added columns should be used:
Modify your Oracle query to be more partition-friendly:
oracle_query = """(
SELECT /*+ parallel(50) */ DISTINCT
t1.column1, ..., t2.column28,
t1.date_col,
EXTRACT(YEAR FROM t1.date_col) as year_partition, -- (1) Where can I use this for partitioning?
EXTRACT(MONTH FROM t1.date_col) as month_partition, -- (2) Same question
-- Add a numeric column for partitioning if available
ROW_NUMBER() OVER (ORDER BY t1.date_col) as row_id -- (3) Where can I use this column?
FROM schema.table1 t1
LEFT JOIN schema.table2 t2
ON t1.key_col = t2.key_col
AND t1.date_col = t2.date_col
AND t2.date_col >= DATE '2024-01-01'
WHERE t1.date_col >= DATE '2024-01-01'
AND t1.date_col < DATE '2027-01-01'
)"""
Could you please help if you have a solution for this?
Thanks
07-30-2025 06:29 AM
I can suggest a few tweaks to the compute. The current D-series is good enough, but since we are handling huge data, please try bumping the minimum workers from 1 to at least 4 and moving to a bigger VM type such as Standard_E64ds_v5, or otherwise a memory-optimized instance, since I/O is high while extracting data from external systems. We should also figure out which part is fast, the extraction or the write, so a better recommendation can be given. Finally, try to find better partition keys: run a group-by on the records against the candidate partition keys to check the distribution, as sketched below. A few ideas, please try!
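A rough sketch of both checks, assuming the derived Year/month columns from the original post are the candidate partition keys (the noop sink reads and processes every row but writes nothing, so it isolates extraction time from write time):
import time

# 1) Distribution of rows across the candidate partition keys.
#    Heavily skewed Year/month combinations lead to oversized, slow write tasks.
(final_df
    .groupBy("Year", "month")
    .count()
    .orderBy("Year", "month")
    .show(50, truncate=False))

# 2) Time the extraction/transformation path alone with a no-op write.
start = time.time()
final_df.write.format("noop").mode("overwrite").save()
print(f"Extraction + transform only: {(time.time() - start) / 60:.1f} min")

# If the noop run is fast, the bottleneck is the Parquet write to ADLS;
# if it is already slow, focus on the JDBC read parallelism instead.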
a month ago
Hi @chanukya-pekala, thank you for your suggestions. After making a few changes to the cluster as recommended, I observed a slight improvement in performance. However, I am still experiencing issues with the writing process.
a month ago
Hi @Raj_DB
Since you're still facing write performance issues even after the cluster optimization, try the staged writing approach below:
import time

def staged_write_approach(df, destination_path):
    """Write in stages to avoid memory pressure."""
    # Stage 1: Write to a temporary location without partitioning
    temp_path = f"{destination_path}_temp_{int(time.time())}"
    print("Stage 1: Writing to temporary location...")
    start_time = time.time()
    df.coalesce(16) \
        .write \
        .mode('overwrite') \
        .format('parquet') \
        .option('compression', 'zstd') \
        .option("maxRecordsPerFile", "15000000") \
        .save(temp_path)
    stage1_time = (time.time() - start_time) / 60
    print(f"Stage 1 completed in {stage1_time:.2f} minutes")

    # Stage 2: Read back and write with partitioning
    print("Stage 2: Reading back and applying partitioning...")
    start_time = time.time()
    temp_df = spark.read.parquet(temp_path)
    temp_df.write \
        .partitionBy("year", "month") \
        .mode('overwrite') \
        .format('parquet') \
        .option('compression', 'zstd') \
        .option("maxRecordsPerFile", "15000000") \
        .save(destination_path)
    stage2_time = (time.time() - start_time) / 60
    print(f"Stage 2 completed in {stage2_time:.2f} minutes")

    # Cleanup the temporary files
    dbutils.fs.rm(temp_path, True)
    print(f"Total staged write time: {stage1_time + stage2_time:.2f} minutes")

# Use staged writing
staged_write_approach(final_df, destination_path)