
Correlated column exception in SQL UDF when using UDF parameters.

Johan_Van_Noten
New Contributor III

Environment

Azure Databricks 10.1, including Spark 3.2.0

Scenario

I want to retrieve the average of a series of values between two timestamps, using a SQL UDF.

The average is obviously just an example. In a real scenario, I would like to hide some additional querying complexity behind a SQL UDF.

Tried (and working)

%sql
SELECT avg(temperature) as averageTemperature 
    from oventemperatures 
    where ovenTimestamp
          between to_timestamp("1999-01-01") 
          and to_timestamp("2021-12-31")

And this seems possible in a UDF as well.

CREATE FUNCTION
averageTemperatureUDF(ovenID STRING, startTime TIMESTAMP, endTime TIMESTAMP) 
   RETURNS FLOAT
   READS SQL DATA SQL SECURITY DEFINER
   RETURN SELECT avg(ovenTemperature) as averageTemperature 
          from oventemperatures 
          where ovenTimestamp
                between to_timestamp("1999-01-01") 
                and to_timestamp("2021-12-31")
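A quick sanity check is a direct call of the function; a minimal sketch with a made-up oven id (note that the ovenID parameter is declared but not yet used inside this simplified body):

-- Call the hardcoded-range version above; 'oven1' is a made-up id.
SELECT averageTemperatureUDF('oven1',
                             to_timestamp('1999-01-01'),
                             to_timestamp('2021-12-31')) AS averageTemperature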

Tried (and failed)

When I want to use the UDF's parameters in the filter condition, the function definition fails.

CREATE FUNCTION
averageTemperatureUDF(ovenID STRING, startTime TIMESTAMP, endTime TIMESTAMP) 
   RETURNS FLOAT
   READS SQL DATA SQL SECURITY DEFINER
   RETURN SELECT avg(ovenTemperature) as averageTemperature 
          from oventemperatures 
          where ovenTimestamp
                between startTime
                and endTime

The error message complains about a "correlated column":

Error in SQL statement: AnalysisException: 
Correlated column is not allowed in predicate 
(spark_catalog.default.oventemperatures.ovenTimestamp >= outer(averageTemperatureUDF.startTime))
(spark_catalog.default.oventemperatures.ovenTimestamp <= outer(averageTemperatureUDF.endTime)):
Aggregate [avg(ovenTemperature#275) AS averageTemperature#9299]
+- Filter ((ovenTimestamp#273 >= outer(startTime#9301)) AND (ovenTimestamp#273 <= outer(endTime#9302)))
   +- SubqueryAlias spark_catalog.default.oventemperatures
      +- Relation default.oventemperatures[ovenTimestamp#273,(...),ovenTemperature#275,(...)] 
	     JDBCRelation(OvenTemperatures) [numPartitions=1]

Question(s)

It seems that the UDF's parameters are not accepted inside the query's expressions.

  • Is that a correct conclusion?
  • Any reason for this limitation?
  • Any alternatives to work around this?
1 ACCEPTED SOLUTION

jose_gonzalez
Databricks Employee

Hi @Johan Van Noten,

It seems like there is an issue with the columns in your "where" clause: they reference parameters that are part of the UDF itself. I would recommend checking the following docs:

1) https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-syntax-ddl-create-functio...

2) https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-functions-udf-scalar.html

3) https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-functions-udf-aggregate.h...

Hopefully, these links help to solve this issue.

Thank you.


19 REPLIES

Anonymous
Not applicable

Hello there, @Johan Van Noten!

My name is Piper and I'm one of the moderators here. Welcome to the community and thanks for your questions. Let's give it a while longer to see how the community responds. We can circle back to this later if we need to. 🙂

Anonymous
Not applicable

@Johan Van Noten - I'm sorry about the delay. Our SMEs are running behind. 😞

I'll bump this back to them. Hang in there.

jose_gonzalez
Databricks Employee

Hi @Johan Van Noten,

It seems like there is an issue with the columns in your "where" clause: they reference parameters that are part of the UDF itself. I would recommend checking the following docs:

1) https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-syntax-ddl-create-functio...

2) https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-functions-udf-scalar.html

3) https://docs.databricks.com/spark/latest/spark-sql/language-manual/sql-ref-functions-udf-aggregate.h...

Hopefully, these links help to solve this issue.

Thank you.

lav
New Contributor III

Hi @Johan Van Noten,

Were you able to resolve this issue? I am stuck on the same issue. Please guide me if you were able to resolve it.

Thank You.

Johan_Van_Noten
New Contributor III

Hi lav,

Unfortunately, I've not been able to solve this or work around it.

Based on @Jose Gonzalez's reply, I've been reading the documents he pointed to, and I concluded that this is just not possible in Spark SQL. It significantly limits the usability of UDFs, but I can understand why it doesn't work.

Johan

lav
New Contributor III

Hi @Johan Van Noten,

I found a workaround; maybe it helps you. Below is the query I wrote.

Target Query:

create or replace function TestAverage(DateBudget Date) RETURNS FLOAT
  return select Avg(pd.Amount) as Amount
    from TestTable1 as pd
    left join TestTable2 er
      on er.PK = pd.Exchange_Rate_PK
    where sign(DATEDIFF(DateBudget, date_add(to_date(pd.From_Date), -1))) = 1
      and sign(DATEDIFF(DateBudget, date_add(to_date(pd.To_Date), 1))) = -1;

Original Query:

select AVG(pd.Amount) as Amount
  from TestTable1 as pd
  left join TestTable2 er
    on er.PK = pd.Exchange_Rate_PK
  where fci.DATEBUDGET >= pd.From_Date
    and fci.DATEBUDGET <= pd.To_Date

lav
New Contributor III

I have used the sign function on the date difference to express the range condition.
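For clarity, a minimal, table-free sketch of what those sign/DATEDIFF expressions evaluate to, using made-up dates (a budget date of 2021-06-15 against a range of 2021-06-01 to 2021-06-30):

-- sign(DATEDIFF(d, date_add(x, -1))) = 1   encodes d >= x (at day granularity)
-- sign(DATEDIFF(d, date_add(y,  1))) = -1  encodes d <= y (at day granularity)
SELECT
  sign(DATEDIFF(to_date('2021-06-15'), date_add(to_date('2021-06-01'), -1))) AS after_start,  -- returns 1
  sign(DATEDIFF(to_date('2021-06-15'), date_add(to_date('2021-06-30'), 1)))  AS before_end    -- returns -1

The idea is that the range check becomes an equality comparison on a whole expression rather than a >= / <= applied directly to the parameter.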

Rhys
New Contributor II

Hi lav,

With sign I still get the correlated column bug. Any thoughts?

CREATE FUNCTION IF NOT EXISTS rw_weekday_diff(start_date DATE, end_date DATE)
RETURNS INT
COMMENT 'Calculated days between two dates excluding weekends'
RETURN DATEDIFF(DAY, start_date, end_date)
  - (DATEDIFF(WEEK, start_date, end_date) * 2)
  - (CASE WHEN dayofweek(start_date) = 1 THEN 1 ELSE 0 END)
  + (CASE WHEN dayofweek(start_date) = 7 THEN 1 ELSE 0 END)
  - (SELECT COUNT(*) FROM bank_holidays AS h WHERE
      sign(DATEDIFF(h.date, date_add(to_date(start_date), -1))) = 1 AND
      sign(DATEDIFF(h.date, date_add(to_date(end_date), 1))) = -1);

-- original where clause: (SELECT COUNT(*) FROM bank_holidays.uk_bank_holidays AS h WHERE h.date >= start_date AND h.date <= end_date);

Robert_Smith
New Contributor II

This UDF takes two parameters, startDate and tillDate, and returns an integer. You could create the UDF in SQL Server with an integer return type.

Johan_Van_Noten
New Contributor III

Hi Robert,

Thanks for your suggestion. The reason I want to do this in Spark SQL is that there is no underlying SQL Server. Well, there might be, but often there isn't. We use Spark SQL's ability to abstract the underlying data stores (Parquet files, CSV files or SQL data) into a single SQL-like layer, as a kind of virtual curation.

In that context, I do not see an option to delegate the UDF to a lower SQL server. Am I overlooking something?

Best regards,

Johan

Raghu_Bindingan
New Contributor III

Hi @Johan Van Noten,

Were you able to work around this issue? The UDF does not mind an equality operator, but it does not like it when the predicate has a non-equality operator like > or <.
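To make that observation concrete, here is a minimal sketch against the oventemperatures table from the original post (the function names are made up): the equality version is expected to compile, while the range version should fail with the correlated-column AnalysisException described above.

-- Equality predicate on the UDF parameter: accepted.
CREATE OR REPLACE FUNCTION temperatureAtUDF(ts TIMESTAMP)
  RETURNS FLOAT
  RETURN SELECT avg(ovenTemperature)
         FROM oventemperatures
         WHERE ovenTimestamp = ts;

-- Range predicate on the UDF parameter: rejected with the correlated-column error.
CREATE OR REPLACE FUNCTION temperatureAfterUDF(ts TIMESTAMP)
  RETURNS FLOAT
  RETURN SELECT avg(ovenTemperature)
         FROM oventemperatures
         WHERE ovenTimestamp > ts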

Johan_Van_Noten
New Contributor III

Hi @Raghu Bindinganavale,

I wasn't able to work around this and considered it a restriction of Spark SQL. We are coping with it on a higher layer now, which is unfortunate.

Thanks,

Johan

Raghu_Bindingan
New Contributor III

Hi @Johan Van Noten,

I got it to work. It's a very different kind of SQL than what I am used to, but it works; you could try the below and tweak it to your needs. Basically, Spark SQL does not like non-equality predicates, so you need to self-join the same table, create a column that evaluates your conditions, and then join that back to your main table to get the desired result. Very different from our daily SQL 😀

CREATE FUNCTION
averageTemperatureUDF(ovenID STRING, startTime TIMESTAMP, endTime TIMESTAMP)
  RETURNS FLOAT
  READS SQL DATA SQL SECURITY DEFINER
  RETURN SELECT avg(ovenTemperature) as averageTemperature
         from oventemperatures base
         INNER JOIN (SELECT ovenTimestamp BETWEEN startTime AND endTime AS label,
                            ovenTimestamp
                     FROM oventemperatures) labelled
           ON base.ovenTimestamp = labelled.ovenTimestamp
         where labelled.label IS true
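One way to sanity-check the inner part of this rewrite is to run the labelling subquery on its own with literal bounds (made-up dates) before wrapping it in the function:

-- Each row is tagged true/false depending on whether its timestamp falls in the range
SELECT ovenTimestamp BETWEEN to_timestamp('2021-01-01') AND to_timestamp('2021-12-31') AS label,
       ovenTimestamp
FROM oventemperatures
LIMIT 10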

Johan_Van_Noten
New Contributor III

Hi @Raghu Bindinganavale,

Thanks for your suggestion.

I'm not sure how the SQL engine will optimize this query, but you are essentially calculating the condition-label over all oventemperatures in the full storage (millions) and only afterwards filtering the good from the bad ones in the where clause.

While the oventemperatures (virtual) table is partitioned on ovenTimestamp and should therefore be able to resolve the BETWEEN extremely quickly, I guess that your solution may not run with acceptable performance. Do you think the query optimizer will be able to work its way through this efficiently?

In my scenario description, I indicated that this is just a simplified example to clarify the issue. I guess though that in our general cases, isolating the condition in the way you propose may be possible, although I remain sceptical about the performance. I'll need to try your proposal once I have the chance to refactor the application.

Best regards,

Johan
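One way to approach the optimizer question above is to inspect the plan of a call to the rewritten function; a sketch, assuming the function and table above exist and using made-up arguments:

-- Check whether the BETWEEN label and the self-join still allow pruning/pushdown on ovenTimestamp
EXPLAIN EXTENDED
SELECT averageTemperatureUDF('oven42',
                             to_timestamp('2021-01-01'),
                             to_timestamp('2021-12-31'))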
