Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

BQ partition data deleted fully even though 'spark.sql.sources.partitionOverwriteMode' is DYNAMIC

soumiknow
New Contributor III

We have a date (DD/MM/YYYY) partitioned BQ table. We want to overwrite the data of one specific partition using PySpark. To do this, I set 'spark.sql.sources.partitionOverwriteMode' to 'DYNAMIC' as per the Spark BigQuery connector documentation. But the write still deleted the data in the other partitions, which should not happen.

 

 

df_with_partition.write.format("bigquery") \
                .option("table", f"{bq_table_full}") \
                .option("partitionField", f"{partition_date}") \
                .option("partitionType", f"{bq_partition_type}") \
                .option("temporaryGcsBucket", f"{temp_gcs_bucket}") \
                .option("spark.sql.sources.partitionOverwriteMode", "DYNAMIC") \
                .option("writeMethod", "indirect") \
                .mode("overwrite") \
                .save()

 

 

Can anyone please suggest what I am doing wrong, or how to make dynamic partitionOverwriteMode work? Many thanks.
#pyspark #overwrite #partition #dynamic #bigquery


Walter_C
Databricks Employee

It appears that the issue is related to the behavior of the Spark BigQuery connector and how it handles partition overwrites. Here are a few points to consider:

  1. Ensure that the configuration setting spark.sql.sources.partitionOverwriteMode is correctly applied. This can be set at the session level using spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC") before performing the write operation.

  2. If the dynamic partition overwrite mode is not working as expected, you might consider using the replaceWhere option as an alternative. This option allows you to specify a condition to selectively overwrite data. For example:
df_with_partition.write.format("bigquery") \
    .option("table", f"{bq_table_full}") \
    .option("partitionField", f"{partition_date}") \
    .option("partitionType", f"{bq_partition_type}") \
    .option("temporaryGcsBucket", f"{temp_gcs_bucket}") \
    .option("replaceWhere", f"{partition_date} = 'specific_date'") \
    .mode("overwrite") \
    .save()
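
A minimal sketch of point 1, assuming `spark` is the active session that a Databricks notebook provides. The helper name `enable_dynamic_partition_overwrite` is hypothetical; it relies only on `spark.conf.set` and `spark.conf.get`, and it reads the value back so that a setting that did not take effect is noticed before the write runs.

```python
PARTITION_OVERWRITE_KEY = "spark.sql.sources.partitionOverwriteMode"


def enable_dynamic_partition_overwrite(spark):
    """Set the overwrite mode on the session and return the value read back.

    `spark` is expected to be an active SparkSession (hypothetical helper).
    """
    spark.conf.set(PARTITION_OVERWRITE_KEY, "DYNAMIC")
    # Read the value back so an ignored setting is caught before writing.
    effective = spark.conf.get(PARTITION_OVERWRITE_KEY)
    if effective.upper() != "DYNAMIC":
        raise RuntimeError(
            f"{PARTITION_OVERWRITE_KEY} is {effective!r}, expected DYNAMIC"
        )
    return effective
```

In a notebook this would be called as `enable_dynamic_partition_overwrite(spark)` just before the `df_with_partition.write` call.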

 

soumiknow
New Contributor III

Hi @Walter_C ,

Thank you for your response on this issue.

I tried both options:
- setting the Spark configuration at session level, but it did not work
- using the replaceWhere option for a specific partition date, but that did not work either.

In both cases, all BQ table records are overwritten or deleted for all partition dates, which is not acceptable.

I also checked the Spark BigQuery connector documentation page and could not find the 'replaceWhere' option there.
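
One alternative that may be worth testing is the connector's `datePartition` write option, which the connector README describes as targeting a single partition through a date string in `YYYYMMDD` form. The sketch below only builds the option dictionary. The helper name `single_partition_options` and the sample values are hypothetical, and whether the option behaves this way on the connector build bundled with the cluster is an assumption to verify on a test table first.

```python
from datetime import date


def single_partition_options(table, temp_gcs_bucket, partition_day):
    """Build writer options that target one daily partition (hypothetical helper).

    Assumes the connector accepts `datePartition` as a YYYYMMDD string.
    """
    return {
        "table": table,
        "temporaryGcsBucket": temp_gcs_bucket,
        "writeMethod": "indirect",
        # Format the partition day as YYYYMMDD.
        "datePartition": partition_day.strftime("%Y%m%d"),
    }


# Sample values are placeholders, not taken from this thread.
options = single_partition_options(
    "my_project.my_dataset.my_table", "my-temp-bucket", date(2024, 12, 23)
)
print(options["datePartition"])
```

The dictionary could then be passed with `df_one_day.write.format("bigquery").options(**options).mode("overwrite").save()`, where `df_one_day` (a hypothetical name) holds only the rows for that date.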

Walter_C
Databricks Employee

Got it. Can you also test the following code:

df_with_partition.write.format("bigquery") \
    .option("table", f"{bq_table_full}") \
    .option("partitionField", f"{partition_date}") \
    .option("partitionType", f"{bq_partition_type}") \
    .option("temporaryGcsBucket", f"{temp_gcs_bucket}") \
    .option("partitionOverwriteMode", "dynamic") \
    .option("writeMethod", "indirect") \
    .mode("overwrite") \
    .save()

soumiknow
New Contributor III

I already tried this before, and it did not work. Then I tried 'DYNAMIC' instead of 'dynamic', but no luck.

One thing I found: even though I set 'spark.sql.sources.partitionOverwriteMode' to 'DYNAMIC' in the cluster's advanced options, and also in the notebook before the DataFrame write, the property does not show up under SQL/DataFrame Properties in the Spark UI, whereas the other properties I set during cluster creation or in the notebook are all visible there.

So the problem could be with how 'spark.sql.sources.partitionOverwriteMode' is set up within the Databricks cluster. Any idea how to overcome this?

soumiknow
New Contributor III

'spark.sql.sources.partitionOverwriteMode' is visible under the Spark UI Environment tab, so this is not the issue.

Walter_C
Databricks Employee

I reviewed this with a Spark resource. It seems the indirect write method is required for this; you can follow the information in https://github.com/GoogleCloudDataproc/spark-bigquery-connector?tab=readme-ov-file#indirect-write

soumiknow
New Contributor III

I raised an issue with spark-bigquery-connector, where they suggested using 'spark-3.5-bigquery-0.41.0.jar', whereas I was using Databricks Runtime Version 15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12), which includes spark-bigquery-with-dependencies_2.12-0.41.0.jar.

So, I tried this:

spark_v1 = SparkSession.builder \
    .appName("SampleSparkSession") \
    .config("spark.jars.packages", "/Workspace/Users/xxxxxxx@xxxx.xxxx/spark-3.5-bigquery-0.41.0.jar") \
    .config("spark.jars.excludes", "/databricks/jars/spark-bigquery-with-dependencies_2.12-0.41.0.jar") \
    .getOrCreate()

But after applying the include and exclude settings above through the Spark config, the jars were neither included in nor excluded from the running cluster (checked in the Spark UI System Properties).

So, any idea how these jars can be included in or excluded from a Databricks cluster running Databricks Runtime Version 15.4 LTS?

Walter_C
Databricks Employee

 

  • Remove the Default JAR:

    • Since spark-bigquery-with-dependencies_2.12-0.41.0.jar is included by default in Databricks Runtime 15.4 LTS, you need to exclude it. This can be done by creating an init script to remove the JAR file from the cluster.
  • Create an Init Script:

    • Create an init script that removes the default spark-bigquery-with-dependencies_2.12-0.41.0.jar from the cluster. Here is an example of what the script might look like:

      #!/bin/bash
      # Remove the bundled connector JAR; -f avoids an error if it is already absent.
      rm -f /databricks/jars/spark-bigquery-with-dependencies_2.12-0.41.0.jar
  • Upload the Init Script:

    • Upload this script to a location accessible by Databricks, such as DBFS (Databricks File System).
  • Configure the Cluster to Use the Init Script:

    • Go to the cluster configuration page in Databricks.
    • Under the "Advanced Options" section, find the "Init Scripts" tab.
    • Add the path to your init script (e.g., dbfs:/path/to/your/init-script.sh).
  • Add the Custom JAR:

    • Upload the spark-3.5-bigquery-0.41.0.jar to DBFS or another accessible location.
    • In the cluster configuration, go to the "Libraries" tab.
    • Choose "Install New" and select "DBFS" or the appropriate option where your JAR is stored.
    • Provide the path to the spark-3.5-bigquery-0.41.0.jar.
  • Restart the Cluster:

    • Restart the cluster to apply the changes. The init script will run, removing the default JAR, and the new JAR will be added to the cluster.

 

soumiknow
New Contributor III

The init script worked and removed the unwanted jar from the cluster. This is good news.

But when I installed the required jar in the cluster configuration using a GCS path as the library source (gs://spark-lib/bigquery/spark-3.5-bigquery-0.41.0.jar), the UI showed the jar as installed. Yet when I ran the notebook to execute the Spark job, it returned the error below, which indicates that either the Spark session cannot find the spark-bigquery-connector jar or the jar was not installed successfully.

Py4JJavaError: An error occurred while calling o574.save.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: com.google.cloud.spark.bigquery.BigQueryRelationProvider. Make sure the provider name is correct and the package is properly registered and compatible with your Spark version. SQLSTATE: 42K02

I tried to verify whether the jar was installed on the cluster by checking the classpath and other properties in the Spark UI, but could not find it there. The screenshot attached below shows that the jar was installed from the GCS path.

[Screenshot: soumiknow_0-1734937665284.png]
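
A minimal sketch of how the jar check could be scripted from a notebook cell instead of read off the Spark UI. The helper name `find_bigquery_jars` is hypothetical. The default directory is the one used in the init script earlier in this thread; jars installed through the cluster Libraries tab may be placed elsewhere, so the directory is a parameter and not a confirmed location.

```python
from pathlib import Path


def find_bigquery_jars(jar_dir="/databricks/jars"):
    """Return sorted names of .jar files in `jar_dir` whose name mentions 'bigquery'.

    Hypothetical helper; returns an empty list if the directory does not exist.
    """
    directory = Path(jar_dir)
    if not directory.is_dir():
        return []
    return sorted(
        p.name for p in directory.glob("*.jar") if "bigquery" in p.name.lower()
    )
```

Calling `find_bigquery_jars()` on the driver lists whichever connector jars are present in that directory, which helps confirm both that the bundled jar is gone and whether the replacement landed there.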

 
