Data Engineering

Trying to build a subquery in a Databricks notebook with a DataFrame, similar to SQL TOP(1)

DeviJaviya
New Contributor II

Hello Everyone,

I am new to Databricks, so I am still at the learning stage.

It would be very helpful if someone could help me resolve this issue and fix my code.

I have built a query that computes a value with a CASE expression. Inside the CASE I use a correlated subquery on the same table that returns a single row using TOP(1) with ORDER BY date DESC.

I tried to do the same in PySpark, and after a lot of research I found that SELECT TOP 1 with ORDER BY does not work there.

Please suggest a solution for this.

Original Code in SQL:

SELECT distinct
		@CompanyId,
		curr.t_ccur,
		@TargetCurrency,
		curr.t_rtyp,
		curr.t_stdt,
		CASE WHEN @Basecurrency = @TargetCurrency THEN 1
			 WHEN @Basecurrency <> @ReferenceCurrency and @TargetCurrency <> @ReferenceCurrency THEN
				(SELECT TOP(1) (CASE WHEN it2.t_excb = 2 THEN (select top(1) it.t_rate from dbo.currenciestable it where it.t_bcur = @ReferenceCurrency and it.t_ccur = @Basecurrency and it.t_stdt <= curr.t_stdt and it.t_rtyp = curr.t_rtyp order by it.t_stdt desc) /it2.t_rate 
								ELSE it2.t_rate * (select top(1) it.t_rate from dbo.currenciestable it where it.t_bcur = @ReferenceCurrency and it.t_ccur = @Basecurrency and it.t_stdt <= curr.t_stdt and it.t_rtyp = curr.t_rtyp order by it.t_stdt desc) END) From ttcmcs008100 it2 where it2.t_bcur = @ReferenceCurrency and it2.t_ccur = @TargetCurrency and it2.t_stdt <= curr.t_stdt and it2.t_rtyp = curr.t_rtyp order by it2.t_stdt desc)
				
		        ELSE 0 END
	from dbo.currenciestable curr
	where curr.t_bcur = @ReferenceCurrency and curr.t_ccur = @Basecurrency

I tried to replicate the above code in PySpark. It executes, but it returns the wrong data:

from pyspark.sql.functions import current_timestamp
 
CompanyId = 100
Basecurrency = 'USD'
TargetCurrency = 'SAR'
ReferenceCurrency = 'AED'
 
w = spark.sql(f"""
SELECT Distinct
'{CompanyId}' AS CompanyId,
curr.t_ccur AS BaseCurrency,
'{TargetCurrency}' AS TargetCurrency,
curr.t_rtyp AS RateType,
curr.t_stdt AS EffectiveDate,
    CASE WHEN ('{Basecurrency}' == '{TargetCurrency}') THEN 1
        WHEN ('{Basecurrency}' <> '{ReferenceCurrency}' and '{TargetCurrency}' <> '{ReferenceCurrency}') THEN 
        (SELECT first(CASE WHEN it2.t_excb = 2 THEN (select first(it1.t_rate) from currenciestable it1 inner join currenciestable curr on it1.t_stdt <= curr.t_stdt and it1.t_rtyp = curr.t_rtyp and it1.t_bcur = '{ReferenceCurrency}' and it1.t_ccur = '{Basecurrency}' ) /it2.t_rate ELSE it2.t_rate * (select first(it.t_rate) from currenciestable it inner join currenciestable curr on it.t_bcur = '{ReferenceCurrency}' and it.t_ccur = '{Basecurrency}' and it.t_stdt <= curr.t_stdt and it.t_rtyp = curr.t_rtyp ) END) From currenciestable it2 inner join currenciestable curr ON it2.t_bcur = '{ReferenceCurrency}' and it2.t_ccur = '{TargetCurrency}' and it2.t_stdt <= curr.t_stdt and it2.t_rtyp = curr.t_rtyp )
    ELSE 0 END AS Rate
from currenciestable curr
where curr.t_bcur = '{ReferenceCurrency}' and curr.t_ccur = '{Basecurrency}'
    """)
display(w)

I also tried to replicate the same logic using the DataFrame API:

from pyspark.sql.functions import when, col
 
df_empty = df_empty.withColumn(
    "Rate",
    when(
        (df_empty["Basecurrency"] == df_empty["TargetCurrency"]),
        1
    )
    .when(
        (
            (df_empty["Basecurrency"] != df_empty["ReferenceCurrency"]) & 
            (df_empty["TargetCurrency"] != df_empty["ReferenceCurrency"])
        ),
        when(
            df_empty["ExpressionBaseCurrency"] == 2,
            (
                df_curr.select("t_rate")
                .where(
                    (df_curr["t_bcur"] == col("t_bcur")) &
                    (df_curr["t_ccur"] == col("t_ccur")) &
                    (df_curr["t_stdt"] <= col("t_stdt")) &
                    (df_curr["t_rtyp"] == col("t_rtyp"))
                )
                .orderBy(df_curr["t_stdt"].desc())
                .limit(1)
                .selectExpr("t_rate / t_rate as rate")
            )
            .otherwise(
                df_curr.select("t_rate")
                .where(
                    (col("t_bcur") == col("t_bcur")) &
                    (col("t_ccur") == col("t_ccur")) &
                    (col("t_stdt") <= col("t_stdt")) &
                    (col("t_rtyp") == col("t_rtyp"))
                )
                .orderBy(col("t_stdt").desc())
                .limit(1)
                .selectExpr("t_rate * t_rate as rate")
            )
        )
    )
    .otherwise(df_empty["Rate"])
)

Kindly help

Thanks in advance

2 REPLIES

Rishabh-Pandey
Esteemed Contributor

The TOP keyword is not supported in Spark SQL. Instead, sort your data in descending order and add LIMIT 1 at the end, which returns only the latest record. You can modify your query accordingly.

Rishabh Pandey

DeviJaviya
New Contributor II

Hello Rishabh,

Thank you for your suggestion. We tried LIMIT 1, but the output value comes out the same for every date, which is not correct.
