Hello Everyone,
I am new to Databricks, so I am still at the learning stage. It would be very helpful if someone could help me resolve this issue, or rather, help me fix my code.
I have built a query that fetches data based on a CASE expression; inside the CASE I use a correlated subquery on the same table that returns the latest row via TOP(1) with ORDER BY date DESC.
I tried to do the same in PySpark, and after a lot of research I found that ORDER BY with SELECT TOP(1) cannot be used this way in PySpark.
Please suggest a solution for this.
Original SQL Server code:
SELECT DISTINCT
    @CompanyId,
    curr.t_ccur,
    @TargetCurrency,
    curr.t_rtyp,
    curr.t_stdt,
    CASE
        WHEN @Basecurrency = @TargetCurrency THEN 1
        WHEN @Basecurrency <> @ReferenceCurrency AND @TargetCurrency <> @ReferenceCurrency THEN
            (SELECT TOP(1)
                (CASE WHEN it2.t_excb = 2
                      THEN (SELECT TOP(1) it.t_rate
                            FROM dbo.currenciestable it
                            WHERE it.t_bcur = @ReferenceCurrency
                              AND it.t_ccur = @Basecurrency
                              AND it.t_stdt <= curr.t_stdt
                              AND it.t_rtyp = curr.t_rtyp
                            ORDER BY it.t_stdt DESC) / it2.t_rate
                      ELSE it2.t_rate * (SELECT TOP(1) it.t_rate
                                         FROM dbo.currenciestable it
                                         WHERE it.t_bcur = @ReferenceCurrency
                                           AND it.t_ccur = @Basecurrency
                                           AND it.t_stdt <= curr.t_stdt
                                           AND it.t_rtyp = curr.t_rtyp
                                         ORDER BY it.t_stdt DESC)
                 END)
             FROM ttcmcs008100 it2
             WHERE it2.t_bcur = @ReferenceCurrency
               AND it2.t_ccur = @TargetCurrency
               AND it2.t_stdt <= curr.t_stdt
               AND it2.t_rtyp = curr.t_rtyp
             ORDER BY it2.t_stdt DESC)
        ELSE 0
    END
FROM dbo.currenciestable curr
WHERE curr.t_bcur = @ReferenceCurrency AND curr.t_ccur = @Basecurrency
I tried to replicate the above code in Spark SQL; it executes but returns the wrong data:
from pyspark.sql.functions import current_timestamp
CompanyId = 100
Basecurrency = 'USD'
TargetCurrency = 'SAR'
ReferenceCurrency = 'AED'
w = spark.sql(f"""
SELECT DISTINCT
    '{CompanyId}' AS CompanyId,
    curr.t_ccur AS BaseCurrency,
    '{TargetCurrency}' AS TargetCurrency,
    curr.t_rtyp AS RateType,
    curr.t_stdt AS EffectiveDate,
    CASE
        WHEN ('{Basecurrency}' == '{TargetCurrency}') THEN 1
        WHEN ('{Basecurrency}' <> '{ReferenceCurrency}' AND '{TargetCurrency}' <> '{ReferenceCurrency}') THEN
            (SELECT first(CASE WHEN it2.t_excb = 2
                               THEN (SELECT first(it1.t_rate)
                                     FROM currenciestable it1 INNER JOIN currenciestable curr
                                       ON it1.t_stdt <= curr.t_stdt AND it1.t_rtyp = curr.t_rtyp
                                      AND it1.t_bcur = '{ReferenceCurrency}' AND it1.t_ccur = '{Basecurrency}') / it2.t_rate
                               ELSE it2.t_rate * (SELECT first(it.t_rate)
                                                  FROM currenciestable it INNER JOIN currenciestable curr
                                                    ON it.t_bcur = '{ReferenceCurrency}' AND it.t_ccur = '{Basecurrency}'
                                                   AND it.t_stdt <= curr.t_stdt AND it.t_rtyp = curr.t_rtyp)
                          END)
             FROM currenciestable it2 INNER JOIN currenciestable curr
               ON it2.t_bcur = '{ReferenceCurrency}' AND it2.t_ccur = '{TargetCurrency}'
              AND it2.t_stdt <= curr.t_stdt AND it2.t_rtyp = curr.t_rtyp)
        ELSE 0
    END AS Rate
FROM currenciestable curr
WHERE curr.t_bcur = '{ReferenceCurrency}' AND curr.t_ccur = '{Basecurrency}'
""")
display(w)
I also tried to replicate the same logic using the DataFrame API:
from pyspark.sql.functions import when, col
df_empty = df_empty.withColumn(
"Rate",
when(
(df_empty["Basecurrency"] == df_empty["TargetCurrency"]),
1
)
.when(
(
(df_empty["Basecurrency"] != df_empty["ReferenceCurrency"]) &
(df_empty["TargetCurrency"] != df_empty["ReferenceCurrency"])
),
when(
df_empty["ExpressionBaseCurrency"] == 2,
(
df_curr.select("t_rate")
.where(
(df_curr["t_bcur"] == col("t_bcur")) &
(df_curr["t_ccur"] == col("t_ccur")) &
(df_curr["t_stdt"] <= col("t_stdt")) &
(df_curr["t_rtyp"] == col("t_rtyp"))
)
.orderBy(df_curr["t_stdt"].desc())
.limit(1)
.selectExpr("t_rate / t_rate as rate")
)
.otherwise(
df_curr.select("t_rate")
.where(
(col("t_bcur") == col("t_bcur")) &
(col("t_ccur") == col("t_ccur")) &
(col("t_stdt") <= col("t_stdt")) &
(col("t_rtyp") == col("t_rtyp"))
)
.orderBy(col("t_stdt").desc())
.limit(1)
.selectExpr("t_rate * t_rate as rate")
)
)
)
.otherwise(df_empty["Rate"])
)
Kindly help
Thanks in advance