
Performance issue with spatial reference system conversions

rvo1994
New Contributor

Hi,

I am facing a performance issue with spatial reference system conversions. My Delta table holds approximately 10 GB in 46 files (160M records) and grows by roughly 5M records every week. After ingestion, I need to convert points (columns GE_XY_XCOR and GE_XY_YCOR) from Lambert72 (EPSG:31370) to ETRS89 (EPSG:4258) and save the results in columns GE_XY_CORLON and GE_XY_CORLAT.

I've tried several approaches, but the merge easily takes over 12 hours, even on a powerful cluster (Standard_L8s_v2). Here are my approaches:

Mosaic:

 

 

from pyspark.sql.functions import col, lit
from pyspark.sql.types import DecimalType, DoubleType
from mosaic import st_point, st_setsrid, st_transform, st_xmax, st_ymax

df_etrs89 = (
    df
    # Only keep records without ETRS89 coordinates
    .filter("GE_XY_CORLON = 0 AND GE_XY_CORLAT = 0")
    # Filter out records without Lambert72 coordinates
    .filter("GE_XY_XCOR IS NOT NULL AND GE_XY_YCOR IS NOT NULL")
    # Cast from decimal to double
    .withColumn("GE_XY_XCOR", col("GE_XY_XCOR").cast(DoubleType()))
    .withColumn("GE_XY_YCOR", col("GE_XY_YCOR").cast(DoubleType()))
    # Create a point geometry from the x and y coordinates
    .withColumn("POINT_GEOM", st_point("GE_XY_XCOR", "GE_XY_YCOR"))
    # Set SRID to Lambert72 (EPSG:31370)
    .withColumn("POINT_GEOM_31370", st_setsrid("POINT_GEOM", lit(31370)))
    # Convert the geometry to ETRS89 (EPSG:4258)
    .withColumn("POINT_GEOM_4258", st_transform("POINT_GEOM_31370", lit(4258)))
    # Extract the x coordinate (longitude) from the geometry and cast to decimal
    .withColumn("GE_XY_CORLON", st_xmax("POINT_GEOM_4258").cast(DecimalType(25, 7)))
    # Extract the y coordinate (latitude) from the geometry and cast to decimal
    .withColumn("GE_XY_CORLAT", st_ymax("POINT_GEOM_4258").cast(DecimalType(25, 7)))
    .select("GE_XY_ID", "GE_XY_CORLON", "GE_XY_CORLAT")
)

 

UDF and pyproj: 

 

 

import pyproj
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DecimalType, FloatType, StructField, StructType

# Define the coordinate conversion function (called once per row)
def convert_coordinates(x, y):
    # Create projection objects for Lambert72 and ETRS89
    lambert72 = pyproj.Proj(init="epsg:31370")
    etrs89 = pyproj.Proj(init="epsg:4258")
    lon, lat = pyproj.transform(lambert72, etrs89, x, y)
    return lon, lat

# Define the schema of the struct returned by the UDF
result_schema = StructType([
    StructField("lon", FloatType(), False),
    StructField("lat", FloatType(), False),
])

# Wrap the function as a UDF
convert_coordinates_udf = udf(convert_coordinates, result_schema)

# Apply the UDF to the DataFrame to create new columns with ETRS89 coordinates
conversion = convert_coordinates_udf(col("GE_XY_XCOR"), col("GE_XY_YCOR"))
df_etrs89 = (
    df
    .select("GE_XY_ID", "GE_XY_XCOR", "GE_XY_YCOR", "GE_XY_CORLON", "GE_XY_CORLAT")
    # Only keep records without ETRS89 coordinates
    .filter("GE_XY_CORLON = 0.0000000 AND GE_XY_CORLAT = 0.0000000")
    # Filter out records without Lambert72 coordinates
    .filter("GE_XY_XCOR IS NOT NULL AND GE_XY_YCOR IS NOT NULL")
    .withColumn("GE_XY_CORLON", conversion.getItem("lon").cast(DecimalType(25, 7)))
    .withColumn("GE_XY_CORLAT", conversion.getItem("lat").cast(DecimalType(25, 7)))
    .select("GE_XY_ID", "GE_XY_CORLON", "GE_XY_CORLAT")
)

 


I cast the data types back and forth so that the coordinates can be passed to pyproj and the updates stay compatible with the schema of the target table.
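For comparison, here is a minimal sketch of the same Lambert72 to ETRS89 conversion that builds the pyproj transformer once and converts whole batches of values, instead of creating projection objects for every row. It uses `Transformer.from_crs`, which pyproj recommends over the deprecated `pyproj.transform`. The function name `lambert72_to_etrs89` and the sample coordinates are made up for illustration, and the sketch has not been run against the table described above.

```python
from pyproj import Transformer

# Build the transformer once and reuse it: per the pyproj documentation,
# creating a Transformer is costly compared with applying it.
# always_xy=True keeps the (x, y) -> (lon, lat) ordering.
_LAMBERT72_TO_ETRS89 = Transformer.from_crs("EPSG:31370", "EPSG:4258", always_xy=True)


def lambert72_to_etrs89(xs, ys):
    """Convert sequences of Lambert72 x/y values to lists of ETRS89 lon/lat."""
    # Cast to float so that Decimal inputs (as stored in the table) are handled
    xs = [float(x) for x in xs]
    ys = [float(y) for y in ys]
    # One call transforms the whole batch
    lons, lats = _LAMBERT72_TO_ETRS89.transform(xs, ys)
    return list(lons), list(lats)


# Hypothetical sample points in the Brussels area
lons, lats = lambert72_to_etrs89([150000.0, 152000.0], [170000.0, 171000.0])
print(lons, lats)
```

A batch function of this shape could be called from a pandas UDF or `mapInPandas`, so that each Spark task converts many rows per Python call. Whether that removes the bottleneck here is an assumption that would need measuring.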

In both cases I then define a temporary view based on the dataframe and perform a merge into my table:

 

 

# Register the DataFrame as a temporary view
df_etrs89.createOrReplaceTempView("etrs89_view")

# Define the merge condition and update SQL statement
merge_sql = f"""
MERGE INTO {schema_name}.{table_name} AS target
USING etrs89_view AS source
ON target.GE_XY_ID = source.GE_XY_ID
WHEN MATCHED THEN
  UPDATE SET
    target.GE_XY_CORLON = source.GE_XY_CORLON,
    target.GE_XY_CORLAT = source.GE_XY_CORLAT
"""

# Run the merge
spark.sql(merge_sql)

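One variation that may be worth trying is to repeat the "still zero" filter inside the merge condition. Delta Lake's merge tuning guidance suggests adding known constraints to the match condition to reduce the search space. The sketch below only builds the SQL string. The helper `build_merge_sql` and the schema and table names are hypothetical, and any gain depends on how the zero-valued rows are spread across files.

```python
def build_merge_sql(schema_name: str, table_name: str, source_view: str = "etrs89_view") -> str:
    """Return a MERGE statement that only matches target rows still holding placeholder zeros."""
    return f"""
MERGE INTO {schema_name}.{table_name} AS target
USING {source_view} AS source
ON target.GE_XY_ID = source.GE_XY_ID
  -- Extra predicates on the target narrow the rows considered for a match
  AND target.GE_XY_CORLON = 0
  AND target.GE_XY_CORLAT = 0
WHEN MATCHED THEN
  UPDATE SET
    target.GE_XY_CORLON = source.GE_XY_CORLON,
    target.GE_XY_CORLAT = source.GE_XY_CORLAT
"""


# Hypothetical schema and table names, for illustration only
merge_sql = build_merge_sql("my_schema", "my_table")
print(merge_sql)
# On a cluster, the statement would then be run with spark.sql(merge_sql)
```

Because the source view is already filtered to rows whose target coordinates are zero, the extra predicates should not change which rows are updated, provided the view was derived from the current state of the table.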
 

Any suggestions on how to speed this up? Thanks in advance!
