Pyspark Configuration: pyspark --packages io.delta:delta-core_2.12:2.4.0,org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-storage-s3-dynamodb:2.4.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --executor-memory 10g --driver-memory 16g
EMR Configuration: m5a.16xlarge master and m5a.4xlarge core fleets scalable till 120 instances
Data: 12000 records and 80MB size
Observations: time to explode 1000 records is taking 2min and for 5000 is 33min
Pseudo Code:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, explode, explode_outer
from mosaic import enable_mosaic, st_geomfromwkb, st_geomfromwkt, st_aswkt, st_isvalid, grid_polyfill
conf = SparkConf().setAll([("spark.databricks.labs.mosaic.jar.autoattach", 'false')])
spark=SparkSession.builder.config(conf=conf).getOrCreate()
enable_mosaic(spark)
polygon_df= df.withColumn("index_array", grid_polyfill(col("GEOMETRY"), lit(11)))
exploded_df= polygon_df.withColumn("index_id", explode_outer(col("index_array"))).select("GEOMETRY", "index_id")
exploded_df.write.format("delta").option("overwrite", "true").save("<s3_path>")
Any suggestions for optimizations in explode function?