Transpile a SQL Script into PySpark DataFrame API equivalent code

thecodecache
New Contributor II

Input SQL script (assume any dialect):

SELECT b.se10,
       b.se3,
       b.se_aggrtr_indctr,
       b.key_swipe_ind
FROM
  (SELECT se10,
          se3,
          se_aggrtr_indctr,
          ROW_NUMBER() OVER (PARTITION BY SE10
                             ORDER BY se_aggrtr_indctr DESC) AS rn,
          key_swipe_ind
   FROM fraud_details_data_whole
   GROUP BY se10,
            se3,
            se_aggrtr_indctr,
            key_swipe_ind) b
WHERE b.rn < 2

Output PySpark code using the DataFrame API:

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Create a SparkSession
spark = SparkSession.builder \
    .appName("TranspileSQLExample") \
    .getOrCreate()

# Sample data (replace with your actual DataFrame)
data = [
    ("se10_value1", "se3_value1", "aggrtr_value1", "swipe_value1"),
    ("se10_value1", "se3_value2", "aggrtr_value2", "swipe_value2"),
    ("se10_value2", "se3_value3", "aggrtr_value3", "swipe_value3"),
    ("se10_value2", "se3_value4", "aggrtr_value4", "swipe_value4"),
    ("se10_value3", "se3_value5", "aggrtr_value5", "swipe_value5"),
    ("se10_value3", "se3_value6", "aggrtr_value6", "swipe_value6")
]

# Create DataFrame
fraud_details_data_whole = spark.createDataFrame(
    data, ["se10", "se3", "se_aggrtr_indctr", "key_swipe_ind"]
)

# Mirror the subquery's GROUP BY: grouping on all four columns with no
# aggregates is equivalent to SELECT DISTINCT on those columns
deduped_df = fraud_details_data_whole.select(
    "se10", "se3", "se_aggrtr_indctr", "key_swipe_ind"
).distinct()

# Window for ROW_NUMBER() OVER (PARTITION BY se10 ORDER BY se_aggrtr_indctr DESC)
# (if rows tie on se_aggrtr_indctr, which one gets rn = 1 is not deterministic)
windowSpec = Window.partitionBy("se10").orderBy(col("se_aggrtr_indctr").desc())

# Add row number column
ranked_df = deduped_df.withColumn("rn", row_number().over(windowSpec))

# Filter rows where rn < 2
result_df = ranked_df.filter("rn < 2")

# Select required columns
result_df = result_df.select("se10", "se3", "se_aggrtr_indctr", "key_swipe_ind")

# Show the result DataFrame
result_df.show()
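To pin down what both versions have to compute, here is a small plain-Python model of the query (no Spark needed), handy for reasoning about small test inputs. It is an illustration, not a replacement for either version, and it assumes rows are 4-tuples in the column order above and that se_aggrtr_indctr contains no NULLs.

```python
# Plain-Python model of the question's SQL. Assumptions: each row is a tuple
# (se10, se3, se_aggrtr_indctr, key_swipe_ind) and se_aggrtr_indctr has no
# NULLs (None cannot be compared with strings when sorting).


def top_row_per_se10(rows):
    """Return the rn = 1 row for each se10, sorted by se10."""
    # Inner GROUP BY on all four columns with no aggregates: deduplicate rows
    distinct_rows = set(rows)

    # ROW_NUMBER() OVER (PARTITION BY se10 ORDER BY se_aggrtr_indctr DESC) < 2:
    # walk rows from highest to lowest se_aggrtr_indctr and keep the first row
    # seen for each se10. Ties follow set iteration order, i.e. they are
    # resolved arbitrarily, just as SQL gives no guarantee for ties.
    first_per_se10 = {}
    for row in sorted(distinct_rows, key=lambda r: r[2], reverse=True):
        first_per_se10.setdefault(row[0], row)

    return sorted(first_per_se10.values())


# Sample rows from the question
sample = [
    ("se10_value1", "se3_value1", "aggrtr_value1", "swipe_value1"),
    ("se10_value1", "se3_value2", "aggrtr_value2", "swipe_value2"),
    ("se10_value2", "se3_value3", "aggrtr_value3", "swipe_value3"),
    ("se10_value2", "se3_value4", "aggrtr_value4", "swipe_value4"),
    ("se10_value3", "se3_value5", "aggrtr_value5", "swipe_value5"),
    ("se10_value3", "se3_value6", "aggrtr_value6", "swipe_value6"),
]

for row in top_row_per_se10(sample):
    print(row)
```

One consequence for the "results must be equal" requirement: when several rows in the same se10 group tie on se_aggrtr_indctr, ROW_NUMBER may pick any of them, so the SQL and a correct PySpark translation can legitimately return different rows on such data.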

Is there a way to translate SQL such as the query above into equivalent PySpark DataFrame API code? Executing the SQL script and the transpiled PySpark code separately must return the same result.

Kindly suggest.

https://stackoverflow.com/q/78446543/6842300


thecodecache
New Contributor II

Hi @Retired_mod, thanks for your response.
I'm looking for a utility or an automated way to translate any generic SQL into PySpark DataFrame code.

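So the translate function should look something like this:

def translate(input_sql: str) -> str:
    """Convert input_sql into equivalent PySpark DataFrame API code."""
    pyspark_code = ""  # placeholder: the actual SQL-to-DataFrame conversion goes here
    return pyspark_code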

Please suggest
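To make the requested interface concrete, the sketch below is a deliberately tiny, hypothetical `translate()` (not an existing library function). It only understands `SELECT <items> FROM <table> [WHERE <condition>]` and returns DataFrame API code as a string built from real PySpark methods (`spark.table`, `DataFrame.filter`, `DataFrame.selectExpr`). Anything else raises `NotImplementedError`; joins, grouping, window functions and subqueries such as the query in the question would need a real SQL parser rather than regular expressions.

```python
import re

# Hypothetical toy translator: only SELECT <items> FROM <table> [WHERE <condition>]
_SIMPLE_SELECT = re.compile(
    r"^\s*SELECT\s+(?P<items>.+?)\s+FROM\s+(?P<table>\w+)"
    r"(?:\s+WHERE\s+(?P<cond>.+?))?\s*;?\s*$",
    re.IGNORECASE | re.DOTALL,
)
# Constructs the toy cannot handle; a real translator needs a full SQL parser
_UNSUPPORTED = re.compile(
    r"\b(?:JOIN|GROUP|ORDER|HAVING|LIMIT|UNION|OVER)\b|\(\s*SELECT\b",
    re.IGNORECASE,
)


def translate(input_sql: str) -> str:
    """Return PySpark DataFrame API code (as a string) for a very simple query."""
    if _UNSUPPORTED.search(input_sql):
        raise NotImplementedError("joins, grouping, ordering, windows and subqueries are not supported")
    match = _SIMPLE_SELECT.match(input_sql)
    if match is None:
        raise NotImplementedError("expected SELECT <items> FROM <table> [WHERE <condition>]")
    items_text = match.group("items")
    if "(" in items_text:
        # The naive comma split below would break function calls like concat(a, b)
        raise NotImplementedError("function calls in the SELECT list are not supported")
    items = [item.strip() for item in items_text.split(",")]

    code = f"spark.table({match.group('table')!r})"
    if match.group("cond"):
        # WHERE is evaluated before SELECT, so filter the base table first;
        # DataFrame.filter accepts a SQL expression string
        code += f".filter({match.group('cond').strip()!r})"
    # selectExpr accepts plain column names, '*' and expressions such as "b + 1 AS c"
    code += ".selectExpr(" + ", ".join(repr(item) for item in items) + ")"
    return code


print(translate("SELECT a, b + 1 AS c FROM bar WHERE a > 1"))
```

The call above returns `spark.table('bar').filter('a > 1').selectExpr('a', 'b + 1 AS c')`. The filter is emitted before the projection because SQL evaluates WHERE before SELECT, so the condition may refer to columns that are not in the SELECT list.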

MathieuDB
Databricks Employee

Hello @thecodecache,

Have a look at the SQLGlot project: https://github.com/tobymao/sqlglot?tab=readme-ov-file#faq

It can transpile SQL written in other dialects into Spark SQL, which you can then run with spark.sql, for example:

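import sqlglot
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("SQLGlot Example").getOrCreate()

# Sample tables (hypothetical data) so the query below has something to run against
spark.createDataFrame([(1, 10), (2, 20), (3, 30)], ["a", "b"]).createOrReplaceTempView("bar")
spark.createDataFrame([(2,), (3,)], ["a"]).createOrReplaceTempView("baz")

# Original SQL query
sql_query = """
SELECT bar.a, b + 1 AS b
FROM bar
JOIN baz ON bar.a = baz.a
WHERE bar.a > 1
"""

# Convert SQL to the Spark SQL dialect. read=None parses the input with SQLGlot's
# default dialect; pass e.g. read="postgres" if the input uses a specific dialect
spark_sql = sqlglot.transpile(sql_query, read=None, write="spark")[0]

# Run the transpiled query; spark.sql returns a DataFrame
df = spark.sql(spark_sql)

# Show the resulting DataFrame
df.show()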
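Note that SQLGlot's output here is a Spark SQL string executed through spark.sql, not DataFrame API code. Whichever route is taken, the original requirement that the SQL and the PySpark version return the same result can be checked by comparing both results as multisets. A minimal sketch, assuming both results are small enough to `collect()` to the driver; with PySpark the call would look like `same_result(spark.sql(sql_text).collect(), result_df.collect())`, where `sql_text` is a placeholder for the query string.

```python
from collections import Counter


def same_result(rows_a, rows_b) -> bool:
    """True if both results hold the same rows with the same multiplicities.

    Row order is ignored (a query without ORDER BY has no defined order), but
    duplicates are not. Each row must be an iterable of hashable values,
    e.g. a PySpark Row, which is a tuple subclass.
    """
    return Counter(map(tuple, rows_a)) == Counter(map(tuple, rows_b))


# Same rows in a different order: equal
print(same_result([(1, "x"), (2, "y")], [(2, "y"), (1, "x")]))
# Different multiplicities: not equal
print(same_result([(1, "x"), (1, "x")], [(1, "x")]))
```

For results too large to collect, an equivalent check inside Spark is that `a.exceptAll(b)` and `b.exceptAll(a)` are both empty for the two DataFrames `a` and `b`.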