Data Engineering

Executing Python code inside a SQL Function

Dp15
Contributor

Hi,

I am trying to create a SQL UDF that runs some Python code involving PySpark, but I am not able to create a Spark session inside the Python section of the function. Here is how my code looks:

 

CREATE OR REPLACE FUNCTION test.getValuesFromTable(field1 INT, field2 INT)
RETURNS MAP<STRING, ARRAY<STRING>>
LANGUAGE PYTHON
AS $$
    from pyspark.sql.functions import col
    import numpy as np
    import pyspark
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("sample").getOrCreate()

    def getqueryList():
        query1 = "select distinct(value1) from test.table"
        query2 = "select distinct(value2) from test.table"
        query_list = [query1, query2]
        return query_list

    def executeQuery(field1, field2, query):
        query = query + " where field1 = {} and field2 = {}".format(field1, field2)
        return spark.sql(query)

    def getValues(field1, field2):
        result_dict = {}
        result_list = []
        df_list = [executeQuery(field1, field2, query) for query in getqueryList()]
        for df in df_list:
            fieldName = df.schema.names[0]
            result_list = [row[0] for row in df.select(fieldName).collect()]
            result_dict[fieldName] = np.array(result_list)
        return result_dict

    return getValues(field1, field2)
$$

When I try to execute the function, the Spark session cannot be created and I get the following error:

SystemExit: -1
== Stacktrace ==
  File "<udfbody>", line 6, in main
    spark = SparkSession.builder.appName("sample").getOrCreate()
  File "/databricks/spark/python/pyspark/sql/session.py", line 562, in getOrCreate
    else SparkContext.getOrCreate(sparkConf)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/core/context.py", line 574, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/databricks/spark/python/pyspark/core/context.py", line 206, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "/databricks/spark/python/pyspark/core/context.py", line 495, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
                                       ^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/java_gateway.py", line 63, in launch_gateway
    SPARK_HOME = _find_spark_home()



Is there a way to run Spark SQL inside the SQL UDF?



4 REPLIES

VZLA
Databricks Employee

Hi @Dp15 

The stack trace doesn't point to a problem with running Spark SQL inside the SQL UDF itself, but rather with the attempt to create a SparkSession from inside the UDF, which is not permitted. The UDF runs in a separate execution context that does not have direct access to the main Spark context or session, which is why you see the SystemExit error.

The Spark session should be created outside the UDF, and the queries executed before passing the results to the UDF for further processing. 
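As an illustration, here is a minimal sketch of that approach, not a drop-in implementation: it assumes the same test.table, value1/value2 columns, and field1/field2 filters from the original function, and runs the distinct-value queries on the driver so that only plain Python values ever need to reach a UDF:

from pyspark.sql import SparkSession

# On Databricks this returns the already-running driver session
spark = SparkSession.builder.getOrCreate()

def get_values(field1, field2):
    # Run the distinct-value queries on the driver, outside any UDF
    queries = [
        "select distinct(value1) from test.table",
        "select distinct(value2) from test.table",
    ]
    result = {}
    for q in queries:
        df = spark.sql("{} where field1 = {} and field2 = {}".format(q, field1, field2))
        field_name = df.schema.names[0]
        # Collected rows are plain Python objects and can safely be passed to a UDF
        result[field_name] = [row[0] for row in df.collect()]
    return result

values = get_values(10, 20)

The collected lists are ordinary Python data and can be passed to or returned from further processing; only the SparkSession itself has to stay on the driver.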

Is there a way to run Spark SQL inside the SQL UDF?

So, you can run SQL inside your UDF, but you can't initialize the SparkSession there.

Dp15
Contributor

So something like this should work?

 

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sample").getOrCreate()
getValues(spark,scenario_id,reporting_date)
       

I tried this, but I am not able to pass the SparkSession object to the method definition:
Unsupported data type "SPARKSESSION"

 



 

VZLA
Databricks Employee

@Dp15 

My apologies for the confusion; I've edited my previous comment (and the wrong statement [1]) to make it clear:

[1] So, you can run code inside your UDF, but you can't initialize the SparkSession there or reuse one.

Executing "spark.sql" statements within a PySpark UDF is not feasible, as we can't create a SparkSession object inside the executors or use one there; the session is part of the driver context only. UDFs are typically used to apply custom transformations to individual DataFrame rows or columns, but retrieving distinct values from specific columns based on certain conditions can be done with PySpark's built-in DataFrame operations.

Will this work for your use case instead?

 

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("sample").getOrCreate()

def get_values_from_table(field1_value, field2_value):
    # Load the table and keep only the rows matching both filter values
    df = spark.table("test.table")
    filtered_df = df.filter((col("field1") == field1_value) & (col("field2") == field2_value))

    result_dict = {}
    columns_to_query = ["value1", "value2"]

    # Collect the distinct values of each column of interest into a plain Python list
    for column in columns_to_query:
        distinct_values = filtered_df.select(column).distinct().rdd.flatMap(lambda x: x).collect()
        result_dict[column] = distinct_values

    return result_dict

# Example usage
field1_value = 10
field2_value = 20
result = get_values_from_table(field1_value, field2_value)
print(result)
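For completeness, a Python SQL UDF itself can still run arbitrary pure-Python logic, as long as it never touches Spark. A minimal sketch reusing the function syntax from the original post (the function name and return type here are only illustrative):

CREATE OR REPLACE FUNCTION test.describeValues(field1 INT, field2 INT)
RETURNS STRING
LANGUAGE PYTHON
AS $$
    -- Pure Python only inside the body: no SparkSession and no spark.sql calls
    return "field1={}, field2={}".format(field1, field2)
$$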

 

 

Dp15
Contributor

Actually, this would work if I were using it in a native notebook environment. However, I am trying to create a UDF because I want these queries to be executed from an external JDBC connection, and I don't want to wait for a cluster to spin up for a notebook. Is there a way I can achieve this?
