Data Engineering

Checking whether a column exists in a DataFrame: return NULL if it does not, or the column itself if it does, using a UDF

cuteabhi32
New Contributor III

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

df1 = spark.read.format("csv").option("header","true").load("file:///home/cloudera/data/a.csv")

def func_udf(df,col):
    column = list(df.columns)
    if (col in column):
        return df.col
    else:
        return NULL

spark.udf.register("columncheck",func_udf)

resultdf=df1.withColumn("ref_date",expr("CASE WHEN Flag = 3 THEN '06MAY2022' ELSE columncheck(df1,ref_date) END"))

resultdf.show()

This is the code I am trying. I want to check whether a column exists in a DataFrame: if it does not, I have to return NULL; if it does, I need to return the column itself, using a UDF. The UDF call throws an error and I am not sure what I am doing wrong. Kindly help me resolve it; creating the resultdf DataFrame throws the error below:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/spark/python/pyspark/sql/dataframe.py", line 1849, in withColumn
    return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)

pyspark.sql.utils.AnalysisException: "cannot resolve '`df1`' given input columns: [ref_date, Salary_qtr4, Salary_qtr3, Flag, Name, Salary_qtr1, Salary, Std, Salary_qtr2, Id, Mean]; line 1 pos 53;\n'Project [Id#10, Name#11, Salary#12, CASE WHEN (cast(Flag#20 as int) = 3) THEN 06MAY2022 ELSE 'columncheck('df1, ref_date#13) END AS ref_date#35, Salary_qtr1#14, Salary_qtr2#15, Salary_qtr3#16, Salary_qtr4#17, Mean#18, Std#19, Flag#20]\n+- AnalysisBarrier\n   +- Relation

1 ACCEPTED SOLUTION

-werners-
Esteemed Contributor III

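OK, so let us keep it simple:

from pyspark.sql.functions import lit

df1 = spark.read.format("csv").option("header","true").load("file:///home/cloudera/data/a.csv")

# "col" is the name of the column you need; TypeYouWant is a placeholder for its data type
if "col" in df1.columns:
    df2 = df1
else:
    df2 = df1.withColumn("col", lit(None).cast(TypeYouWant))

Next you can use your CASE expression etc. on df2.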


11 REPLIES

-werners-
Esteemed Contributor III

Try to return a column whose content is null instead of a plain null.

So:

if col in column:
    return df
else:
    return df.withColumn(col, lit(None))

cuteabhi32
New Contributor III

Hi Werners, thanks for the input, but it is still throwing an error:

AnalysisException: Column 'a' does not exist. Did you mean one of the following? [a.id, a.std, a.Dept, a.flag, a.mean, a.salary, a.neg_std, a.new_var, a.ref_date, a.mean20perc, a.neg_mean20perc]; line 1 pos 57;

This is my updated code, which I am using in the CASE statement. Kindly help.

def func_udf(df,col):
  column =list(df.columns)
  for col in column:
    return col
  else:
    lit(NULL)

spark.udf.register("ColumnChecker",func_udf)

a = a.withColumn('ref_date',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' ELSE ColumnChecker(a,ref_date) END"))

-werners-
Esteemed Contributor III

OK, so you have this list of columns (df.columns).

If you then do:

if "columnName" in columns: return df

else: return df.withColumn("columnName", lit(None))

you do not need to loop.

In your loop you return col, not a DataFrame. You want to return the complete DataFrame with an optional extra column.
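
The idea of returning the whole DataFrame with an optional extra column could be sketched as a plain Python helper. It is called directly on the driver rather than registered with spark.udf.register, because a registered UDF is evaluated row by row on column values and cannot receive a DataFrame; inside expr(...) a name such as df1 or a is simply looked up as a column, which matches the AnalysisException messages above. The helper name ensure_column and its dtype parameter are illustrative assumptions, not something from this thread.

```python
def ensure_column(df, name, dtype="string"):
    """Return df unchanged if `name` exists; otherwise add it as a typed NULL column.

    `ensure_column` and `dtype` are hypothetical names used for illustration.
    """
    if name in df.columns:
        return df
    # selectExpr keeps every existing column and appends CAST(NULL AS <dtype>) AS <name>
    return df.selectExpr("*", f"CAST(NULL AS {dtype}) AS `{name}`")


# Usage, assuming a DataFrame `a` like the one in this thread
# (needs `from pyspark.sql.functions import expr`):
# a = ensure_column(a, "ref_date")
# a = a.withColumn("ref_date", expr("CASE WHEN flag = 3 THEN '02MAY2022' ELSE ref_date END"))
```

After this call the column is guaranteed to exist, so the CASE expression can refer to it unconditionally.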

cuteabhi32
New Contributor III

from pyspark.sql.functions import *
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from datetime import date,datetime
import time
from dateutil.relativedelta import relativedelta
from dateutil import parser
from pyspark.sql.window import Window
from pyspark.sql.types import *
import locale

# ---- merge statements ----

df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/a.csv")
df1.show()
df2 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/b.csv")
df2.show()

# ---- merge statements ----

sf_start_dt = '02MAY2022'
a=df1
a = a.filter(col("ref_date") == f"{sf_start_dt}")
a = a.drop('name')
a = a.select('id','salary','ref_date','std','mean',)
b = df2
a.createOrReplaceTempView("a")
b.createOrReplaceTempView("b")
a = a.join(b,'id',"outer")
df1 = spark.sql("select tbl1.id,(select 1) as tempCol1 from a tbl1 inner join b tbl2 on tbl1.id = tbl2.id")
df2 = spark.sql("select tbl1.id,(select 2) as tempCol2 from a tbl1 left join b tbl2 on tbl1.id = tbl2.id where tbl2.id is null")
df3 = spark.sql("select tbl1.id,(select 3) as tempCol3 from b tbl1 left join a tbl2 on tbl1.id = tbl2.id where tbl2.id is null")
a = a.join(df1,'id',"outer").join(df2,'id',"outer").join(df3,'id',"outer")
a = a.na.fill(0,'tempCol1')
a = a.na.fill(0,'tempCol2')
a = a.na.fill(0,'tempCol3')
a = a.withColumn('flag', coalesce(col('tempCol1')+col('tempCol2')+col('tempCol3')) )
a = a.drop('tempCol1')
a = a.drop('tempCol2')
a = a.drop('tempCol3')
a = a\
    .withColumn("neg_std",F.expr(f"(std*(-1))"))
a = a\
    .withColumn("mean20perc",F.expr(f"(0.20*mean)"))
a = a\
    .withColumn("neg_mean20perc",F.expr(f"(mean20perc*(-1))"))
a = a\
    .withColumn("new_var",F.expr(f"'{sf_start_dt}'"))
columnsToDrop = []
selectClause = ''
a.createOrReplaceTempView("a")
a = spark.sql("select * from a")

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

column =list(a.columns)
print(column)

def func_udf(df,col):
  column =list(df.columns)
  if col in column:
    return df
  else:
    df.withColumn("col", lit(null))

spark.udf.register("ColumnChecker",func_udf)

a = a.withColumn('ref_date',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' ELSE ColumnChecker(a,ref_date) END"))
a = a.withColumn('balance',expr(f"CASE WHEN flag = 3 THEN 0 END"))
a.show()
a = a.withColumn('new_col',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' END"))
a.show()
work_ppcin_bal2_2019_1 = a
work_ppcin_bal2_2019_1.show()

# ---- end of merge statements ----

This is the full-fledged code.

Please find the files attached.

-werners-
Esteemed Contributor III

Why don't you first make sure the column is present in your DataFrame, and then use the CASE?

For example: df2 = func_udf(df1, col)  # add the missing column

df2 = df2.withColumn( CASE...)

cuteabhi32
New Contributor III

AttributeError: 'DataFrame' object has no attribute 'ColumnChecker'

def func_udf(df,col):
  column =list(df.columns)
  if col in column:
    return df.col
  else:
    df.withColumn("col", lit(null))

spark.udf.register("ColumnChecker",func_udf)

dfg=a.ColumnChecker(a,ref_date)
dfg.show()

This is the code I am running.

-werners-
Esteemed Contributor III

Let us step back from what you have already written.

If I understand correctly, you need a certain column for a CASE statement, but that column may or may not be present. Is that correct?

If so, the proposed approach should work; there is no need for a function.

If not, can you explain what you are trying to do?

I have the impression that we are not aligned.

cuteabhi32
New Contributor III

Yes, your understanding is correct. I need a column that may either already exist in the DataFrame or be created dynamically on the fly. If the column is present, the function should return that column as output; if it is newly created, it should return NULL as its value.

-werners-
Esteemed Contributor III

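OK, so let us keep it simple:

from pyspark.sql.functions import lit

df1 = spark.read.format("csv").option("header","true").load("file:///home/cloudera/data/a.csv")

# "col" is the name of the column you need; TypeYouWant is a placeholder for its data type
if "col" in df1.columns:
    df2 = df1
else:
    df2 = df1.withColumn("col", lit(None).cast(TypeYouWant))

Next you can use your CASE expression etc. on df2.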
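
One caveat worth keeping in mind: a membership test on df1.columns is an exact, case-sensitive string comparison, whereas Spark resolves column names case-insensitively unless spark.sql.caseSensitive has been turned on. With a CSV header such as Flag or Ref_Date, the check could report a column as missing even though a CASE expression would find it. A minimal sketch of a case-insensitive lookup follows; the helper name find_column is a hypothetical one chosen for illustration, and it assumes the session uses the default case-insensitive setting.

```python
def find_column(columns, name):
    """Return the actual spelling of `name` in `columns`, ignoring case, or None if absent."""
    wanted = name.lower()
    for existing in columns:
        if existing.lower() == wanted:
            return existing
    return None


print(find_column(["Id", "Name", "Ref_Date", "Flag"], "ref_date"))  # Ref_Date
print(find_column(["Id", "Name", "Flag"], "ref_date"))              # None
```

The check in the post above would then read `if find_column(df1.columns, "col") is not None:`.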

cuteabhi32
New Contributor III

Thanks, I modified my code as per your suggestion and it worked perfectly. Thanks again for all your inputs.

dflist= spark.createDataFrame(list(a.columns), "string").toDF("Name")
dfg=dflist.filter(col('name').isin('ref_date')).count()

if dfg==1 :
  a = a.withColumn('ref_date',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' ELSE ref_date END"))
else:
  a = a.withColumn('ref_date',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' END"))
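
In the code above, dflist is built only to test whether ref_date is among a.columns. Since a.columns is already a Python list, the same decision could be made without a Spark job by choosing the ELSE branch while the expression text is assembled. The following is a sketch under that assumption; the function name ref_date_case_expr and its target parameter are illustrative, not part of the original code.

```python
def ref_date_case_expr(columns, start_dt, target="ref_date"):
    """Build the CASE expression text, falling back to NULL when `target` is absent."""
    fallback = target if target in columns else "NULL"
    return f"CASE WHEN flag = 3 THEN '{start_dt}' ELSE {fallback} END"


print(ref_date_case_expr(["id", "flag", "ref_date"], "02MAY2022"))
# CASE WHEN flag = 3 THEN '02MAY2022' ELSE ref_date END
print(ref_date_case_expr(["id", "flag"], "02MAY2022"))
# CASE WHEN flag = 3 THEN '02MAY2022' ELSE NULL END

# Usage with the DataFrame `a` from this thread
# (needs `from pyspark.sql.functions import expr`):
# a = a.withColumn("ref_date", expr(ref_date_case_expr(a.columns, sf_start_dt)))
```

A CASE without an ELSE branch already evaluates to NULL when no condition matches, so ELSE NULL gives the same result as the second branch of the if/else above.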
