Hello Experts - I am facing a technical issue with Databricks SQL: implementing an IF-ELSE or CASE statement to execute two separate sets of queries based on the value of a column in a Delta table.

Atul_Sharan
New Contributor II

Hi Experts,

I'm reading a pipe-delimited source file whose first row does not contain data but instead holds the value REPLACE or UPDATE, which indicates the refresh type (full refresh or upsert). The second row is the header, and the actual column-wise data starts from the third row.

I am processing this file with a DataFrame and saving the data into a temporary Delta table for further processing. However, I am having trouble implementing IF-ELSE logic that performs one of two separate SQL operations depending on whether the Delta table column holds REPLACE or UPDATE, as can be done in T-SQL.

Any pointers, leads, or probable solutions in this regard will be appreciated. Thanks!

1 ACCEPTED SOLUTION

User16826994569
New Contributor III

Hi Atul,

Based on sample data that I made up, here is code that reads the file and generates a DataFrame in which REFRESH TYPE is added as a new column and the remaining fields are split into separate columns.

*******************************************************************************

# Import libraries
from pyspark.sql.functions import lit, split
from pyspark.sql.types import LongType, StructField, StructType

# Helper (a plain Python function, not a UDF) that adds a row index,
# used later to drop the leading non-data rows
def dfZipWithIndex(df, offset=1, colName="rowId"):
    '''
    Enumerates dataframe rows in native order, like rdd.zipWithIndex(), but on a dataframe,
    and preserves the schema.

    :param df: source dataframe
    :param offset: adjustment to zipWithIndex()'s index
    :param colName: name of the index column
    '''
    new_schema = StructType(
        [StructField(colName, LongType(), True)]  # new field added in front
        + df.schema.fields                        # previous schema
    )
    zipped_rdd = df.rdd.zipWithIndex()
    new_rdd = zipped_rdd.map(lambda args: [args[1] + offset] + list(args[0]))
    return spark.createDataFrame(new_rdd, new_schema)

# Read the data file; each line lands in a single column named _c0
df = spark.read.format("csv").load("/FileStore/test.csv")

# Extract the first row, which holds the REFRESH TYPE
x = lit(str(df.limit(1).collect()[0][0]))

# Add REFRESH TYPE as a new column
df = df.withColumn("RefreshType", x)

# Drop the REFRESH TYPE row (rowId 1) and the header row (rowId 2)
df2 = dfZipWithIndex(df)
df2 = df2.filter(df2.rowId > 2).drop("rowId")
df2.show()

# Build the split expression (split() takes a regex, so the pipe is escaped)
split_col = split(df2._c0, '\\|')

# Finally, split out the ID and Name columns and create a dataframe
df3 = df2.withColumn("ID", split_col.getItem(0)) \
    .withColumn("Name", split_col.getItem(1)) \
    .drop("_c0")
df3.show()

image 

The idea is that you can finally save this DataFrame into a temporary table, after which the IF-ELSE logic becomes easy to implement. My understanding is that you are currently writing the complete raw dataset into the table, so all of the data ends up in a single column (like below).

image 
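As a rough sketch of the IF-ELSE step itself: once the refresh type is available as a Python string, the branching can be done in Python and only the chosen SQL is sent to Spark. The names `build_statements`, `staging_view`, and `target_table` are hypothetical and not from this thread, and the choice of INSERT OVERWRITE for REPLACE and MERGE INTO for UPDATE is an assumption about what "full refresh" and "upsert" should mean here. The block only builds SQL strings, so it runs without a cluster; the notebook calls are shown as comments.

```python
def build_statements(refresh_type, source_view="staging_view", target_table="target_table"):
    """Return the SQL statements to run for a given refresh type.

    All table and view names are placeholders (assumptions for illustration).
    """
    mode = refresh_type.strip().upper()
    if mode == "REPLACE":
        # Full refresh: overwrite the target with the staged rows.
        return [
            f"INSERT OVERWRITE TABLE {target_table} "
            f"SELECT ID, Name FROM {source_view}"
        ]
    if mode == "UPDATE":
        # Upsert: update matching IDs, insert the rest.
        return [
            f"MERGE INTO {target_table} AS t "
            f"USING {source_view} AS s ON t.ID = s.ID "
            "WHEN MATCHED THEN UPDATE SET t.Name = s.Name "
            "WHEN NOT MATCHED THEN INSERT (ID, Name) VALUES (s.ID, s.Name)"
        ]
    raise ValueError(f"Unknown refresh type: {refresh_type!r}")


# In a Databricks notebook, with df3 from the code above, this might be used as:
#   refresh_type = df3.select("RefreshType").first()[0]
#   df3.createOrReplaceTempView("staging_view")
#   for stmt in build_statements(refresh_type):
#       spark.sql(stmt)

for stmt in build_statements("REPLACE") + build_statements("UPDATE"):
    print(stmt)
```

Keeping the decision in Python avoids needing procedural IF-ELSE inside SQL, at the cost of collecting one value to the driver.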

Let me know if your understanding is different.

5 REPLIES

Anonymous
Not applicable

Hello @Atul Sharan. My name is Piper, and I'm a moderator for Databricks. Welcome and thank you for your question! Let's give it a bit longer to see how the community responds. If necessary, we'll circle back to this.

User16826994569
New Contributor III

Hi Atul,

Thanks for your question. Could you please also add a sample dataset as an example?

My understanding is your data should look like this.

File-1

******

REPLACE

ID|Name

1|Alex

2|James

3|Smith

File-2

******

UPDATE

ID|Name

1|Alex Ho

2|James King
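The layout assumed in these two samples (refresh type on line 1, header on line 2, data from line 3) can be checked in plain Python before involving Spark. This is only a sketch: `parse_refresh_file` is a hypothetical helper, and the inline sample text mirrors File-2 above.

```python
def parse_refresh_file(text, delimiter="|"):
    """Split the assumed layout into (refresh_type, header, rows)."""
    # Ignore blank lines so trailing newlines do not produce empty rows.
    lines = [line for line in text.splitlines() if line.strip()]
    if len(lines) < 2:
        raise ValueError("expected a refresh-type line and a header line")
    refresh_type = lines[0].strip().upper()
    header = lines[1].split(delimiter)
    # Each data line becomes a dict keyed by the header names.
    rows = [dict(zip(header, line.split(delimiter))) for line in lines[2:]]
    return refresh_type, header, rows


sample = "UPDATE\nID|Name\n1|Alex Ho\n2|James King\n"
refresh_type, header, rows = parse_refresh_file(sample)
print(refresh_type)
print(header)
print(rows)
```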

Cheers

GS

Atul_Sharan
New Contributor II

Thanks a ton, Gurpreet. The suggested approach helped resolve the issue. Appreciate your help!

Welcome Atul