
How to check whether a column exists in a DataFrame using a UDF: return NULL if it does not, the column itself if it does

cuteabhi32
New Contributor III

from pyspark import SparkContext

from pyspark import SparkConf

from pyspark.sql.types import *

from pyspark.sql.functions import *

from pyspark.sql import *

from pyspark.sql.types import StringType

from pyspark.sql.functions import udf

df1 = spark.read.format("csv").option("header","true").load("file:///home/cloudera/data/a.csv")

def func_udf(df,col):

    column = list(df.columns)

    if (col in column):

        return df.col

    else:

        return NULL

spark.udf.register("columncheck",func_udf)

resultdf=df1.withColumn("ref_date",expr("CASE WHEN Flag = 3 THEN '06MAY2022' ELSE columncheck(df1,ref_date) END"))

resultdf.show()

This is the code I am using to check whether a column exists in a DataFrame. If it does not exist I need to return NULL; if it does, I need to return the column itself, using a UDF. The UDF call throws an error and I am not sure what I am doing wrong. Kindly help me resolve it; creating the resultdf DataFrame throws the error below:

Traceback (most recent call last):

 File "<stdin>", line 1, in <module>

 File "/usr/local/spark/python/pyspark/sql/dataframe.py", line 1849, in withColumn

  return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)

 File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__

 File "/usr/local/spark/python/pyspark/sql/utils.py", line 69, in deco

  raise AnalysisException(s.split(': ', 1)[1], stackTrace)

pyspark.sql.utils.AnalysisException: "cannot resolve '`df1`' given input columns: [ref_date, Salary_qtr4, Salary_qtr3, Flag, Name, Salary_qtr1, Salary, Std, Salary_qtr2, Id, Mean]; line 1 pos 53;\n'Project [Id#10, Name#11, Salary#12, CASE WHEN (cast(Flag#20 as int) = 3) THEN 06MAY2022 ELSE 'columncheck('df1, ref_date#13) END AS ref_date#35, Salary_qtr1#14, Salary_qtr2#15, Salary_qtr3#16, Salary_qtr4#17, Mean#18, Std#19, Flag#20]\n+- AnalysisBarrier\n   +- Relation

1 ACCEPTED SOLUTION

Accepted Solutions

-werners-
Esteemed Contributor III

ok so let us keep it simple:

df1 = spark.read.format("csv").option("header","true").load("file:///home/cloudera/data/a.csv")
 
if "col" in df1.columns:
 df2 = df1
else:
 df2 = df1.withColumn("col", lit(None).cast(TypeYouWant))

Next you can use your case etc on df2.


11 REPLIES 11

-werners-
Esteemed Contributor III

Try returning a column whose content is null instead of a plain null.

So:

if col in column:

    return df

else:

    return df.withColumn("col", lit(None))

cuteabhi32
New Contributor III

Hi Werners, thanks for the input, but it is still throwing an error:

AnalysisException: Column 'a' does not exist. Did you mean one of the following? [a.id, a.std, a.Dept, a.flag, a.mean, a.salary, a.neg_std, a.new_var, a.ref_date, a.mean20perc, a.neg_mean20perc]; line 1 pos 57;

This is the updated code I am using in the CASE statement. Kindly help.

def func_udf(df,col):

  column =list(df.columns)

  for col in column:

    return col

  else:

     lit(NULL)

spark.udf.register("ColumnChecker",func_udf)

a = a.withColumn('ref_date',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' ELSE ColumnChecker(a,ref_date) END"))

-werners-
Esteemed Contributor III

OK, so you have this list of columns (df.columns).

If you then do:

if "columnName" in columns: return df

else: return df.withColumn("col", lit(None))

you do not need to loop.

In your loop you return col, not a DataFrame. You want to return the complete DataFrame with an optional extra column.

cuteabhi32
New Contributor III

from pyspark.sql.functions import * 

from pyspark.sql.functions import col

import pyspark.sql.functions as F

from datetime import date,datetime

import time 

from dateutil.relativedelta import relativedelta

from dateutil import parser

from pyspark.sql.window import Window

from pyspark.sql.types import *

import locale

# ---- merge statements ---- 

df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/a.csv")

df1.show()

df2 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/b.csv")

df2.show()

# ---- merge statements ---- 

sf_start_dt = '02MAY2022'

a=df1

a = a.filter(col("ref_date") == f"{sf_start_dt}")

a = a.drop('name')

a = a.select('id','salary','ref_date','std','mean',)

b = df2

a.createOrReplaceTempView("a")

b.createOrReplaceTempView("b")

a = a.join(b,'id',"outer")

df1 = spark.sql("select tbl1.id,(select 1) as tempCol1 from a tbl1 inner join b tbl2 on tbl1.id = tbl2.id")

df2 = spark.sql("select tbl1.id,(select 2) as tempCol2 from a tbl1 left join b tbl2 on tbl1.id = tbl2.id where tbl2.id is null")

df3 = spark.sql("select tbl1.id,(select 3) as tempCol3 from b tbl1 left join a tbl2 on tbl1.id = tbl2.id where tbl2.id is null")

a = a.join(df1,'id',"outer").join(df2,'id',"outer").join(df3,'id',"outer")

a = a.na.fill(0,'tempCol1')

a = a.na.fill(0,'tempCol2')

a = a.na.fill(0,'tempCol3')

a = a.withColumn('flag', coalesce(col('tempCol1')+col('tempCol2')+col('tempCol3')) )

a = a.drop('tempCol1')

a = a.drop('tempCol2')

a = a.drop('tempCol3')

a = a\

.withColumn("neg_std",F.expr(f"(std*(-1))"))

a = a\

.withColumn("mean20perc",F.expr(f"(0.20*mean)"))

a = a\

.withColumn("neg_mean20perc",F.expr(f"(mean20perc*(-1))"))

a = a\

.withColumn("new_var",F.expr(f"'{sf_start_dt}'"))

columnsToDrop = []

selectClause = ''

a.createOrReplaceTempView("a")

a = spark.sql("select * from a")

from pyspark.sql.types import StringType

from pyspark.sql.functions import udf

column =list(a.columns)

print(column)

def func_udf(df,col):

  column =list(df.columns)

  if col in column:

    return df

  else:

     df.withColumn("col", lit(null))

spark.udf.register("ColumnChecker",func_udf)

a = a.withColumn('ref_date',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' ELSE ColumnChecker(a,ref_date) END"))

a = a.withColumn('balance',expr(f"CASE WHEN flag = 3 THEN 0 END"))

a.show()

a = a.withColumn('new_col',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' END"))

a.show()

work_ppcin_bal2_2019_1 = a

work_ppcin_bal2_2019_1.show()

# ---- end of merge statements ---- 

This is the full-fledged code.

Please find the files attached.

-werners-
Esteemed Contributor III

Why don't you first make sure the column is present in your DataFrame, and then use the CASE?

E.g. df2 = func_udf(df1, col)  # add the missing column

df3 = df2.withColumn(CASE ...)

cuteabhi32
New Contributor III

AttributeError: 'DataFrame' object has no attribute 'ColumnChecker'

def func_udf(df,col):

  column =list(df.columns)

  if col in column:

    return df.col

  else:

     df.withColumn("col", lit(null))

spark.udf.register("ColumnChecker",func_udf)

dfg=a.ColumnChecker(a,ref_date)

dfg.show()

This is the code I am running.

-werners-
Esteemed Contributor III

Let us set aside what you have already written.

If I understand correctly, you need a certain column for a case statement. But that column may or may not be present, correct?

If so, the proposed approach should work, no need for a function.

If not, can you explain what you are trying to do?

Because I have the impression that we are not aligned.

cuteabhi32
New Contributor III

Yes, your understanding is correct. I need a column that may either already exist in the DataFrame or be created on the fly. If the column is present, the function should return that column as output; if it is newly created, it should return NULL as its value.

-werners-
Esteemed Contributor III

ok so let us keep it simple:

df1 = spark.read.format("csv").option("header","true").load("file:///home/cloudera/data/a.csv")
 
if "col" in df1.columns:
 df2 = df1
else:
 df2 = df1.withColumn("col", lit(None).cast(TypeYouWant))

Next you can use your case etc on df2.

cuteabhi32
New Contributor III

Thanks, I modified my code as per your suggestion and it worked perfectly. Thanks again for all your input.

dflist= spark.createDataFrame(list(a.columns), "string").toDF("Name")

dfg=dflist.filter(col('name').isin('ref_date')).count()

if dfg==1 :

  a = a.withColumn('ref_date',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' ELSE ref_date END"))

else:

  a = a.withColumn('ref_date',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' END"))
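
The snippet above builds a DataFrame from the column names just to count the matches. A plain membership test on a.columns should select the same branch without running a Spark job. A minimal sketch, where ref_date_case_expr is a hypothetical helper name and the flag column and date string are taken from this thread:

```python
def ref_date_case_expr(columns, start_dt, column="ref_date"):
    """Build the Spark SQL CASE expression string used to overwrite `column`."""
    if column in columns:
        # Column already exists: keep its current value when flag is not 3.
        return f"CASE WHEN flag = 3 THEN '{start_dt}' ELSE {column} END"
    # Column is missing: a CASE without ELSE yields NULL for the other rows.
    return f"CASE WHEN flag = 3 THEN '{start_dt}' END"


# Usage with the names from this thread (needs a live DataFrame `a`):
# a = a.withColumn("ref_date", expr(ref_date_case_expr(a.columns, sf_start_dt)))
```

The comparison here is case-sensitive, so it assumes the column is spelled exactly ref_date in the DataFrame.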
