Friday
I'm attempting to fetch an Oracle NetSuite table in parallel via JDBC using the NetSuite Connect JAR, which is already installed on the cluster and set up correctly. I can do this successfully with a single-threaded approach using the `dbtable` option:
table = 'Transaction'
df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", jdbc_driver) \
    .option("dbtable", table) \
    .load()
I'd like to partition the fetch on a date field, but this code raises an error: java.sql.SQLSyntaxErrorException: [NetSuite][SuiteAnalytics Connect JDBC Driver][OpenAccess SDK SQL Engine]Syntax Error in the SQL statement.[10104]:
lower_bound = "2024-01-01"
upper_bound = "2024-03-31"
query = f"""
(
SELECT * FROM {table}
WHERE TO_DATE(lastmodifieddate, 'yyyy-MM-dd') >= TO_DATE('{lower_bound}', 'yyyy-MM-dd')
AND TO_DATE(lastmodifieddate, 'yyyy-MM-dd') <= TO_DATE('{upper_bound}', 'yyyy-MM-dd')
) AS t
"""
df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", jdbc_driver) \
    .option("dbtable", query) \
    .option("partitionColumn", "lastmodifieddate") \
    .option("lowerBound", lower_bound) \
    .option("upperBound", upper_bound) \
    .option("numPartitions", 10) \
    .option("fetchsize", 1000) \
    .option("sessionInitStatement", "ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD'") \
    .load()
I believe this is because Spark builds the partition WHERE clauses itself from the raw partitionColumn, so the string column is never cast to a date with TO_DATE. Oracle/NetSuite requires the strings to be cast to dates in the SQL statement. Sure enough, the Spark logs show that no TO_DATE is applied in the generated WHERE clauses:
INFO JDBCRelation: Number of partitions: 10, WHERE clauses of these partitions: "lastmodifieddate" < '2024-01-09 23:54:00' or "lastmodifieddate" is null...
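What I'd need Spark to generate instead is a clause along these lines (illustrative only, not actual Spark output):

TO_DATE("lastmodifieddate", 'yyyy-MM-dd') < TO_DATE('2024-01-09', 'yyyy-MM-dd') or "lastmodifieddate" is null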
How do I address this?
yesterday
Hi @joshuat,
To query a specific partition of an Oracle table, you can use the PARTITION FOR clause. Here’s an example of how you can do it:
SELECT *
FROM your_table PARTITION FOR (DATE '2024-07-24')
WHERE customer = 'FooBar'
AND insert_date BETWEEN TO_DATE('2024-07-24 00:00:00', 'YYYY-MM-DD HH24:MI:SS')
AND TO_DATE('2024-07-24 23:59:59', 'YYYY-MM-DD HH24:MI:SS');
Replace your_table, customer, and insert_date with your actual table name, customer condition, and date column.
To achieve date-based partitioning in Spark using JDBC, you can follow these steps:
- Cast the date column with TO_DATE using a consistent format (yyyy-MM-dd).
- Use the partitionColumn, lowerBound, and upperBound options to specify the date range.
- Use sessionInitStatement to ensure the session expects the correct date format.
Your code snippet looks almost correct. If you’re still encountering issues, check the points above; a sketch of how they fit together follows below.
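Here is a minimal sketch of the read with those options combined (largely a recap of your own snippet, with comments on what each option does; whether the driver accepts TO_DATE inside the subquery is an assumption you'll want to verify):

df = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("driver", jdbc_driver)
    # Subquery that casts lastmodifieddate via TO_DATE
    .option("dbtable", query)
    # Spark slices [lowerBound, upperBound] into numPartitions ranges
    .option("partitionColumn", "lastmodifieddate")
    .option("lowerBound", lower_bound)
    .option("upperBound", upper_bound)
    .option("numPartitions", 10)
    # Align the session's date format with the literals Spark generates
    .option("sessionInitStatement", "ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD'")
    .load()
)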
By addressing these points, you should be able to successfully partition your query based on the date field. Let me know if you need further assistance! 😊
4 hours ago - last edited 4 hours ago
Thank you for your reply. The answers and the Stack Overflow post you link appear to be heading in the wrong direction. I don't have a named partition in the source NetSuite table. Additionally, the source table isn't partitioned by a date field. Lastly, the SQL you and SO provide only covers one hardcoded day of data:
PARTITION FOR (DATE '2024-07-24')
My time range spans 3 months, so I would need PARTITION FOR to be parameterized. The Spark logs don't help, and the NetSuite logs merely report "Invalid Month."
I'd appreciate it if you could propose a code refactor that fits my use case more closely. Thank you!
an hour ago
Hello,
To address the issue, load the data without JDBC-level partitioning and then repartition it within Spark.
Step 1: Load Data Without Partitioning
table = 'Transaction'
lower_bound = "2024-01-01"
upper_bound = "2024-03-31"
query = f"""
(
SELECT * FROM {table}
WHERE TO_DATE(lastmodifieddate, 'yyyy-MM-dd') >= TO_DATE('{lower_bound}', 'yyyy-MM-dd')
AND TO_DATE(lastmodifieddate, 'yyyy-MM-dd') <= TO_DATE('{upper_bound}', 'yyyy-MM-dd')
) AS t
"""
df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", jdbc_driver) \
    .option("dbtable", query) \
    .option("fetchsize", 1000) \
    .option("sessionInitStatement", "ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD'") \
    .load()
Step 2: Repartition the Data Within Spark
Once you have the DataFrame df loaded, you can repartition it based on the lastmodifieddate field:
from pyspark.sql.functions import col, to_date

# Convert the 'lastmodifieddate' column to a date type if it's not already
df = df.withColumn("lastmodifieddate", to_date(col("lastmodifieddate"), "yyyy-MM-dd"))
# Repartition the DataFrame
num_partitions = 10
df_repartitioned = df.repartition(num_partitions, "lastmodifieddate")
# Now you can proceed with further processing on df_repartitioned
df_repartitioned.show()
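One caveat: with this approach the JDBC fetch itself still runs over a single connection, and the repartition only spreads the rows across executors after they are loaded. If you need the fetch itself parallelized, a possible alternative (a sketch only; I haven't verified it against the NetSuite driver) is to pass explicit predicates, so every query Spark issues contains your TO_DATE cast:

# Sketch: one predicate per month; each becomes the WHERE clause of its own
# query, run in its own Spark partition, so TO_DATE appears in every statement.
month_starts = ["2024-01-01", "2024-02-01", "2024-03-01", "2024-04-01"]
predicates = [
    f"TO_DATE(lastmodifieddate, 'yyyy-MM-dd') >= TO_DATE('{lo}', 'yyyy-MM-dd') "
    f"AND TO_DATE(lastmodifieddate, 'yyyy-MM-dd') < TO_DATE('{hi}', 'yyyy-MM-dd')"
    for lo, hi in zip(month_starts, month_starts[1:])
]

df = spark.read.jdbc(
    url=jdbc_url,
    table=table,            # plain table name; Spark appends each predicate as a WHERE clause
    predicates=predicates,  # one Spark partition per predicate, fetched in parallel
    properties={"driver": jdbc_driver, "fetchsize": "1000"},
)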