
How to partition JDBC Oracle read query and cast with TO_DATE on partition date field?

joshuat
New Contributor II

I'm attempting to fetch an Oracle NetSuite table in parallel via JDBC using the NetSuite Connect JAR, which is already installed on the cluster and set up correctly. I can do this successfully with a single-threaded approach using the `dbtable` option:

table = 'Transaction' 
df = spark.read.format("jdbc") \
.option("url", jdbc_url) \
.option("driver", jdbc_driver) \
.option("dbtable", table) \
.load()

I'd like to partition the fetch on a date field, but this code fails with java.sql.SQLSyntaxErrorException: [NetSuite][SuiteAnalytics Connect JDBC Driver][OpenAccess SDK SQL Engine]Syntax Error in the SQL statement.[10104]:

lower_bound = "2024-01-01"
upper_bound = "2024-03-31"
query = f"""
(
SELECT * FROM {table}
WHERE TO_DATE(lastmodifieddate, 'yyyy-MM-dd') >= TO_DATE('{lower_bound}', 'yyyy-MM-dd')
AND TO_DATE(lastmodifieddate, 'yyyy-MM-dd') <= TO_DATE('{upper_bound}', 'yyyy-MM-dd')
) AS t
"""
df = spark.read.format("jdbc") \
.option("url", jdbc_url) \
.option("driver", jdbc_driver) \
.option("dbtable", query) \
.option("partitionColumn", "lastmodifieddate") \
.option("lowerBound", lower_bound) \
.option("upperBound", upper_bound) \
.option("numPartitions", 10) \
.option("fetchsize", 1000) \
.option("sessionInitStatement", "ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD'") \
.load()

I believe this is because of how Spark's JDBC partitioning works: the partition predicates Spark generates never apply TO_DATE to cast the string to a date, while Oracle/NetSuite requires the strings to be cast to dates in the SQL statement. Sure enough, the Spark debug logs show that no TO_DATE is applied in the generated WHERE clauses:

INFO JDBCRelation: Number of partitions: 10, WHERE clauses of these partitions: "lastmodifieddate" < '2024-01-09 23:54:00' or "lastmodifieddate" is null...

How do I address this?

3 REPLIES

Kaniz_Fatma
Community Manager

Hi @joshuat

  1. To query a specific partition of an Oracle table, you can use the PARTITION FOR clause. Here’s an example of how you can do it:

    SELECT *
    FROM your_table PARTITION FOR (DATE '2024-07-24')
    WHERE customer = 'FooBar'
      AND insert_date BETWEEN TO_DATE('2024-07-24 00:00:00', 'YYYY-MM-DD HH24:MI:SS')
                         AND TO_DATE('2024-07-24 23:59:59', 'YYYY-MM-DD HH24:MI:SS');
    

    Replace your_table, customer, and insert_date with your actual table name, customer condition, and date column.

  2. To achieve date-based partitioning in Spark using JDBC, you can follow these steps:

    • Ensure that your Oracle/Netsuite date column is in a format compatible with Spark (e.g., yyyy-MM-dd).
    • Use the partitionColumn, lowerBound, and upperBound options to specify the date range.
    • Set the sessionInitStatement to ensure the correct date format.

    Your code snippet looks almost correct; see the sketch at the end of this reply for one workaround.

  3. If you’re still encountering issues, check the following:

    • Ensure that the Oracle/Netsuite date column is indeed a valid date field.
    • Verify that the JDBC URL, driver, and other connection settings are correct.
    • Review the Spark logs for any additional error messages.

    By addressing these points, you should be able to successfully partition your query based on the date field.
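
On the workaround mentioned in point 2: Spark requires partitionColumn to be a numeric, date, or timestamp column, and the SuiteAnalytics driver is rejecting the bare date-string literals Spark generates against your string column. One option is to expose a numeric surrogate of the date in the pushed-down subquery and partition on that, so Spark's generated predicates become plain numeric comparisons. A minimal sketch, assuming the SuiteAnalytics SQL engine accepts TO_DATE/TO_CHAR/TO_NUMBER in a derived column (the lmd_num alias is just an illustration, and I haven't tested this against NetSuite):

# Sketch: partition on a numeric YYYYMMDD surrogate of the date column so
# that Spark's generated range predicates are plain numeric comparisons,
# e.g. "lmd_num < 20240124 or lmd_num is null".
query = f"""
(
SELECT txn.*,
       TO_NUMBER(TO_CHAR(TO_DATE(txn.lastmodifieddate, 'yyyy-MM-dd'), 'YYYYMMDD')) AS lmd_num
FROM {table} txn
WHERE TO_DATE(txn.lastmodifieddate, 'yyyy-MM-dd') >= TO_DATE('{lower_bound}', 'yyyy-MM-dd')
AND TO_DATE(txn.lastmodifieddate, 'yyyy-MM-dd') <= TO_DATE('{upper_bound}', 'yyyy-MM-dd')
) AS t
"""

df = spark.read.format("jdbc") \
.option("url", jdbc_url) \
.option("driver", jdbc_driver) \
.option("dbtable", query) \
.option("partitionColumn", "lmd_num") \
.option("lowerBound", "20240101") \
.option("upperBound", "20240331") \
.option("numPartitions", 10) \
.option("fetchsize", 1000) \
.load()

Because YYYYMMDD values increase monotonically with the date, every row still lands in exactly one partition, and the OpenAccess SQL engine never has to parse a date literal. Let me know if you need further assistance! 😊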

 

joshuat
New Contributor II

Thank you for your reply. The answers and the Stack Overflow post you link appear to be heading in the wrong direction: I don't have a named partition in the source NetSuite table, and the source table isn't partitioned by a date field. Lastly, the SQL you and SO provide only covers one hardcoded day of data:

PARTITION FOR (DATE '2024-07-24')

My time range spans 3 months, so PARTITION FOR would need to be parameterized. The Spark logs don't help, and the NetSuite logs merely report "Invalid Month."

I'd appreciate it if you could propose a code refactor that fits my use case more closely. Thank you!

mtajmouati
New Contributor

Hello,

To address the issue, you can load the data without JDBC partitioning and then repartition it inside Spark.

Step 1: Load Data Without Partitioning

table = 'Transaction'
lower_bound = "2024-01-01"
upper_bound = "2024-03-31"

query = f"""
(
SELECT * FROM {table}
WHERE TO_DATE(lastmodifieddate, 'yyyy-MM-dd') >= TO_DATE('{lower_bound}', 'yyyy-MM-dd')
AND TO_DATE(lastmodifieddate, 'yyyy-MM-dd') <= TO_DATE('{upper_bound}', 'yyyy-MM-dd')
) AS t
"""

df = spark.read.format("jdbc") \
 .option("url", jdbc_url) \
 .option("driver", jdbc_driver) \
 .option("dbtable", query) \
 .option("fetchsize", 1000) \
 .option("sessionInitStatement", "ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD'") \
 .load()

Step 2: Repartition the Data Within Spark

Once you have the DataFrame df loaded, you can repartition it based on the lastmodifieddate field:

# Convert the 'lastmodifieddate' column to date type if it's not already
from pyspark.sql.functions import col, to_date

df = df.withColumn("lastmodifieddate", to_date(col("lastmodifieddate"), "yyyy-MM-dd"))

# Repartition the DataFrame
num_partitions = 10
df_repartitioned = df.repartition(num_partitions, "lastmodifieddate")

# Now you can proceed with further processing on df_repartitioned
df_repartitioned.show()
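
Note that repartitioning only parallelizes processing after the data has arrived over a single JDBC connection. If you need the fetch itself to run in parallel, one more option worth trying is Spark's predicate-based JDBC read: DataFrameReader.jdbc accepts a predicates list with one WHERE clause per partition and passes each clause to the driver verbatim, so your TO_DATE casts survive untouched. A minimal sketch; the date_range_predicates helper is hypothetical, and I haven't verified this against SuiteAnalytics Connect:

from datetime import date, timedelta

def date_range_predicates(start, end, num_partitions):
    # Split [start, end] into contiguous, non-overlapping date windows and
    # render each window as a WHERE clause with explicit TO_DATE casts.
    step = (end - start) // num_partitions
    predicates = []
    for i in range(num_partitions):
        lo = start + step * i
        hi = end if i == num_partitions - 1 else start + step * (i + 1) - timedelta(days=1)
        predicates.append(
            f"TO_DATE(lastmodifieddate, 'yyyy-MM-dd') >= TO_DATE('{lo}', 'yyyy-MM-dd') "
            f"AND TO_DATE(lastmodifieddate, 'yyyy-MM-dd') <= TO_DATE('{hi}', 'yyyy-MM-dd')"
        )
    return predicates

predicates = date_range_predicates(date(2024, 1, 1), date(2024, 3, 31), 10)

# One Spark partition per predicate; Spark does not rewrite these clauses.
df = spark.read.jdbc(
    url=jdbc_url,
    table="Transaction",
    predicates=predicates,
    properties={"driver": jdbc_driver, "fetchsize": "1000"},
)

Because you write each partition's WHERE clause yourself, nothing Spark generates ever reaches the OpenAccess SQL engine, which sidesteps the syntax error entirely.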


Mehdi TAJMOUATI
https://www.wytasoft.com/wytasoft-group/