12-06-2021 04:38 AM
Environment
Azure Databricks 10.1, including Spark 3.2.0
Scenario
I want to retrieve the average of a series of values between two timestamps, using a SQL UDF.
The average is obviously just an example. In a real scenario, I would like to hide some additional querying complexity behind a SQL UDF.
Tried (and working)
%sql
SELECT avg(ovenTemperature) as averageTemperature
from oventemperatures
where ovenTimestamp
between to_timestamp("1999-01-01")
and to_timestamp("2021-12-31")
And this seems possible in a UDF as well.
CREATE FUNCTION
averageTemperatureUDF(ovenID STRING, startTime TIMESTAMP, endTime TIMESTAMP)
RETURNS FLOAT
READS SQL DATA SQL SECURITY DEFINER
RETURN SELECT avg(ovenTemperature) as averageTemperature
from oventemperatures
where ovenTimestamp
between to_timestamp("1999-01-01")
and to_timestamp("2021-12-31")
Tried (and failed)
When I want to use the UDF's parameters in the filter condition, the function definition fails.
CREATE FUNCTION
averageTemperatureUDF(ovenID STRING, startTime TIMESTAMP, endTime TIMESTAMP)
RETURNS FLOAT
READS SQL DATA SQL SECURITY DEFINER
RETURN SELECT avg(ovenTemperature) as averageTemperature
from oventemperatures
where ovenTimestamp
between startTime
and endTime
The error message complains about a "correlated column".
Error in SQL statement: AnalysisException:
Correlated column is not allowed in predicate
(spark_catalog.default.oventemperatures.ovenTimestamp >= outer(averageTemperatureUDF.startTime))
(spark_catalog.default.oventemperatures.ovenTimestamp <= outer(averageTemperatureUDF.endTime)):
Aggregate [avg(ovenTemperature#275) AS averageTemperature#9299]
+- Filter ((ovenTimestamp#273 >= outer(startTime#9301)) AND (ovenTimestamp#273 <= outer(endTime#9302)))
+- SubqueryAlias spark_catalog.default.oventemperatures
+- Relation default.oventemperatures[ovenTimestamp#273,(...),ovenTemperature#275,(...)]
JDBCRelation(OvenTemperatures) [numPartitions=1]
Question(s)
It seems that the UDF's parameters are not accepted inside the query's filter expressions. Is that indeed a limitation, or is there a way to make this work?
12-06-2021 03:45 PM
Hello there, @Johan Van Noten!
My name is Piper and I'm one of the moderators here. Welcome to the community and thanks for your questions. Let's give it a while longer to see how the community responds. We can circle back to this later if we need to.
12-28-2021 08:13 AM
@Johan Van Noten - I'm sorry about the delay. Our SMEs are running behind.
I'll bump this back to them. Hang in there.
01-11-2022 04:54 PM
Hi @Johan Van Noten,
It seems like there is an issue with the columns that are part of your "where" clause: these columns are parameters of the UDF itself. I would recommend checking the following docs:
2) https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-functions-udf-scalar.html
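For reference, a minimal scalar SQL UDF in the style described there looks like this (the function name and values are purely illustrative):
CREATE FUNCTION toFahrenheit(celsius DOUBLE)
RETURNS DOUBLE
RETURN celsius * 9 / 5 + 32;
SELECT toFahrenheit(180.0);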
Hopefully, this link is helpful in solving the issue.
Thank you.
07-04-2022 10:21 PM
Hi @Johan Van Noten,
Were you able to resolve this issue? I am stuck on the same issue. Please guide me if you were able to resolve it.
Thank You.
07-05-2022 12:09 AM
Hi Iav
Unfortunately, I've not been able to solve this or work around it.
Based on @Jose Gonzalez's reply, I've been reading the documents he pointed to, and I concluded that this is just not possible in Spark SQL. It significantly limits the usability of UDFs, but I can understand why it doesn't work.
Johan
07-05-2022 09:40 AM
Hi @Johan Van Noten,
I found a workaround for it; maybe it helps you. Below is the query I wrote.
Target Query:
create or replace function TestAverage(DateBudget Date) RETURNS FLOAT
return select Avg(pd.Amount) as Amount
from TestTable1 as pd
left join TestTable2 er
on er.PK = pd.Exchange_Rate_PK
where sign(DATEDIFF(DateBudget, date_add(to_date(pd.From_Date), -1))) = 1
and sign(DATEDIFF(DateBudget, date_add(to_date(pd.To_Date), 1))) = -1;
Original Query:
select AVG(pd.Amount) as Amount
from TestTable1 as pd
left join TestTable2 er
on er.PK = pd.Exchange_Rate_PK
where fci.DATEBUDGET >= pd.From_Date
and fci.DATEBUDGET <= pd.To_Date
07-05-2022 09:41 AM
I have used the sign function to determine the date difference.
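In other words, the trick is that sign(datediff(...)) turns a range comparison into an equality test. A small illustration (the dates are made up):
-- d > x becomes sign(datediff(d, x)) = 1
-- d < x becomes sign(datediff(d, x)) = -1
SELECT sign(datediff(DATE'2022-07-05', DATE'2022-07-01')) -- the dates are 4 days apart, so this returns 1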
01-24-2023 09:14 AM
Hi Iav,
With sign I still get the correlated-column error. Any thoughts?
CREATE FUNCTION IF NOT EXISTS rw_weekday_diff(start_date DATE, end_date DATE)
RETURNS INT
COMMENT 'Calculated days between two dates excluding weekend'
RETURN DATEDIFF(DAY, start_date, end_date)
- (DATEDIFF(WEEK, start_date, end_date) * 2)
- (CASE WHEN dayofweek(start_date) = 1 THEN 1 ELSE 0 END)
+ (CASE WHEN dayofweek(start_date) = 7 THEN 1 ELSE 0 END)
- (SELECT COUNT(*) FROM bank_holidays AS h WHERE
sign(DATEDIFF(h.date, date_add(to_date(start_date), -1))) = 1 AND
sign(DATEDIFF(h.date, date_add(to_date(end_date), 1))) = -1);
-- original where clause - (SELECT COUNT(*) FROM bank_holidays.uk_bank_holidays AS h WHERE h.date >= start_date AND h.date <= end_date);
07-05-2022 02:16 AM
This UDF takes two parameters, startDate and tillDate, and returns an integer. Create a UDF in SQL Server with an integer return type.
07-05-2022 06:38 AM
Hi Robert,
Thanks for your suggestion. The reason I want to do this in Spark SQL is that there is no underlying SQL Server. Well, there might be, but often there isn't. We use Spark SQL's ability to abstract the underlying data stores (Parquet files, CSV files or SQL data) into a single SQL-like layer, a kind of virtual curation.
In that context, I do not see an option to delegate the UDF to a lower SQL server. Am I overlooking something?
Best regards,
Johan
12-01-2022 11:57 AM
Hi @Johan Van Noten,
Were you able to work around this issue? The UDF does not mind an equality operator, but it does not like it when the predicate contains a non-equality operator like > or <.
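For example (a sketch based on the oventemperatures example above; the names are illustrative and not tested verbatim), a definition whose predicate only uses equality is accepted, while the range version triggers the correlated-column error:
-- accepted: the parameter appears only in an equality predicate
CREATE FUNCTION temperatureAtUDF(atTime TIMESTAMP)
RETURNS FLOAT
RETURN SELECT avg(ovenTemperature) from oventemperatures where ovenTimestamp = atTime;
-- rejected: the parameter appears in a range predicate
CREATE FUNCTION averageTemperatureBetweenUDF(startTime TIMESTAMP, endTime TIMESTAMP)
RETURNS FLOAT
RETURN SELECT avg(ovenTemperature) from oventemperatures where ovenTimestamp between startTime and endTime;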
12-02-2022 12:39 AM
Hi @Raghu Bindinganavale,
I wasn't able to work around this and considered it a restriction of Spark SQL. We are coping with it at a higher layer now, which is unfortunate.
Thanks,
Johan
12-02-2022 05:11 AM
Hi @Johan Van Noten
I got it to work. It's a very different kind of SQL than what I am used to, but it works; you could try the below and tweak it to your needs. Basically, Spark SQL does not like non-equality predicates, so you need to self-join the same table, create a column that evaluates your conditions, and then join that back to your main table to get the desired result. Very different from our daily SQL.
CREATE FUNCTION
averageTemperatureUDF(ovenID STRING, startTime TIMESTAMP, endTime TIMESTAMP)
RETURNS FLOAT
READS SQL DATA SQL SECURITY DEFINER
RETURN SELECT avg(ovenTemperature) as averageTemperature
from oventemperatures base
INNER JOIN (SELECT ovenTimestamp BETWEEN startTime AND endTime AS label, ovenTimestamp FROM oventemperatures) flagged
ON base.ovenTimestamp = flagged.ovenTimestamp
where flagged.label IS true
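It can then be called like before, and checked against the direct query (the values are just illustrative):
SELECT averageTemperatureUDF("oven42", to_timestamp("1999-01-01"), to_timestamp("2021-12-31")) as viaUdf,
       (SELECT avg(ovenTemperature) from oventemperatures
        where ovenTimestamp between to_timestamp("1999-01-01") and to_timestamp("2021-12-31")) as direct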
12-02-2022 06:14 AM
Hi @Raghu Bindinganavale,
Thanks for your suggestion.
I'm not sure how the SQL engine will optimize this query, but you are essentially calculating the condition label over all oventemperatures in storage (millions of rows) and only afterwards filtering the matching from the non-matching rows in the where clause.
While the oventemperatures (virtual) table is partitioned on ovenTimestamp and should therefore be able to resolve the BETWEEN extremely quickly, I'm afraid your solution may not run with acceptable performance. Do you think the query optimizer will be able to work its way through this efficiently?
In my scenario description, I indicated that this is just a simplified example to clarify the issue. In our general cases, isolating the condition in the way you propose may well be possible, although I remain sceptical about the performance. I'll need to try your proposal once I have the chance to refactor the application.
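One way I could check, I suppose, is to compare the physical plans of both variants, e.g. via an EXPLAIN over an illustrative call:
EXPLAIN FORMATTED
SELECT averageTemperatureUDF("oven42", to_timestamp("1999-01-01"), to_timestamp("2021-12-31"))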
Best regards,
Johan