Apache Spark is a powerful engine for big data processing, but small coding mistakes can lead to significant performance degradation. Many Spark users unknowingly introduce anti-patterns that increase execution cost and time, and can even cause job failures.
This blog highlights some of the most common mistakes we see in Spark applications and provides best practices to help you write more efficient, scalable code.
Calling df.count() within the application logic, especially just to check if data is not empty before writing, can be expensive. This is because Spark uses lazy evaluation, and count() triggers the DataFrame to be materialized and a full job to be executed. This can unnecessarily increase execution time.
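A typical example of this anti-pattern (a minimal sketch; the variable and path names are illustrative) is guarding a write with a count:

# Anti-pattern: count() triggers a full job just to check that the DataFrame is not empty
if df.count() > 0:
    df.write.mode("append").save(my_path)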
Recommendations:
# Write directly and handle failures, instead of checking emptiness with count() first
try:
    df.write.mode("append").save(my_path)
except Exception:
    log.error(f"Failed to write data to {my_path}.")
    .....
# For Delta tables, get the row count of the latest write from the table history
# instead of triggering a count() job
history = spark.sql("DESCRIBE HISTORY my_catalog.my_schema.my_table")
non_empty_metrics = history.filter(history.operationMetrics.isNotNull())
latest = non_empty_metrics.orderBy("timestamp", ascending=False).first()
row_count = latest["operationMetrics"]["numOutputRows"]
df.write.format("delta") \
.mode("overwrite") \
.saveAsTable("my_catalog.my_schema.my_table")
output_df = spark.read.table("my_catalog.my_schema.my_table")
row_count = output_df.count()
Whole-stage code generation (WholeStageCodegen) is a query optimization technique in Apache Spark's engine that significantly improves execution performance (it has been the default since Spark 2.0). However, what is less well known is that it only applies if your DataFrame/table has fewer than 100 columns [1, 2]. If you have very wide tables, Spark falls back to a more generic and less efficient execution model.
In order to spot whether your stage is using WholeStageCodegen, head over to the Spark UI (SQL/DataFrame tab / Text Execution summary):
Alternatively:
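You can also inspect the physical plan from code. The sketch below is illustrative (the DataFrame and column names are assumptions): operators covered by whole-stage code generation are prefixed with an asterisk and a codegen stage id, e.g. *(1), in the printed plan.

# Print the physical plan; operators wrapped by WholeStageCodegen are marked with "*(n)"
df.groupBy("key").count().explain()

# On Spark 3.0+, you can also dump the generated code itself
df.groupBy("key").count().explain(mode="codegen")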
As an experiment, we performed the same grouping operation on a very wide table of 1000 columns on clusters with the same configuration, with and without Photon. On the Photon cluster, the operation took 45% less time than on the non-Photon cluster.
A common anti-pattern when overwriting a table/path is to delete the data beforehand. Similarly, dropping and re-creating a table is also considered bad practice.
The reasons behind this anti-pattern vary: saving storage space, making sure the data is actually deleted, handling a schema change, and so on.
# Anti-pattern: delete the data (or drop the table) before overwriting
dbutils.fs.rm(table_path, recursive=True)  # or: spark.sql(f"DROP TABLE {my_table}")
df.write.format("delta").mode("overwrite").saveAsTable(my_table)
This approach introduces a few issues: the operation is no longer atomic (if the write fails after the delete, the data is simply gone), downstream readers see an empty or missing table in the meantime, and for Delta tables you lose the table history and time travel.
Alternatively, overwrite your table data and schema and vacuum regularly (or turn on predictive optimisation for UC-managed tables) to address storage concerns:
(df.write.format("delta")
.mode("overwrite").option("overwriteSchema", "true").saveAsTable(my_table))
# Vacuum often based on your business needs,reduce default retention period if needed
spark.sql("VACUUM my_table")
Overusing withColumn() by calling it in a loop to add many columns (usually as part of a metadata-driven framework) introduces a new projection each time, leading to massive execution plans, performance degradation and possibly a StackOverflowException.
# Anti-pattern: each withColumn() call adds a new projection to the plan
for i in range(no_columns):
    base_df = base_df.withColumn(f"id_{i}", lit(i * 10))
Alternatively, use select() with multiple columns, selectExpr(), withColumns(), or programmatically build a SQL statement.
for i in range(no_columns):
    base_df = base_df.select("*", lit(i * 10).alias(f"id_{i}"))
We ran an experiment on a data frame with 100,000 rows. We found that adding 70 columns in a loop using withColumn() took 3 minutes. Replacing withColumn() with select(), as described above, improved performance by 179%, taking only 41 seconds. The number of added columns at which you start to see a difference between the two approaches depends on the number of rows in the data frame, the number of existing columns, and the cluster size; in our experiments it ranged from roughly 40 to 70+ columns.
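On Spark 3.3 and later, withColumns() (mentioned above) lets you add all the new columns in a single projection instead of looping; a minimal sketch, reusing the illustrative names from the example above:

# Build all new columns first, then add them in one projection (Spark 3.3+)
new_cols = {f"id_{i}": lit(i * 10) for i in range(no_columns)}
base_df = base_df.withColumns(new_cols)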
This pattern is often seen when developers attempt to control the number of output files. While reducing file counts may be a business request (e.g. for compatibility with downstream systems), it’s rarely a strong enough reason to sacrifice scalability and performance.
Using repartition(n) with a small number without considering data size can cause inefficient execution, making the job run very slow and introducing a risk of out-of-memory (OOM) errors; it also increases the likelihood that the cluster will not be fully utilised.
Using repartition(1) involves a full shuffle of the data across the cluster, which can be expensive for large datasets because it requires communication between executors to redistribute the data.
Calling coalesce(1) to force a single output file might create severe performance issues by concentrating data on a single CPU core, preventing you from leveraging parallelism. You can easily identify this in the Spark UI / Stages, by seeing only one task in the timeline and metrics.
Alternatively:
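For example (a sketch assuming a Delta table; the table name is illustrative, and other alternatives may apply in your environment), let Spark write with its natural parallelism and compact small files afterwards rather than forcing a single partition:

# Write with Spark's natural parallelism, without repartition(1)/coalesce(1)
df.write.format("delta").mode("overwrite").saveAsTable("my_catalog.my_schema.my_table")

# Compact small files afterwards
spark.sql("OPTIMIZE my_catalog.my_schema.my_table")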
Removing duplicates is a common operation in Spark workflows, but applying dropDuplicates() or distinct() across all columns without considering which ones are truly necessary for uniqueness can lead to major performance issues, increasing memory pressure and execution time:
Alternatively, apply selective deduplication using only key columns that define uniqueness:
# Only deduplicate based on business keys
deduped_df = df.dropDuplicates(["user_id", "event_time"])
If you’re unsure which columns to de-duplicate on:
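One way to check (a sketch using the illustrative columns from the example above) is to count how many rows share each candidate key; if the filter below returns no rows, the candidate columns uniquely identify a record:

# Rows that share the same candidate key more than once are duplicates
(df.groupBy("user_id", "event_time")
    .count()
    .filter("count > 1")
    .show())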
In structured streaming, calling dropDuplicates without a watermark will cause unbounded state growth, eventually leading to OOM. This is because, in the context of a stream, dropDuplicates maintains all data across triggers as an intermediate state to compare incoming data for duplicates.
Recommendations:
(streaming_df
    .withWatermark("event_time", "1 HOUR")
    .dropDuplicates(["id", "event_time"]))
# OR .dropDuplicatesWithinWatermark(["id", "event_time"])
def merge_function(df_batch: DataFrame, batch_id):
    # Deduplicate within the micro-batch before merging
    df_batch = df_batch.dropDuplicates(["id"])
    df_batch.createOrReplaceTempView("df_batch")
    df_batch.sparkSession.sql("""
        MERGE INTO target_table
        USING df_batch
        .........
    """)
(streaming_df
    .writeStream
    .foreachBatch(merge_function)
    ....)
Although Spark’s cache() helps avoid recomputing transformations by storing data frames in executor memory or disk, it can also introduce problems such as out-of-memory errors when the data exceeds available resources, or stale results if the underlying data changes outside the current process.
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
Additionally, users frequently neglect to call unpersist() after the dataset is no longer needed, which can further strain cluster resources.
Alternatively:
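For instance (a sketch of one common alternative, assuming a Delta table; the names are illustrative), persist the intermediate result to a table and read it back instead of caching it in executor memory:

# Persist the intermediate result instead of caching it
intermediate_df.write.format("delta").mode("overwrite").saveAsTable("my_schema.intermediate")
intermediate_df = spark.read.table("my_schema.intermediate")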
Removing cache() and switching to one of the options above may add some overhead, but it will produce consistent results and prevent your job from failing with OOM when the data does not fit into memory or on disk.
Functions like display(), collect(), and print(df.count()) are often used during interactive development or debugging, and that’s exactly where they should stay. Including them in production pipelines is a common anti-pattern.
These functions are not inherently wrong; they are simply not safe at scale:
Production jobs should be clean, headless, and observable through logs and metrics, not developer-focused outputs. Instead, leverage logging frameworks and monitoring tools to track pipeline execution.
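For example, a minimal sketch using Python's standard logging module (the logger name and message are illustrative):

import logging

logger = logging.getLogger("my_pipeline")

# Log progress and metrics instead of print()/display() in production code
logger.info("Finished appending batch to %s", my_path)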
Applying explode() creates a considerable increase in row count, causing the size of partitions to grow substantially. For an array of 1000 items, exploding it will cause a 1000x increase in the number of rows. This might cause data spill, making your application run very slowly or fail with OOM. This problem is often seen when flattening XML or JSON files.
Recommendations:
# Repartition before exploding so the exploded rows are spread across more partitions
df = df.repartition(higher_number_of_partitions)
df = df.select(col("id"), explode(col("array_col")))
Instead of:
df = df.select(col("id"), col("other_col"), explode(col("array_col"))
Use this:
df.select("id", explode("array_col")).write.saveAsTable("table1")
df.select("id", "other_col").write.saveAsTable("table2")
table1 = spark.read.table("table1")
table2 = spark.read.table("table2")
flattened_result = table1.join(table2, on="id")
Conclusion: Write Smarter Spark Code for Better Performance
Avoiding these common mistakes can significantly enhance the efficiency and stability of Spark applications. By adopting best practices, teams can reduce execution time, lower costs, and build more scalable data pipelines.
Do you want to optimise your Spark workloads? Review your code for these anti-patterns and implement best practices today!