Databricks Mosaic's grid_polyfill() takes a long time to explode the index when run with PySpark

KiranKondamadug
New Contributor II

PySpark Configuration: pyspark --packages io.delta:delta-core_2.12:2.4.0,org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-storage-s3-dynamodb:2.4.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --executor-memory 10g --driver-memory 16g

EMR Configuration: m5a.16xlarge master and m5a.4xlarge core fleets, scalable up to 120 instances

Data: 12,000 records, 80 MB in size

Observations: exploding 1,000 records takes 2 minutes; exploding 5,000 records takes 33 minutes

Pseudo Code:

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, explode_outer
from mosaic import enable_mosaic, st_geomfromwkb, st_geomfromwkt, st_aswkt, st_isvalid, grid_polyfill

# Disable automatic Mosaic JAR attachment (the JAR is provided to the cluster manually)
conf = SparkConf().setAll([("spark.databricks.labs.mosaic.jar.autoattach", "false")])

spark = SparkSession.builder.config(conf=conf).getOrCreate()
enable_mosaic(spark)

# df is the input DataFrame with a "GEOMETRY" column; polyfill at resolution 11
polygon_df = df.withColumn("index_array", grid_polyfill(col("GEOMETRY"), lit(11)))
exploded_df = polygon_df.withColumn("index_id", explode_outer(col("index_array"))).select("GEOMETRY", "index_id")

# mode("overwrite") is the correct way to overwrite the target; option("overwrite", "true") has no effect
exploded_df.write.format("delta").mode("overwrite").save("<s3_path>")

Any suggestions for optimizing the explode step?

1 REPLY

Kaniz
Community Manager

Hi @KiranKondamadug, the explode function in PySpark transforms a column holding an array of values into multiple rows: each row of the resulting DataFrame contains one element of the original array column.
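
For example, a minimal sketch of what explode does (the toy DataFrame and column names are illustrative):

from pyspark.sql.functions import explode, col

# Hypothetical example: each element of the "letters" array becomes its own row
arr_df = spark.createDataFrame([(1, ["a", "b", "c"])], ["id", "letters"])
arr_df.select("id", explode(col("letters")).alias("letter")).show()
# -> three rows: (1, a), (1, b), (1, c)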

 

Let’s discuss some suggestions for optimizing the explode function in your use case:

 

Batch Size:

  • The performance of explode can be influenced by the size of the array being exploded.
  • Consider experimenting with different batch sizes (number of records to explode at once) to find an optimal balance between memory usage and execution time.
  • Smaller batch sizes may reduce memory pressure but increase the overhead of processing.
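
For example, a minimal sketch of chunked processing with randomSplit, assuming polygon_df from your snippet (the chunk count and seed are arbitrary values to tune):

from pyspark.sql.functions import col, explode_outer

# Hypothetical sketch: explode and write the data in a few smaller chunks
chunks = polygon_df.randomSplit([1.0] * 5, seed=42)
for chunk in chunks:
    (chunk.withColumn("index_id", explode_outer(col("index_array")))
          .select("GEOMETRY", "index_id")
          .write.format("delta").mode("append").save("<s3_path>"))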

Data Skew:

  • If certain records have significantly larger arrays than others, it can cause data skew.
  • Investigate whether there are any skewed partitions in your data.
  • If possible, preprocess or split large arrays into smaller chunks to distribute the workload evenly.
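
One way to check for skew is to look at the distribution of polyfill cell counts per geometry; a quick sketch, assuming polygon_df from your snippet:

from pyspark.sql.functions import size, col

# Hypothetical check: how many grid cells does each geometry produce?
polygon_df.select(size(col("index_array")).alias("cell_count")) \
          .summary("min", "25%", "50%", "75%", "max").show()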

Partitioning and Parallelism:

  • Ensure that your DataFrame is properly partitioned.
  • If the data is not evenly distributed across partitions, consider repartitioning before applying explode.
  • Adjust the number of partitions based on the available resources and the size of your data.
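
A minimal sketch of repartitioning before the explode, assuming polygon_df from your snippet (the partition count is an assumption to tune for your cluster):

from pyspark.sql.functions import col, explode_outer

# Hypothetical sketch: spread geometries evenly across executors before exploding
target_partitions = spark.sparkContext.defaultParallelism * 2  # assumption
exploded_df = (polygon_df.repartition(target_partitions)
                         .withColumn("index_id", explode_outer(col("index_array")))
                         .select("GEOMETRY", "index_id"))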

Memory Configuration:

  • The memory allocated to Spark executors (executor-memory) and driver (driver-memory) affects performance.
  • Ensure that you have sufficient memory for both the executor and driver.
  • Monitor memory usage during the explode operation to avoid spills to disk.
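
As a rough sketch, executor memory and overhead can also be set through SparkConf when the session is created (the values here are illustrative; driver memory generally must be set before the JVM starts, e.g. via --driver-memory on the pyspark command line):

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Hypothetical values: size these to your instance types and workload
conf = (SparkConf()
        .set("spark.executor.memory", "10g")
        .set("spark.executor.memoryOverhead", "2g"))
spark = SparkSession.builder.config(conf=conf).getOrCreate()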

Caching and Persistence:

  • If you perform multiple operations on the same DataFrame, consider caching or persisting it in memory.
  • Caching can improve performance by avoiding recomputation.
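
For example, a quick sketch if polygon_df is reused by several actions:

from pyspark import StorageLevel

# Hypothetical sketch: keep the polyfilled DataFrame around instead of recomputing it
polygon_df.persist(StorageLevel.MEMORY_AND_DISK)
polygon_df.count()  # first action materializes the cache; later actions reuse it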

Delta Lake Optimization:

  • Since you’re writing the result to a Delta table, consider using Delta Lake optimizations:
    • Z-Ordering: If your data has a natural order (e.g., timestamp), use Z-Ordering to improve query performance.
    • Compaction: Configure automatic compaction to optimize storage and improve read performance.
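
With open-source Delta Lake 2.x, OPTIMIZE and ZORDER BY can be run after the write; a sketch, reusing the placeholder path from your snippet and an assumed clustering column:

# Hypothetical sketch: compact small files and cluster by the grid index column
spark.sql("OPTIMIZE delta.`<s3_path>` ZORDER BY (index_id)")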

Profiling and Monitoring:

  • Use Spark UI or other monitoring tools to profile the job.
  • Identify bottlenecks, resource usage, and stages that contribute to the execution time.
  • Optimize based on the insights gained from profiling.
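
Alongside the Spark UI, printing the physical plan can show how the explode and any shuffles are executed; a small sketch, assuming exploded_df from your snippet:

# Hypothetical sketch: inspect the formatted physical plan
exploded_df.explain(mode="formatted")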

Remember that performance tuning is often a matter of trial and error. Experiment with different settings, monitor the impact, and iterate to find the best configuration for your specific workload. 🚀
