Authors:

  • Irina Placinta, Resident Solutions Architect @ Databricks     
  • Canan Girgin, Senior Solutions Consultant @ Databricks

Apache Spark is a powerful engine for big data processing, but small coding mistakes can lead to significant performance degradation. Many Spark users unknowingly introduce anti-patterns that increase execution cost and time, and can even cause job failures.

This blog highlights some of the most common mistakes we see in Spark applications and provides best practices to help you write more efficient, scalable code.

1. Misusing df.count() in business logic or logs

Calling df.count() within the application logic, especially just to check if data is not empty before writing, can be expensive. This is because Spark uses lazy evaluation, and count() triggers the DataFrame to be materialized and a full job to be executed. This can unnecessarily increase execution time.

Recommendations:

  • Deciding whether to write is often redundant; appending an empty DataFrame works fine. If the write fails (for example, due to a missing schema), wrap df.write() in a try/except block.
try:
  df.write.mode("append").save(my_path)
except Exception as e:
  log.error(f"Failed to write data to {my_path}: {e}")
  .....
  • To log the results of table operations that append/merge, you can get the count from the table history after writing the data instead of running a count on the DataFrame. This is much more efficient.
# Pull numOutputRows from the metrics of the latest operation in the table history
history = spark.sql("DESCRIBE HISTORY my_catalog.my_schema.my_table")
non_empty_metrics = history.filter(history.operationMetrics.isNotNull())
latest = non_empty_metrics.orderBy("timestamp", ascending=False).first()
row_count = latest["operationMetrics"]["numOutputRows"]
  • To log the results of path/table operations that overwrite data, you can get the number of records written by reading the target path/table back; for Parquet/Delta formats, the count is available in the Parquet metadata, making this count very efficient.
df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("my_catalog.my_schema.my_table")

output_df = spark.read.table("my_catalog.my_schema.my_table")
row_count = output_df.count()

 

2. Creating Extremely Wide Tables (100+ Columns)

Whole-stage code generation (WholeStageCodegen) is a query optimization technique in Apache Spark's engine that significantly improves execution performance (default since Spark 2.0). However, what is less known is that it only applies if your DataFrame/table has fewer than 100 columns [1, 2]. So if you have very wide tables, Spark falls back to a more generic and less efficient execution model.

To spot whether your stage is using WholeStageCodegen, head over to the Spark UI (SQL/DataFrame tab / Text Execution summary).
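
You can also check this programmatically: in the output of explain(), operators covered by WholeStageCodegen are prefixed with an asterisk and a codegen stage id, e.g. *(1). A minimal sketch, assuming a DataFrame df with a column named key:

# Operators fused by WholeStageCodegen appear with a "*(n)" prefix in the physical plan
df.groupBy("key").count().explain()
# Example (abridged) output:
# == Physical Plan ==
# *(2) HashAggregate(keys=[key#0L], functions=[count(1)])
# ...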

Alternatively:

  • Reduce the number of columns by creating multiple tables to hold the data. No data analyst will scroll through 1,000 columns to find the data they need.
  • Use MapType, ArrayType or Databricks Variant type columns to reduce the number of columns (see the sketch after this list)
  • Use Databricks clusters with Photon enabled (including Serverless). Photon does not use WholeStageCodegen, so it does not have this limitation.
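
As an illustration of the MapType approach, here is a minimal sketch that collapses a hypothetical set of metric_* columns (all of the same type) into a single map column:

from itertools import chain
from pyspark.sql.functions import create_map, lit, col

# Hypothetical wide DataFrame with many metric_* columns of the same type
metric_cols = [c for c in df.columns if c.startswith("metric_")]

# Collapse them into a single MapType column, keyed by the original column name
kv_pairs = list(chain.from_iterable((lit(c), col(c)) for c in metric_cols))
narrow_df = df.select("id", create_map(*kv_pairs).alias("metrics"))

# Individual values remain accessible by key
narrow_df.select("id", col("metrics")["metric_0"]).show()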

As an experiment, we performed the same grouping operation on a very wide table of 1000 columns on clusters with the same configuration, with and without Photon. On the Photon cluster, the operation took 45% less time than on the non-Photon cluster.


 

3. Directly deleting files/tables instead of using table overwrites

A common anti-pattern when overwriting a table/path is to delete the data beforehand. Similarly, dropping and re-creating a table is also considered bad practice.

The reasons behind this anti-pattern vary: saving storage space, making sure the data is actually deleted, handling a schema change, and so on.

# Anti-pattern: delete the data or drop the table first, then rewrite
dbutils.fs.rm(table_path, recursive=True)  # or spark.sql(f"DROP TABLE {my_table}")
df.write.format("delta").mode("overwrite").saveAsTable(my_table)

This approach introduces a few issues:

  • Loss of data: in the case of Delta tables, you lose the option to revert to previous versions in case of human or data error
  • Any processes reading the data will be interrupted and fail, and no new reads will be possible until the new data is written
  • Loss of table history and table lineage
  • Not being able to detect unwanted schema changes
  • Extra Unity Catalog permissions are needed to handle data directly from the storage account rather than managing permissions only at the table level; this introduces security risks
  • Given this is no longer an atomic operation, the process might run the delete first and then fail on the write, in which case the table/path will be empty, causing problems downstream

Alternatively, overwrite your table data and schema and vacuum regularly (or turn on predictive optimisation for UC-managed tables) to address storage concerns:

(df.write.format("delta") 
  .mode("overwrite").option("overwriteSchema", "true").saveAsTable(my_table))
# Vacuum regularly based on your business needs; reduce the default retention period if needed
spark.sql("VACUUM my_table")

 

4. Calling withColumn() in a Loop

Overusing withColumn() by calling it in a loop to add many columns (usually as part of a metadata-driven framework) introduces a new projection each time, leading to massive execution plans, performance degradation and possibly a StackOverflowException.

from pyspark.sql.functions import lit

for i in range(no_columns):
  base_df = base_df.withColumn(f"id_{i}", lit(i * 10))


Alternatively, use select() with multiple columns, selectExpr(), withColumns(), or programmatically build a SQL statement.

for i in range(no_columns):
  base_df = base_df.select("*", lit(i * 10).alias(f"id_{i}"))
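
If all the new columns are known up front, you can go one step further and add them in a single projection. A minimal sketch, assuming the same no_columns as above (withColumns() requires Spark 3.3+):

from pyspark.sql.functions import lit

# Option A: one select() with all the new columns at once
new_cols = [lit(i * 10).alias(f"id_{i}") for i in range(no_columns)]
wide_df = base_df.select("*", *new_cols)

# Option B (Spark 3.3+): withColumns() takes a dict and adds them in a single projection
wide_df = base_df.withColumns({f"id_{i}": lit(i * 10) for i in range(no_columns)})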

We ran an experiment on a DataFrame with 100,000 rows. We found that adding 70 columns in a loop using withColumn() takes 3 minutes. Replacing withColumn() with select(), as described above, improved performance by 179%, taking only 41 seconds. The number of added columns at which you start to see a performance difference between the two approaches depends on the number of rows in the DataFrame, the number of existing columns and the cluster size; in our experiments it ranged from roughly 40 to 70+ columns.

 

5. Use of repartition(1) or coalesce(1)

This pattern is often seen when developers attempt to control the number of output files. While reducing file counts may be a business request (e.g. for compatibility with downstream systems), it’s rarely a strong enough reason to sacrifice scalability and performance.

Using repartition(n) with a small number without considering data size can cause inefficient execution, making the job run very slow and introducing a risk of out-of-memory (OOM) errors; it also increases the likelihood that the cluster will not be fully utilised. 

Using repartition(1) triggers a full shuffle of the data across the cluster, which can be expensive for large datasets because it requires communication between executors to redistribute the data.

Calling coalesce(1) to force a single output file might create severe performance issues by concentrating data on a single CPU core, preventing you from leveraging parallelism. You can easily identify this in the Spark UI / Stages, by seeing only one task in the timeline and metrics.


Alternatively:

  • Let Spark determine optimal partitioning based on the data volume and cluster configuration; AQE will dynamically coalesce shuffle partitions
  • To optimize file size, consider using the OPTIMIZE command, Auto Optimize (which improves file sizes through Delta file compaction and optimized writes, typically targeting ~128 MB files without producing a single output file) or predictive optimisation for UC managed tables; see the sketch after this list
  • Avoid using repartition(1) unless you have a truly critical requirement for a single output file, and even then, challenge that need
  • Avoid using coalesce(1); if you absolutely need a single output file, repartition(1) will give you better performance
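
As a sketch of the file-size options above, assuming a Delta table named my_catalog.my_schema.my_table:

# Compact small files into larger ones after the write
spark.sql("OPTIMIZE my_catalog.my_schema.my_table")

# Or enable optimized writes and auto compaction at the table level
spark.sql("""
    ALTER TABLE my_catalog.my_schema.my_table SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")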

 

6. Applying dropDuplicates() or distinct() on all columns

Removing duplicates is a common operation in Spark workflows, but applying dropDuplicates() or distinct() across all columns without considering which ones are truly necessary for uniqueness can lead to major performance issues, increasing memory pressure and execution time:

  • Both dropDuplicates() and distinct() trigger a full shuffle of the dataset, which is expensive.
  • When applied across all columns — especially in wide tables — the shuffle size increases significantly.

Alternatively, apply selective deduplication using only key columns that define uniqueness:

# Only deduplicate based on business keys
deduped_df = df.dropDuplicates(["user_id", "event_time"])

If you’re unsure which columns to de-duplicate on:

  • Consult data owners for business logic
  • Explore the data by logging the duplicate counts per candidate key before and after deduplication to understand the duplication keys, as shown in the sketch below
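
A minimal sketch of that exploration, assuming user_id and event_time are the candidate business keys and log is a configured logger:

from pyspark.sql import functions as F

# Count how many rows share each candidate key
dup_counts = (df.groupBy("user_id", "event_time")
                .count()
                .filter(F.col("count") > 1))

# 'log' is assumed to be a configured logger
log.info(f"Number of keys with duplicates: {dup_counts.count()}")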

 

7. Using dropDuplicates() in streaming applications without watermarking

In structured streaming, calling dropDuplicates without a watermark will cause unbounded state growth, eventually leading to OOM. This is because, in the context of a stream, dropDuplicates maintains all data across triggers as an intermediate state to compare incoming data for duplicates. 

Recommendations:

  • Use event time watermarks to limit how late duplicate data can arrive and shrink the state store. A detailed guide can be found in the Structured Streaming documentation.
(streaming_df
  .withWatermark("event_time", "1 HOUR")
  .dropDuplicates(["id", "event_time"]))
# OR .dropDuplicatesWithinWatermark(["id", "event_time"])
  • If the objective is to maintain deduplicated data in a table, you can also achieve this by deduplicating against the destination table using the MERGE INTO operation inside “foreachBatch”; if there are duplicates in the source, performing dropDuplicates() within the foreachBatch function will not require a watermark.
from pyspark.sql import DataFrame

def merge_function(df_batch: DataFrame, batch_id):
    # Deduplicate within the micro-batch; no watermark is needed here
    df_batch = df_batch.dropDuplicates(["id"])
    # Register the batch as a temp view so it can be referenced in the SQL below
    df_batch.createOrReplaceTempView("df_batch")
    df_batch.sparkSession.sql("""
        MERGE INTO target_table
        USING df_batch
        .........
    """)

(streaming_df
    .writeStream
    .foreachBatch(merge_function)
    ....
)

 

8. Using cache()

Although Spark’s cache()  helps avoid recomputing transformations by storing data frames in executor memory or disk, it can also introduce problems such as out-of-memory errors when the data exceeds available resources, or stale results if the underlying data changes outside the current process.

java.lang.OutOfMemoryError: GC overhead limit exceeded
  at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
  at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)

Additionally, users frequently neglect to call unpersist() after the dataset is no longer needed, which can further strain cluster resources.

Alternatively:

  • Use Databricks disk caching - this saves local copies of the data read, making successive reads faster; note it only works for delta/parquet files
  • For complex lineages of transformations that take significant time, save intermediate results to your storage account in Delta format (see the sketch after this list)
  • Let Spark recompute the whole lineage
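
For the second option, a minimal sketch of persisting an expensive intermediate result to Delta instead of cache() (the table name is illustrative):

# Materialize the expensive intermediate result once...
(intermediate_df.write.format("delta")
    .mode("overwrite")
    .saveAsTable("my_catalog.my_schema.intermediate_step"))

# ...then read it back for the downstream transformations
intermediate_df = spark.read.table("my_catalog.my_schema.intermediate_step")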

Removing cache() and using one of the options above might add some overhead, but it will produce consistent results and prevent your job from failing with OOM because the data does not fit into memory/disk.

 

9. Using display(), collect() or print(df.count()) in Production

Functions like display(), collect(), and print(df.count()) are often used during interactive development or debugging, and that’s exactly where they should stay. Including them in production pipelines is a common anti-pattern.

These functions are not inherently wrong; they are simply not safe at scale:

  • collect() brings all data from executors to the driver node, which can cause driver memory overflow (OOM errors) on large datasets.
  • display() is designed for notebooks and user interfaces, not scalable batch or streaming jobs, especially if done against complex data frames
  • print(df.count()) triggers a full action that might get executed repeatedly, adding unnecessary overhead to pipelines.

Production jobs should be clean, headless, and observable through logs and metrics, not developer-focused outputs. Instead, leverage logging frameworks and monitoring tools to track pipeline execution.
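
For example, instead of print(df.count()), a minimal sketch that logs write metrics from the Delta table history, reusing the approach from section 1 (the table name is illustrative):

import logging

logger = logging.getLogger("my_pipeline")

# Pull the metrics of the latest operation from the table history instead of recomputing a count
latest = (spark.sql("DESCRIBE HISTORY my_catalog.my_schema.my_table")
          .filter("operationMetrics IS NOT NULL")
          .orderBy("timestamp", ascending=False)
          .first())
logger.info("Last operation %s wrote %s rows",
            latest["operation"], latest["operationMetrics"].get("numOutputRows"))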

 

10. Overusing explode() to flatten XML/JSON files

Applying explode() creates a considerable increase in row count, causing the size of partitions to grow substantially. For an array of 1000 items, exploding it will cause a 1000x increase in the number of rows. This might cause data spill, making your application run very slowly or fail with OOM. This problem is often seen when flattening XML or JSON files.


Recommendations:

  • Make use of the Databricks Variant type to understand your data before exploding it; all too often, every array in a DataFrame is exploded even though the data is never needed downstream
  • Drop the original array column immediately after the explosion to avoid running into memory issues if someone refers to it or selects all columns
  • Repartition your DataFrame to a higher number of partitions right before the explode() operation in order to decrease your partition size
from pyspark.sql.functions import col, explode

df = df.repartition(higher_number_of_partitions)
df = df.select(col("id"), explode(col("array_col")))
  • If explode() is equivalent to generating a cartesian product on your data (due to the way the data is structured), divide your data into intermediate dataframes (one with the column that needs to be exploded + join column and the other with the rest of the columns), write the results to delta tables and join them; this will improve your partition size and therefore your performance.

Instead of:

df = df.select(col("id"), col("other_col"), explode(col("array_col")))

Use this:

df.select("id", explode("array_col")).write.saveAsTable("table1")
df.select("id", "other_col").write.saveAsTable("table2")
table1 = spark.read.table("table1")
table2 = spark.read.table("table2")
flattened_result = table1.join(table2, on="id")

 

Conclusion: Write Smarter Spark Code for Better Performance

Avoiding these common mistakes can significantly enhance the efficiency and stability of Spark applications. By adopting best practices, teams can reduce execution time, lower costs, and build more scalable data pipelines.

Do you want to optimise your Spark workloads? Review your code for these anti-patterns and implement best practices today!
