Correlated column exception in SQL UDF when using UDF parameters.

Johan_Van_Noten
New Contributor III

Environment

Azure Databricks 10.1, including Spark 3.2.0

Scenario

I want to retrieve the average of a series of values between two timestamps, using a SQL UDF.

The average is obviously just an example. In a real scenario, I would like to hide some additional querying complexity behind a SQL UDF.

Tried (and working)

%sql
SELECT avg(ovenTemperature) as averageTemperature 
    from oventemperatures 
    where ovenTimestamp
          between to_timestamp("1999-01-01") 
          and to_timestamp("2021-12-31")

And this seems possible in a UDF as well.

CREATE FUNCTION
averageTemperatureUDF(ovenID STRING, startTime TIMESTAMP, endTime TIMESTAMP) 
   RETURNS FLOAT
   READS SQL DATA SQL SECURITY DEFINER
   RETURN SELECT avg(ovenTemperature) as averageTemperature 
          from oventemperatures 
          where ovenTimestamp
                between to_timestamp("1999-01-01") 
                and to_timestamp("2021-12-31")
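Calling such a function would look like calling any built-in. This is only a minimal sketch: the oven ID value is made up for illustration, and in this version the three arguments are accepted but have no effect because the time range is hard-coded in the function body.

```sql
-- Hypothetical call: 'oven-1' is an illustrative value, not taken from the real data.
-- The arguments are ignored by this version of the UDF (the range is hard-coded).
SELECT averageTemperatureUDF(
         'oven-1',
         to_timestamp('1999-01-01'),
         to_timestamp('2021-12-31')
       ) AS averageTemperature
```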

Tried (and failed)

When I want to use the UDF's parameters in the filter condition, the function definition fails.

CREATE FUNCTION
averageTemperatureUDF(ovenID STRING, startTime TIMESTAMP, endTime TIMESTAMP) 
   RETURNS FLOAT
   READS SQL DATA SQL SECURITY DEFINER
   RETURN SELECT avg(ovenTemperature) as averageTemperature 
          from oventemperatures 
          where ovenTimestamp
                between startTime
                and endTime

The error message complains about a "correlated column".

Error in SQL statement: AnalysisException: 
Correlated column is not allowed in predicate 
(spark_catalog.default.oventemperatures.ovenTimestamp >= outer(averageTemperatureUDF.startTime))
(spark_catalog.default.oventemperatures.ovenTimestamp <= outer(averageTemperatureUDF.endTime)):
Aggregate [avg(ovenTemperature#275) AS averageTemperature#9299]
+- Filter ((ovenTimestamp#273 >= outer(startTime#9301)) AND (ovenTimestamp#273 <= outer(endTime#9302)))
   +- SubqueryAlias spark_catalog.default.oventemperatures
      +- Relation default.oventemperatures[ovenTimestamp#273,(...),ovenTemperature#275,(...)] 
	     JDBCRelation(OvenTemperatures) [numPartitions=1]
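The outer(...) wrappers in the message suggest that the UDF parameters are treated like outer references of a correlated subquery. Assuming that reading is right, the same shape of predicate can be written as a plain correlated scalar subquery, which may be a convenient way to check the behaviour outside a UDF. This is a sketch only: the timeWindows table and its startTime / endTime columns are hypothetical and stand in for the UDF parameters.

```sql
-- Hypothetical table timeWindows(startTime TIMESTAMP, endTime TIMESTAMP),
-- used here only to play the role of the UDF parameters.
SELECT w.startTime,
       w.endTime,
       (SELECT avg(t.ovenTemperature)
          FROM oventemperatures t
         WHERE t.ovenTimestamp >= w.startTime   -- non-equality reference to the outer row
           AND t.ovenTimestamp <= w.endTime) AS averageTemperature
  FROM timeWindows w
```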

Question(s)

It seems that using the UDF's parameters inside the filter expressions is not allowed.

  • Is that a correct conclusion?
  • Any reason for this limitation?
  • Any alternatives to work around this?

19 REPLIES

Hi @Johan Van Noten

In my case, which was quite simple, it performed OK, but you are right about performance: this is something that needs to be monitored. I am sure SQL Server would go crazy with this approach.

Thanks

Raghu

Own
Contributor

Lav's solution works.

Own
Contributor

With Azure Databricks, you can leverage ADF and run this function using the SQL integration runtime while ingesting, without any dependency on a lower SQL environment.

Johan_Van_Noten
New Contributor III

Thanks guys,

The solutions from @Lav Chandaria and @Raghu Bindinganavale both work. However, as I mentioned in my earlier reply to Raghu, I'm worried about the performance of evaluating the datediff (Lav) or the label (Raghu) over the full dataset. With those approaches, the engine cannot simply cut away "half" of the potential outcomes based on a single < comparison and the information from an index / partition.

I'll try it on a significant dataset once I get the opportunity.

Johan

creastysomp
New Contributor II

Thanks for your suggestion. The reason I want to do this in SparkSQL is that there is no underlying SQL Server.