Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Performance Issue – Writing Large Dataset to ADLS from Oracle via JDBC

Raj_DB
New Contributor III

Hi there, 

I am currently working on a notebook where I pull data from an Oracle database via a JDBC connection using an Oracle SQL query. Because of the large dataset size and the joins in my query, I added the /*+ parallel(n) */ hint, and the query itself runs efficiently: it returns results in seconds for one month of data and in a few minutes for two years of data, which is fine.

However, when writing the entire result set (~980M records) to my ADLS (Azure Data Lake Storage) path, the process is taking excessively long (over 4 hours without completion). I’ve tried multiple optimization methods but haven’t seen any improvement. I would deeply appreciate any insights or solutions.

For a single month of data, the same read-and-write takes around 10-15 minutes.

My cluster configuration:

Databricks Runtime: 16.4 LTS

Worker type: Standard_D32ds_v5 (128 GB memory, 32 cores)

Autoscaling enabled: min 1, max 8 workers

Driver type: Standard_D16ds_v5 (64 GB memory, 16 cores)
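
For reference, a sketch of the same setup expressed as a Databricks Clusters API spec; the field names are standard, but the exact spark_version string for 16.4 LTS is an assumption to verify:

# Sketch only: cluster described above as a Clusters API payload.
cluster_spec = {
    "spark_version": "16.4.x-scala2.12",          # assumed identifier for DBR 16.4 LTS
    "node_type_id": "Standard_D32ds_v5",          # workers: 128 GB / 32 cores
    "driver_node_type_id": "Standard_D16ds_v5",   # driver: 64 GB / 16 cores
    "autoscale": {"min_workers": 1, "max_workers": 8},
}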

Example Code:

oracle_query = """( SELECT /*+ parallel(100) */ DISTINCT

                  t1.column1......t2.colum28

FROM schema.table1 t1

LEFT JOIN( SELECT  *+ parallel(50) */  key_col, date_col, column

                    FROM schema.table2 

                    WHERE date_col >= to_date( '2024-01-01', 'yyyy-MM-dd')

                   ) t2

ON t1.key_col = t2.key_col AND t1.date_col = t2.date_col

WHERE  t1.date_col >= to_date( '2024-01-01', 'yyyy-MM-dd')

)"""

JDBC connection:

df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", oracle_query) \
    .option("user", jdbc_username) \
    .option("password", jdbc_password) \
    .option("driver", driver_class) \
    .option("fetchsize", "10000000") \
    .load()

Some transformation:

from pyspark.sql.functions import year, month, current_timestamp, from_utc_timestamp

final_df = df.withColumn("Year", year("date_col")) \
             .withColumn("month", month("date_col")) \
             .withColumn("Action_timestp", from_utc_timestamp(current_timestamp(), "America"))  # note: "America" is not a full region ID (e.g. "America/New_York")

Saving final dataframe to ADLS:

final_df.repartition(100, "Year", "month") \
    .write \
    .option("maxRecordsPerFile", 1000000) \
    .mode("overwrite") \
    .format("parquet") \
    .option("compression", "snappy") \
    .save(destination_path)

 

Thanks

5 REPLIES

lingareddy_Alva
Honored Contributor III

Hi @Raj_DB 

The issue you're experiencing is likely due to several bottlenecks in your data pipeline.
Here's a comprehensive optimization strategy:

1. Implement JDBC Partitioning (Critical)

# Option A: Numeric partitioning (if you have a numeric key)
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", oracle_query) \
    .option("user", jdbc_username) \
    .option("password", jdbc_password) \
    .option("driver", driver_class) \
    .option("fetchsize", "50000") \
    .option("partitionColumn", "numeric_id_column") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000000") \
    .option("numPartitions", "32") \
    .load()

# Option B: Date-based partitioning (recommended for your use case)
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", oracle_query) \
    .option("user", jdbc_username) \
    .option("password", jdbc_password) \
    .option("driver", driver_class) \
    .option("fetchsize", "50000") \
    .option("partitionColumn", "date_col") \
    .option("lowerBound", "2024-01-01") \
    .option("upperBound", "2024-12-31") \
    .option("numPartitions", "24") \
    .load()  # 24 partitions: 2 years * 12 months

2. Optimize Oracle Query Structure
Modify your Oracle query to be more partition-friendly:
oracle_query = """(
SELECT /*+ parallel(50) */ DISTINCT
t1.column1, ..., t2.column28,
t1.date_col,
EXTRACT(YEAR FROM t1.date_col) as year_partition,
EXTRACT(MONTH FROM t1.date_col) as month_partition,
-- Add a numeric column for partitioning if available
ROW_NUMBER() OVER (ORDER BY t1.date_col) as row_id
FROM schema.table1 t1
LEFT JOIN schema.table2 t2
ON t1.key_col = t2.key_col
AND t1.date_col = t2.date_col
AND t2.date_col >= DATE '2024-01-01'
WHERE t1.date_col >= DATE '2024-01-01'
AND t1.date_col < DATE '2027-01-01' -- Use range for partitioning
)"""

3. Optimize Write Strategy

# Optimized write for 980M records
spark.conf.set("spark.sql.files.maxPartitionBytes", "268435456")
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
spark.conf.set("spark.sql.parquet.block.size", "268435456")

# coalesce(24): reduce from 100 partitions
# sortWithinPartitions: improve compression ratio
# maxRecordsPerFile: larger files
final_df.coalesce(24) \
    .sortWithinPartitions("date_col") \
    .write \
    .partitionBy("year", "month") \
    .option("maxRecordsPerFile", "20000000") \
    .mode("overwrite") \
    .format("parquet") \
    .option("compression", "zstd") \
    .save(destination_path)

 

 

LR

Hi @lingareddy_Alva , Thank you very much for all the optimization techniques.

I have tried this technique, but I still have the same issue.

# Optimized write for 980M records
spark.conf.set("spark.sql.files.maxPartitionBytes", "268435456")
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
spark.conf.set("spark.sql.parquet.block.size", "268435456")

# coalesce(24): reduce from 100 partitions
# sortWithinPartitions: improve compression ratio
# maxRecordsPerFile: larger files
final_df.coalesce(24) \
    .sortWithinPartitions("date_col") \
    .write \
    .partitionBy("year", "month") \
    .option("maxRecordsPerFile", "20000000") \
    .mode("overwrite") \
    .format("parquet") \
    .option("compression", "zstd") \
    .save(destination_path)

While applying upper and lower bounds on date_col, I encountered the following error. I have tried to fix it, but I keep getting the same error. My date_col is a timestamp type.

ORA-01861: literal does not match format string
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", oracle_query) \
    .option("user", jdbc_username) \
    .option("password", jdbc_password) \
    .option("driver", driver_class) \
    .option("fetchsize", "50000") \
    .option("partitionColumn", "date_col") \
    .option("lowerBound", "2025-05-01") \
    .option("upperBound", "2025-07-31") \
    .option("numPartitions", "5") \
    .load()


df.limit(5).display() # Getting that error while displaying result.

I have not yet tried the modified Oracle query, because I do not understand where a few of the columns are meant to be used (see my questions in the snippet below).

Modify your Oracle query to be more partition-friendly:
oracle_query = """(
SELECT /*+ parallel(50) */ DISTINCT
t1.column1, ..., t2.column28,
t1.date_col,
EXTRACT(YEAR FROM t1.date_col) as year_partition,   -- #1: where should I use this for partitioning?
EXTRACT(MONTH FROM t1.date_col) as month_partition, -- #2: same question
-- Add a numeric column for partitioning if available
ROW_NUMBER() OVER (ORDER BY t1.date_col) as row_id  -- #3: where should I use this column?
FROM schema.table1 t1
LEFT JOIN schema.table2 t2
ON t1.key_col = t2.key_col
AND t1.date_col = t2.date_col
AND t2.date_col >= DATE '2024-01-01'
WHERE t1.date_col >= DATE '2024-01-01'
AND t1.date_col < DATE '2027-01-01' 
)"""

Could you help me if you have any solution?

Thanks

chanukya-pekala
Contributor II

I can suggest a few tweaks to the compute. The current D-series is good enough, but since we are handling huge data, please try bumping the minimum workers from 1 to at least 4 and changing the VM type to a bigger one such as Standard_E64ds_v5; if that is not possible, try a memory-optimized instance as well, since IO is high while extracting data from external systems. We should also work out which part is slow, the extraction or the write, so a more targeted recommendation can be given. Finally, try to find better partition keys: run a groupBy on the records against the candidate partition keys to check the distribution (a quick sketch is shown below). A few ideas, please try!
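
A minimal sketch of that distribution check, assuming final_df and its "Year"/"month" columns from the original post are the candidate partition keys:

# Sketch: inspect how evenly rows are spread across the candidate partition keys.
key_distribution = (
    final_df.groupBy("Year", "month")
            .count()
            .orderBy("count", ascending=False)
)
key_distribution.display()  # heavily skewed counts suggest choosing a different key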


Chanukya

Hi @chanukya-pekala , Thank you for your suggestions. After making a few changes to the cluster as recommended, I observed a slight improvement in performance. However, I am still experiencing issues with the writing process.

lingareddy_Alva
Honored Contributor III

Hi @Raj_DB  

Since you're still facing write performance issues even after the cluster optimization,
try the staged-write approach below:

import time  # used below to time each stage

def staged_write_approach(df, destination_path):
    """Write in stages to avoid memory pressure."""
    
    # Stage 1: Write to temporary location without partitioning
    temp_path = f"{destination_path}_temp_{int(time.time())}"
    
    print("Stage 1: Writing to temporary location...")
    start_time = time.time()
    
    df.coalesce(16) \
        .write \
        .mode('overwrite') \
        .format('parquet') \
        .option('compression', 'zstd') \
        .option("maxRecordsPerFile", "15000000") \
        .save(temp_path)
    
    stage1_time = (time.time() - start_time) / 60
    print(f"Stage 1 completed in {stage1_time:.2f} minutes")
    
    # Stage 2: Read back and write with partitioning
    print("Stage 2: Reading back and applying partitioning...")
    start_time = time.time()
    
    temp_df = spark.read.parquet(temp_path)
    
    temp_df.write \
        .partitionBy("year", "month") \
        .mode('overwrite') \
        .format('parquet') \
        .option('compression', 'zstd') \
        .option("maxRecordsPerFile", "15000000") \
        .save(destination_path)
    
    stage2_time = (time.time() - start_time) / 60
    print(f"Stage 2 completed in {stage2_time:.2f} minutes")
    
    # Cleanup
    dbutils.fs.rm(temp_path, True)
    print(f"Total staged write time: {stage1_time + stage2_time:.2f} minutes")

# Use staged writing
staged_write_approach(final_df, destination_path)
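
On the ORA-01861 error with the timestamp-typed date_col: one workaround that is often suggested (a sketch only, not verified against your environment) is to pass the bounds as full timestamps and align the Oracle session date/timestamp formats through the standard sessionInitStatement JDBC option. The NLS format strings below are assumptions; adjust them to your database.

# Sketch: partition on a TIMESTAMP column using full-timestamp bounds.
df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", oracle_query) \
    .option("user", jdbc_username) \
    .option("password", jdbc_password) \
    .option("driver", driver_class) \
    .option("fetchsize", "50000") \
    .option("sessionInitStatement",
            "ALTER SESSION SET NLS_DATE_FORMAT='YYYY-MM-DD HH24:MI:SS' "
            "NLS_TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS.FF'") \
    .option("partitionColumn", "date_col") \
    .option("lowerBound", "2025-05-01 00:00:00") \
    .option("upperBound", "2025-07-31 23:59:59") \
    .option("numPartitions", "5") \
    .load()
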
LR