Hi,
I am facing a performance issue with spatial reference system conversions. My delta table has approximately 10 GB/46 files/160M records and gets +/- 5M records every week. After ingestion, I need to convert points (columns GE_XY_XCOR and GE_XY_YCOR) from Lambert72 to ETRS89 and save them under column GE_XY_CORLON and GE_XY_CORLAT.
I've tried multiple ways but the merge easily takes over 12 hours with a powerful cluster (Standard_L8s_v2). Here are my approaches:
Mosaic:
# Mosaic pipeline: Lambert72 (EPSG:31370) -> ETRS89 (EPSG:4258).
# Stage 1: narrow to the rows that still need converting.
pending = (
    df
    # Only keep records without ETRS89 coordinates
    .filter("GE_XY_CORLON = 0 AND GE_XY_CORLAT = 0")
    # Filter out records without Lambert72 coordinates
    .filter("GE_XY_XCOR IS NOT NULL AND GE_XY_YCOR IS NOT NULL")
)

# Stage 2: build a point geometry, re-project it, and pull the converted
# lon/lat back out in the target table's decimal type.
df_etrs89 = (
    pending
    # Mosaic's st_* functions want doubles; the table stores decimals
    .withColumn("GE_XY_XCOR", col("GE_XY_XCOR").cast(DoubleType()))
    .withColumn("GE_XY_YCOR", col("GE_XY_YCOR").cast(DoubleType()))
    # Point geometry from the x/y pair
    .withColumn("POINT_GEOM", st_point('GE_XY_XCOR', 'GE_XY_YCOR'))
    # Tag it as Lambert72 ...
    .withColumn("POINT_GEOM_31370", st_setsrid("POINT_GEOM", lit(31370)))
    # ... and re-project to ETRS89
    .withColumn("POINT_GEOM_4258", st_transform("POINT_GEOM_31370", lit(4258)))
    # A point's bounding box collapses to the point itself, so st_xmax /
    # st_ymax yield its lon / lat; cast back to the table's Decimal(25,7)
    .withColumn("GE_XY_CORLON", st_xmax("POINT_GEOM_4258").cast(DecimalType(25,7)))
    .withColumn("GE_XY_CORLAT", st_ymax("POINT_GEOM_4258").cast(DecimalType(25,7)))
    .select("GE_XY_ID", "GE_XY_CORLON", "GE_XY_CORLAT")
)
UDF and pyproj:
# Build the Lambert72 -> ETRS89 transformer ONCE per Python worker process.
# The original constructed two pyproj.Proj objects on every row, which is by
# far the most expensive part of the conversion and a major contributor to the
# multi-hour runtime. `pyproj.Proj(init=...)` / `pyproj.transform` are also
# deprecated since pyproj 2 (removed in pyproj 3); Transformer is the
# supported, and much faster, API. always_xy=True keeps the (x, y) ->
# (lon, lat) axis order the original code relied on.
_LAMBERT72_TO_ETRS89 = pyproj.Transformer.from_crs(
    "EPSG:31370", "EPSG:4258", always_xy=True
)


def convert_coordinates(x, y):
    """Convert a Lambert72 (EPSG:31370) point to ETRS89 (EPSG:4258).

    Args:
        x: Lambert72 easting (GE_XY_XCOR).
        y: Lambert72 northing (GE_XY_YCOR).

    Returns:
        (lon, lat) tuple in ETRS89 decimal degrees.
    """
    lon, lat = _LAMBERT72_TO_ETRS89.transform(x, y)
    return lon, lat
# Define the schema for the resulting DataFrame.
# Use DoubleType: FloatType is 32-bit (~7 significant digits in total), which
# silently truncates coordinate precision BEFORE the cast to Decimal(25,7).
result_schema = StructType([
    StructField("lon", DoubleType(), False),
    StructField("lat", DoubleType(), False),
])

# Register the UDF. NOTE(review): a plain Python UDF serializes row by row;
# a pandas_udf would batch rows through pyproj and cut overhead further --
# worth trying if this path is kept.
convert_coordinates_udf = udf(convert_coordinates, result_schema)

# Column expression applying the UDF to each Lambert72 coordinate pair.
conversion = convert_coordinates_udf(col("GE_XY_XCOR"), col("GE_XY_YCOR"))

df_etrs89 = (df
    # Project early: only carry the columns the conversion needs
    .select("GE_XY_ID", "GE_XY_XCOR", "GE_XY_YCOR", "GE_XY_CORLON", "GE_XY_CORLAT")
    # Only keep records that still lack ETRS89 coordinates
    # (single '=' used consistently, matching the Mosaic variant's filter)
    .filter("GE_XY_CORLON = 0.0000000 AND GE_XY_CORLAT = 0.0000000")
    # Filter out records without Lambert72 source coordinates
    .filter("GE_XY_XCOR IS NOT NULL AND GE_XY_YCOR IS NOT NULL")
    # Cast back to decimal so the update matches the target table's schema
    .withColumn("GE_XY_CORLON", conversion.getItem("lon").cast(DecimalType(25,7)))
    .withColumn("GE_XY_CORLAT", conversion.getItem("lat").cast(DecimalType(25,7)))
    .select("GE_XY_ID", "GE_XY_CORLON", "GE_XY_CORLAT")
)
I convert datatypes back and forth to be able to use pyproj and for my updates to be compatible with the schema of the target table.
In both cases I then define a temporary view based on the dataframe and perform a merge into my table:
# Register DataFrames as temporary views
df_etrs89.createOrReplaceTempView("etrs89_view")
# Define the merge condition and update SQL statement
# NOTE(review): with only GE_XY_ID in the ON clause, Delta must search every
# target file for potential matches and rewrites each file that contains one;
# with ~5M weekly updates scattered across 160M rows this can touch most of
# the table. Likely wins to verify against the table layout:
#   - Z-ORDER / liquid-cluster the target on GE_XY_ID so matches prune files;
#   - add a pruning predicate to ON (e.g. AND target.GE_XY_CORLON = 0), which
#     is safe only if converted rows are never reset to 0 -- confirm;
#   - enable deletion vectors / low-shuffle merge on the target table.
merge_sql = f"""
MERGE INTO {schema_name}.{table_name} AS target
USING etrs89_view AS source
ON target.GE_XY_ID = source.GE_XY_ID
WHEN MATCHED THEN
UPDATE SET
target.GE_XY_CORLON = source.GE_XY_CORLON,
target.GE_XY_CORLAT = source.GE_XY_CORLAT
"""
Any suggestion on how to speed this up? Thanks in advance!