2 weeks ago
We have a date-partitioned (DD/MM/YYYY) BigQuery table, and we want to overwrite the data of one specific partition using PySpark. To do this, I set 'spark.sql.sources.partitionOverwriteMode' to 'DYNAMIC' as per the Spark BigQuery connector documentation, but the write still deleted the data in the other partitions, which should not happen.
df_with_partition.write.format("bigquery") \
.option("table", f"{bq_table_full}") \
.option("partitionField", f"{partition_date}") \
.option("partitionType", f"{bq_partition_type}") \
.option("temporaryGcsBucket", f"{temp_gcs_bucket}") \
.option("spark.sql.sources.partitionOverwriteMode", "DYNAMIC") \
.option("writeMethod", "indirect") \
.mode("overwrite") \
.save()
Can anyone please suggest what I am doing wrong, or how to implement this dynamic partitionOverwriteMode correctly? Many thanks.
#pyspark #overwrite #partition #dynamic #bigquery
2 weeks ago
It appears that the issue is related to how the Spark BigQuery connector handles partition overwrites. Here are a few points to consider:
Ensure that the configuration setting spark.sql.sources.partitionOverwriteMode is correctly applied: it should be set at the session level with spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC") before performing the write operation, rather than passed as a writer option; see the sketch below.
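A minimal sketch of that session-level approach, reusing the variable names and indirect write settings from your snippet (untested against your table):
# Set dynamic partition overwrite on the session before writing;
# the writer itself no longer carries the partitionOverwriteMode option.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")

df_with_partition.write.format("bigquery") \
    .option("table", bq_table_full) \
    .option("partitionField", partition_date) \
    .option("partitionType", bq_partition_type) \
    .option("temporaryGcsBucket", temp_gcs_bucket) \
    .option("writeMethod", "indirect") \
    .mode("overwrite") \
    .save()
And to restrict the overwrite to a single partition, you could also try a replaceWhere predicate: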
df_with_partition.write.format("bigquery") \
.option("table", f"{bq_table_full}") \
.option("partitionField", f"{partition_date}") \
.option("partitionType", f"{bq_partition_type}") \
.option("temporaryGcsBucket", f"{temp_gcs_bucket}") \
.option("replaceWhere", f"{partition_date} = 'specific_date'") \
.mode("overwrite") \
.save()
2 weeks ago - last edited 2 weeks ago
Hi @Walter_C ,
Thank you for your response on this issue.
I tried both options:
- setting the Spark configuration at the session level, but it did not work
- using the replaceWhere option for a specific partition date, but that also did not work
In both cases, all records in the BQ table are being overwritten or deleted across all partition dates, which is not acceptable.
I also checked the Spark BQ connector documentation page and could not find the 'replaceWhere' option there.
2 weeks ago
Got it, can you also test the following code:
df_with_partition.write.format("bigquery") \
.option("table", f"{bq_table_full}") \
.option("partitionField", f"{partition_date}") \
.option("partitionType", f"{bq_partition_type}") \
.option("temporaryGcsBucket", f"{temp_gcs_bucket}") \
.option("partitionOverwriteMode", "dynamic") \
.option("writeMethod", "indirect") \
.mode("overwrite") \
.save()
2 weeks ago
I already tried this before, but it did not work. Then I tried 'DYNAMIC' instead of 'dynamic', but still no luck.
One thing I found is that even though I set 'spark.sql.sources.partitionOverwriteMode' to 'DYNAMIC' in the cluster's advanced options, or in the notebook before writing the dataframe, in neither case does this partitionOverwriteMode property actually show up in the Spark UI SQL/DataFrame properties, whereas the other properties I set during cluster creation or in the notebook are visible there.
So this could be a problem with how the 'spark.sql.sources.partitionOverwriteMode' property is set within the Databricks cluster. Any idea how to overcome this?
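For reference, one quick way to confirm what the running session actually sees, independent of the Spark UI, is to read the property back right before the write (a minimal sketch, not the exact notebook code):
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
# Read the value back from the live session configuration
print(spark.conf.get("spark.sql.sources.partitionOverwriteMode"))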
2 weeks ago
I reviewed this with a Spark resource; it seems that for this the indirect write method will be required. You can follow the information at https://github.com/GoogleCloudDataproc/spark-bigquery-connector?tab=readme-ov-file#indirect-write
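A minimal sketch of the indirect write setup described in that README section, reusing the dataframe and table variables from the earlier snippets ("my-temp-bucket" is a placeholder for a real GCS bucket):
# Indirect write: data is staged to a temporary GCS bucket before loading into BigQuery
df_with_partition.write.format("bigquery") \
    .option("table", bq_table_full) \
    .option("writeMethod", "indirect") \
    .option("temporaryGcsBucket", "my-temp-bucket") \
    .mode("overwrite") \
    .save()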
2 weeks ago
I raised an issue with spark-bigquery-connector, where they suggested using 'spark-3.5-bigquery-0.41.0.jar', whereas I was using Databricks Runtime Version 15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12), which ships spark-bigquery-with-dependencies_2.12-0.41.0.jar.
So, I tried this:
from pyspark.sql import SparkSession

# Note: spark.jars.packages / spark.jars.excludes expect Maven coordinates rather
# than file paths, and getOrCreate() in a notebook returns the already-running
# cluster session, so these settings generally do not take effect here.
spark_v1 = SparkSession.builder \
.appName("SampleSparkSession") \
.config("spark.jars.packages", "/Workspace/Users/xxxxxxx@xxxx.xxxx/spark-3.5-bigquery-0.41.0.jar") \
.config("spark.jars.excludes", "/databricks/jars/spark-bigquery-with-dependencies_2.12-0.41.0.jar") \
.getOrCreate()
But after applying the above include and exclude via Spark config, the jars were neither included nor excluded on the actual running cluster (verified by checking the Spark UI System Properties).
So, any idea how these jars can be included or excluded on a Databricks cluster that uses Databricks Runtime Version 15.4 LTS?
2 weeks ago
Remove the Default JAR:
spark-bigquery-with-dependencies_2.12-0.41.0.jar is included by default in Databricks Runtime 15.4 LTS, and you need to exclude it. This can be done by creating an init script to remove the JAR file from the cluster.
Create an Init Script:
Create an init script that removes the default spark-bigquery-with-dependencies_2.12-0.41.0.jar from the cluster. Here is an example of what the script might look like:
#!/bin/bash
rm /databricks/jars/spark-bigquery-with-dependencies_2.12-0.41.0.jar
Upload the Init Script:
Upload the script to DBFS or another location the cluster can access.
Configure the Cluster to Use the Init Script:
In the cluster configuration, add the script under init scripts (e.g. dbfs:/path/to/your/init-script.sh).
Add the Custom JAR:
Upload spark-3.5-bigquery-0.41.0.jar to DBFS or another accessible location and install it on the cluster as a library.
Restart the Cluster:
Restart the cluster so the init script runs and the new JAR is picked up.
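After the restart, a quick way to confirm from a notebook that the default jar was actually removed is to list the connector jars in the same directory the init script touches (a minimal sketch):
import glob

# Any spark-bigquery jars still present under /databricks/jars after the init script ran
print(glob.glob("/databricks/jars/*spark-bigquery*"))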
Sunday
The init script worked to remove the unwanted jars from the cluster, which is good news.
But when I install the required jar via the cluster configuration, using a GCS path as the library source (gs://spark-lib/bigquery/spark-3.5-bigquery-0.41.0.jar), the UI shows that the jar is installed. Yet when I execute the notebook to run the Spark job, it returns the error below, which indicates that either the Spark session cannot find the spark-bigquery-connector jar or the jar was not installed successfully.
Py4JJavaError: An error occurred while calling o574.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: com.google.cloud.spark.bigquery.BigQueryRelationProvider. Make sure the provider name is correct and the package is properly registered and compatible with your Spark version. SQLSTATE: 42K02
I tried to verify whether the jar was installed successfully on the cluster by checking the Spark UI classpath properties and other properties, but could not find it there. The screenshot attached below shows that the jar was installed from the GCS path.
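One additional check that might help narrow this down is asking the driver JVM directly whether the connector class is visible (a sketch that uses Spark's internal _jvm handle, which is not a public API, purely as a diagnostic):
# Raises a Py4J error wrapping ClassNotFoundException if the connector jar is not
# on the driver classpath; returns a Class object if the provider is visible.
spark._jvm.java.lang.Class.forName(
    "com.google.cloud.spark.bigquery.BigQueryRelationProvider"
)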