Input SQL script (assume any dialect):
SELECT b.se10,
       b.se3,
       b.se_aggrtr_indctr,
       b.key_swipe_ind
FROM
  (SELECT se10,
          se3,
          se_aggrtr_indctr,
          ROW_NUMBER() OVER (PARTITION BY se10
                             ORDER BY se_aggrtr_indctr DESC) AS rn,
          key_swipe_ind
   FROM fraud_details_data_whole
   GROUP BY se10,
            se3,
            se_aggrtr_indctr,
            key_swipe_ind) b
WHERE b.rn < 2
Output PySpark code using the DataFrame API:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# Create a SparkSession
spark = SparkSession.builder \
    .appName("TranspileSQLExample") \
    .getOrCreate()
# Sample data (replace with your actual DataFrame)
data = [
("se10_value1", "se3_value1", "aggrtr_value1", "swipe_value1"),
("se10_value1", "se3_value2", "aggrtr_value2", "swipe_value2"),
("se10_value2", "se3_value3", "aggrtr_value3", "swipe_value3"),
("se10_value2", "se3_value4", "aggrtr_value4", "swipe_value4"),
("se10_value3", "se3_value5", "aggrtr_value5", "swipe_value5"),
("se10_value3", "se3_value6", "aggrtr_value6", "swipe_value6")
]
# Create DataFrame
fraud_details_data_whole = spark.createDataFrame(data, ["se10", "se3", "se_aggrtr_indctr", "key_swipe_ind"])
# GROUP BY on all four selected columns with no aggregates acts like DISTINCT
grouped_df = fraud_details_data_whole.select("se10", "se3", "se_aggrtr_indctr", "key_swipe_ind").distinct()
# Define Window specification for row_number() function
windowSpec = Window.partitionBy("se10").orderBy(grouped_df["se_aggrtr_indctr"].desc())
# Add row number column (evaluated after the grouping, as in the SQL)
ranked_df = grouped_df.withColumn("rn", row_number().over(windowSpec))
# Filter rows where rn < 2
result_df = ranked_df.filter("rn < 2")
# Select required columns
result_df = result_df.select("se10", "se3", "se_aggrtr_indctr", "key_swipe_ind")
# Show the result DataFrame
result_df.show()
Is there any way to translate the SQL query above into equivalent PySpark DataFrame API code? The SQL script and the transpiled PySpark code must return the same result when they are executed separately.
Any suggestions would be appreciated.
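One thing that may matter for the "same result" requirement: the query orders each se10 partition only by se_aggrtr_indctr, so when several rows tie on that column, which one receives rn = 1 is not defined and can differ between runs or engines. Below is a minimal plain-Python sketch of the query's semantics (no Spark needed) that makes the pick deterministic. The function name first_row_per_se10 and the tie-breaker columns (se3, then key_swipe_ind) are assumptions for illustration and are not part of the original query. Rows are assumed to be tuples of non-null values in the order (se10, se3, se_aggrtr_indctr, key_swipe_ind).

```python
from itertools import groupby


def first_row_per_se10(rows):
    """Plain-Python model of the query: DISTINCT rows, then rn = 1 per se10."""
    # GROUP BY on all four columns with no aggregates behaves like DISTINCT.
    distinct_rows = set(rows)
    # Python sorts are stable, so sort by the least significant keys first.
    # Tie-breakers se3 and key_swipe_ind are an assumption added here.
    ordered = sorted(distinct_rows, key=lambda r: (r[1], r[3]))
    # Primary window ordering: se_aggrtr_indctr DESC.
    ordered = sorted(ordered, key=lambda r: r[2], reverse=True)
    # Bring rows of the same se10 together (the PARTITION BY).
    ordered = sorted(ordered, key=lambda r: r[0])
    # The first row of each partition is the one with rn = 1.
    return [next(group) for _, group in groupby(ordered, key=lambda r: r[0])]


rows = [
    ("A", "x", "1", "k1"),
    ("A", "y", "2", "k2"),
    ("A", "y", "2", "k2"),  # exact duplicate, removed by the GROUP BY
    ("A", "z", "2", "k3"),  # ties with the "y" row on se_aggrtr_indctr
    ("B", "w", "0", "k4"),
]
picked = first_row_per_se10(rows)
print(picked)
```

If the same tie-breaker columns are added to the ORDER BY in the SQL and to the orderBy of the window specification in the DataFrame code, both sides should pick the same row per se10. Without them, the two outputs can legitimately differ on tied rows even when the translation itself is faithful.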
https://stackoverflow.com/q/78446543/6842300