12-15-2021 04:56 AM
Hi Experts,
I'm reading a pipe delimited source file where first row does not contain the data but contains the - REPLACE or UPDATE values which indicate the refresh type if it is Full refresh Or Upsert. The second row is nothing but header and the actual column wise data starts from 3rd row.
I am processing this file using DF and saving the data into a temporary delta table for further processing. However I am facing issue in implementing the IF-ELSE logic where I can simply perform two separate SQL operations based on the value of the REPLACE or UPDATE in the delta table column as we can perform using T-SQL.
Any pointes/leads/probable solution in this regard will be appreciated. Thanks !!
12-15-2021 11:38 PM
Hi Atul,
Based on the data which I cooked here is the code base which will read file and generate a dataframe where REFRESH TYPE will be added as a new column and rest of the columns will be split into different columns
*******************************************************************************
# Import Libararies
from pyspark.sql.functions import input_file_name, lit, split, col
# Declare UDF to Sort Records (Later Used To Remove Headers)
def dfZipWithIndex (df, offset=1, colName="rowId"):
'''
Enumerates dataframe rows is native order, like rdd.ZipWithIndex(), but on a dataframe
and preserves a schema
:param df: source dataframe
:param offset: adjustment to zipWithIndex()'s index
:param colName: name of the index column
'''
new_schema = StructType(
[StructField(colName,LongType(),True)] # new added field in front
+ df.schema.fields # previous schema
)
zipped_rdd = df.rdd.zipWithIndex()
new_rdd = zipped_rdd.map(lambda args: ([args[1] + offset] + list(args[0])))
return spark.createDataFrame(new_rdd, new_schema)
# Read Data File
df = sqlContext.read.format('com.databricks.spark.csv').load("/FileStore/test.csv")
# Extract First Row Which is REFRESH TYPE
x = lit(str(df.limit(1).collect()[0][0]))
# Add REFRESH TYPE as a NEW COLUMN
df = df.withColumn("RefreshType",x)
# REMOVE/FILTER REFRESH TYPE FROM DATAFRAME
df2=dfZipWithIndex(df)
df2 = df2.filter(df2.rowId>1).drop("rowId")
df2.show()
#Split UDF
split_col = split(df2._c0, '\\|',)
# Finally split ID & Name column and create a data frame
df3=df2.withColumn("ID",split_col.getItem(0)) \
.withColumn("Name",split_col.getItem(1)) \
.drop(col("_c0"))
df3.show()
Idea is that finally you can save this dataframe into a temporary table and then it will be easy for you to do IF-ELSE statement. My understanding was that currently as you may be writing complete raw dataset into table its like complete data into a single column (like below)
Let me know if we have different understanding.
12-15-2021 10:29 AM
Hello @Atul Sharan. My name is Piper, and I'm a moderator for Databricks. Welcome and thank you for your question! Let's give it a bit longer to see how the community responds. If necessary, we'll circle back to this.
12-15-2021 06:59 PM
Hi Atul,
Thanks for your question. Further can you please try to add sample dataset as an example.
My understanding is your data should look like this.
File-1
******
REPLACE
ID|Name
1|Alex
2|James
3|Smith
File-2
******
UDPATE
ID|Name
1|Alex Ho
2|James King
Cheers
GS
12-15-2021 11:38 PM
Hi Atul,
Based on the data which I cooked here is the code base which will read file and generate a dataframe where REFRESH TYPE will be added as a new column and rest of the columns will be split into different columns
*******************************************************************************
# Import Libararies
from pyspark.sql.functions import input_file_name, lit, split, col
# Declare UDF to Sort Records (Later Used To Remove Headers)
def dfZipWithIndex (df, offset=1, colName="rowId"):
'''
Enumerates dataframe rows is native order, like rdd.ZipWithIndex(), but on a dataframe
and preserves a schema
:param df: source dataframe
:param offset: adjustment to zipWithIndex()'s index
:param colName: name of the index column
'''
new_schema = StructType(
[StructField(colName,LongType(),True)] # new added field in front
+ df.schema.fields # previous schema
)
zipped_rdd = df.rdd.zipWithIndex()
new_rdd = zipped_rdd.map(lambda args: ([args[1] + offset] + list(args[0])))
return spark.createDataFrame(new_rdd, new_schema)
# Read Data File
df = sqlContext.read.format('com.databricks.spark.csv').load("/FileStore/test.csv")
# Extract First Row Which is REFRESH TYPE
x = lit(str(df.limit(1).collect()[0][0]))
# Add REFRESH TYPE as a NEW COLUMN
df = df.withColumn("RefreshType",x)
# REMOVE/FILTER REFRESH TYPE FROM DATAFRAME
df2=dfZipWithIndex(df)
df2 = df2.filter(df2.rowId>1).drop("rowId")
df2.show()
#Split UDF
split_col = split(df2._c0, '\\|',)
# Finally split ID & Name column and create a data frame
df3=df2.withColumn("ID",split_col.getItem(0)) \
.withColumn("Name",split_col.getItem(1)) \
.drop(col("_c0"))
df3.show()
Idea is that finally you can save this dataframe into a temporary table and then it will be easy for you to do IF-ELSE statement. My understanding was that currently as you may be writing complete raw dataset into table its like complete data into a single column (like below)
Let me know if we have different understanding.
12-16-2021 10:19 AM
Thanks a ton Gurpreet, Suggested approach was helpful in resolution of the issue. Appreciate your help !!
12-16-2021 12:56 PM
Welcome Atul
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group