a week ago
Hi,
I am trying to create a SQL UDF that runs some Python code involving PySpark, but I am not able to create a Spark session inside the Python body of the function. Here is how my code looks:
CREATE OR REPLACE FUNCTION test.getValuesFromTable(field1 INT, field2 INT)
RETURNS MAP<STRING, ARRAY<STRING>>
LANGUAGE PYTHON
AS $$
from pyspark.sql.functions import col
import numpy as np
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sample").getOrCreate()

def getqueryList():
    query1 = "select distinct(value1) from test.table"
    query2 = "select distinct(value2) from test.table"
    query_list = [query1, query2]
    return query_list

def executeQuery(field1, field2, query):
    query = query + " where field1 = {} and field2 = {}".format(field1, field2)
    return spark.sql(query)

def getValues(field1, field2):
    result_dict = {}
    result_list = []
    df_list = [executeQuery(field1, field2, query) for query in getqueryList()]
    for df in df_list:
        fieldName = df.schema.names[0]
        result_list = [row[0] for row in df.select(fieldName).collect()]
        result_dict[fieldName] = np.array(result_list)
    return result_dict

return getValues(field1, field2)
$$
When I try to execute the function, the Spark session cannot be created:
SystemExit: -1
== Stacktrace ==
File "<udfbody>", line 6, in main
spark = SparkSession.builder.appName("sample").getOrCreate()
File "/databricks/spark/python/pyspark/sql/session.py", line 562, in getOrCreate
else SparkContext.getOrCreate(sparkConf)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/core/context.py", line 574, in getOrCreate
SparkContext(conf=conf or SparkConf())
File "/databricks/spark/python/pyspark/core/context.py", line 206, in __init__
SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
File "/databricks/spark/python/pyspark/core/context.py", line 495, in _ensure_initialized
SparkContext._gateway = gateway or launch_gateway(conf)
^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/java_gateway.py", line 63, in launch_gateway
SPARK_HOME = _find_spark_home()
Is there a way to run Spark SQL inside the SQL UDF?
Monday - last edited Monday
Hi @Dp15
The stack trace doesn't point to a problem with running Spark SQL inside the SQL UDF as such, but with the attempt to create a SparkSession from inside the UDF, which is not permitted. The UDF body runs in a separate execution context that has no direct access to the main Spark context or session, which is why you see the SystemExit error.
The Spark session should be created outside the UDF, and the queries executed there first, with the results passed to the UDF for further processing.
Is there a way to run Spark SQL inside the SQL UDF?
So, you can run SQL inside your UDF, but you can't initialize the SparkSession there.
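For example, here is a minimal sketch of that pattern (the table, columns, and filter values are just placeholders taken from your example): the queries run on the driver, where the session exists, and only plain Python values are handed on for further processing.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sample").getOrCreate()

# Run the queries where the SparkSession exists (driver side).
value1_list = [r[0] for r in spark.sql(
    "select distinct(value1) from test.table where field1 = 10 and field2 = 20").collect()]
value2_list = [r[0] for r in spark.sql(
    "select distinct(value2) from test.table where field1 = 10 and field2 = 20").collect()]

# Pass the already-collected results to plain Python code (or a UDF that only
# transforms them); nothing below needs the SparkSession anymore.
def build_result(values1, values2):
    return {"value1": values1, "value2": values2}

print(build_result(value1_list, value2_list))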
Monday - last edited Monday
So something like this should work?
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("sample").getOrCreate()
getValues(spark,scenario_id,reporting_date)
I tried this, but I am not able to pass the SparkSession object into the function definition:
Unsupported data type "SPARKSESSION"
Monday - last edited Monday
My apologies for the confusion; I've edited my previous comment (and the wrong statement [1]) to make it clear:
[1] So, you can run code inside your UDF, but you can't initialize the SparkSession there or reuse one.
Executing "spark.sql" statements within a PySpark UDF is not feasible: a SparkSession object can't be created inside the executors, nor can the driver's session be reused there, because it belongs to the driver context only. UDFs are typically used to apply custom transformations to individual DataFrame rows or columns; retrieving distinct values from specific columns based on certain conditions can instead be done with PySpark's built-in DataFrame operations.
Will this work for your use case instead?
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder.appName("sample").getOrCreate()

def get_values_from_table(field1_value, field2_value):
    df = spark.table("test.table")
    filtered_df = df.filter((col("field1") == field1_value) & (col("field2") == field2_value))

    result_dict = {}
    columns_to_query = ["value1", "value2"]
    for column in columns_to_query:
        distinct_values = filtered_df.select(column).distinct().rdd.flatMap(lambda x: x).collect()
        result_dict[column] = distinct_values
    return result_dict

# Example usage
field1_value = 10
field2_value = 20
result = get_values_from_table(field1_value, field2_value)
print(result)
Monday
Actually, this would work if I were running it in a notebook environment. However, I am trying to create a UDF because I want these queries to be executed over an external JDBC connection, and I don't want to wait for a cluster to spin up for a notebook. Is there a way I can achieve this?
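For reference, the shape I was hoping to expose over JDBC is a pure SQL function with no Python body, so no SparkSession would be needed. This is only a rough, untested sketch (the function name is just for illustration, and I'm assuming collect_set and a scalar subquery are allowed in the function body and produce the MAP<STRING, ARRAY<STRING>> result):
-- Sketch only: builds the map from two scalar subqueries instead of Python code.
CREATE OR REPLACE FUNCTION test.getValuesFromTableSql(f1 INT, f2 INT)
RETURNS MAP<STRING, ARRAY<STRING>>
RETURN map(
  'value1', (SELECT collect_set(value1) FROM test.table WHERE field1 = f1 AND field2 = f2),
  'value2', (SELECT collect_set(value2) FROM test.table WHERE field1 = f1 AND field2 = f2)
);

-- Example invocation from a JDBC/SQL client:
SELECT test.getValuesFromTableSql(10, 20);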