06-06-2022 08:17 AM
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
df1 = spark.read.format("csv").option("header","true").load("file:///home/cloudera/data/a.csv")
def func_udf(df, col):
    column = list(df.columns)
    if (col in column):
        return df.col
    else:
        return NULL
spark.udf.register("columncheck",func_udf)
resultdf=df1.withColumn("ref_date",expr("CASE WHEN Flag = 3 THEN '06MAY2022' ELSE columncheck(df1,ref_date) END"))
resultdf.show()
This is the code I am using to check whether a column exists in a DataFrame or not. If it does not, I have to return NULL; if it does, I need to return the column itself, via a UDF. It is throwing an error for the UDF and I am not sure what I am doing wrong. Kindly help me resolve the error; the resultdf DataFrame throws the error below:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark/python/pyspark/sql/dataframe.py", line 1849, in withColumn
return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/local/spark/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "cannot resolve '`df1`' given input columns: [ref_date, Salary_qtr4, Salary_qtr3, Flag, Name, Salary_qtr1, Salary, Std, Salary_qtr2, Id, Mean]; line 1 pos 53;\n'Project [Id#10, Name#11, Salary#12, CASE WHEN (cast(Flag#20 as int) = 3) THEN 06MAY2022 ELSE 'columncheck('df1, ref_date#13) END AS ref_date#35, Salary_qtr1#14, Salary_qtr2#15, Salary_qtr3#16, Salary_qtr4#17, Mean#18, Std#19, Flag#20]\n+- AnalysisBarrier\n +- Relation
06-07-2022 01:18 AM
Try to return a column with content = null instead of a plain null.
So:
if (col in column):
    return df
else:
    return df.withColumn("col", lit(None))
06-07-2022 01:48 AM
Hi Werners, thanks for the input, but it is still throwing an error:
AnalysisException: Column 'a' does not exist. Did you mean one of the following? [a.id, a.std, a.Dept, a.flag, a.mean, a.salary, a.neg_std, a.new_var, a.ref_date, a.mean20perc, a.neg_mean20perc]; line 1 pos 57;
This is my updated code that I am using in the CASE statement. Kindly help.
def func_udf(df, col):
    column = list(df.columns)
    for col in column:
        return col
    else:
        lit(NULL)
spark.udf.register("ColumnChecker",func_udf)
a = a.withColumn('ref_date',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' ELSE ColumnChecker(a,ref_date) END"))
06-07-2022 02:19 AM
OK, so you have this columns list (df.columns).
If you then do:
if "columnName" in columns:
    return df
else:
    return df.withColumn("col", lit(None))
you do not need to loop.
In your loop you return col, which is a column name string, not a DataFrame. You want to return the complete DataFrame with an optional extra column.
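A runnable version of that sketch (assuming PySpark; ensure_column is just an illustrative name, and the string default type is my assumption):

from pyspark.sql.functions import lit

def ensure_column(df, col_name, col_type="string"):
    # Illustrative helper: df.columns is driver-side metadata, so this is an
    # ordinary Python check that runs before any Spark job.
    if col_name in df.columns:
        return df
    # Add the missing column with null content of the requested type.
    return df.withColumn(col_name, lit(None).cast(col_type))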
06-07-2022 03:58 AM
from pyspark.sql.functions import *
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from datetime import date,datetime
import time
from dateutil.relativedelta import relativedelta
from dateutil import parser
from pyspark.sql.window import Window
from pyspark.sql.types import *
import locale
# ---- merge statements ----
df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/a.csv")
df1.show()
df2 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/b.csv")
df2.show()
# ---- merge statements ----
sf_start_dt = '02MAY2022'
a=df1
a = a.filter(col("ref_date") == f"{sf_start_dt}")
a = a.drop('name')
a = a.select('id', 'salary', 'ref_date', 'std', 'mean')
b = df2
a.createOrReplaceTempView("a")
b.createOrReplaceTempView("b")
a = a.join(b,'id',"outer")
df1 = spark.sql("select tbl1.id,(select 1) as tempCol1 from a tbl1 inner join b tbl2 on tbl1.id = tbl2.id")
df2 = spark.sql("select tbl1.id,(select 2) as tempCol2 from a tbl1 left join b tbl2 on tbl1.id = tbl2.id where tbl2.id is null")
df3 = spark.sql("select tbl1.id,(select 3) as tempCol3 from b tbl1 left join a tbl2 on tbl1.id = tbl2.id where tbl2.id is null")
a = a.join(df1,'id',"outer").join(df2,'id',"outer").join(df3,'id',"outer")
a = a.na.fill(0,'tempCol1')
a = a.na.fill(0,'tempCol2')
a = a.na.fill(0,'tempCol3')
a = a.withColumn('flag', coalesce(col('tempCol1')+col('tempCol2')+col('tempCol3')) )
a = a.drop('tempCol1')
a = a.drop('tempCol2')
a = a.drop('tempCol3')
a = a.withColumn("neg_std", F.expr("(std*(-1))"))
a = a.withColumn("mean20perc", F.expr("(0.20*mean)"))
a = a.withColumn("neg_mean20perc", F.expr("(mean20perc*(-1))"))
a = a.withColumn("new_var", F.expr(f"'{sf_start_dt}'"))
columnsToDrop = []
selectClause = ''
a.createOrReplaceTempView("a")
a = spark.sql("select * from a")
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
column =list(a.columns)
print(column)
def func_udf(df, col):
    column = list(df.columns)
    if col in column:
        return df
    else:
        df.withColumn("col", lit(null))
spark.udf.register("ColumnChecker",func_udf)
a = a.withColumn('ref_date',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' ELSE ColumnChecker(a,ref_date) END"))
a = a.withColumn('balance', expr("CASE WHEN flag = 3 THEN 0 END"))
a.show()
a = a.withColumn('new_col',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' END"))
a.show()
work_ppcin_bal2_2019_1 = a
work_ppcin_bal2_2019_1.show()
# ---- end of merge statements ----
This is the full-fledged code.
06-07-2022 03:59 AM
Please find the files attached.
06-07-2022 04:07 AM
Why don't you first make sure the column is present in your df, and then use the CASE?
E.g.:
df2 = func_udf(df1, col)   # add the missing column
df2 = df2.withColumn(CASE...)
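Concretely, with a helper like the ensure_column sketch above (reusing this thread's names; sf_start_dt as defined earlier):

# 1) Guarantee that ref_date exists (added as null if it was missing).
a = ensure_column(a, "ref_date")
# 2) The CASE expression can now reference ref_date safely.
a = a.withColumn("ref_date", expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' ELSE ref_date END"))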
06-07-2022 04:54 AM
AttributeError: 'DataFrame' object has no attribute 'ColumnChecker'
def func_udf(df, col):
    column = list(df.columns)
    if col in column:
        return df.col
    else:
        df.withColumn("col", lit(null))
spark.udf.register("ColumnChecker", func_udf)
dfg = a.ColumnChecker(a, ref_date)
dfg.show()
This is the code I am running.
06-07-2022 05:31 AM
Let us step back from what you have already written.
If I understand correctly, you need a certain column for a CASE statement, but that column may or may not be present, correct?
If so, the proposed approach should work; there is no need for a function.
If not, can you explain what you are trying to do?
Because I have the impression that we are not aligned.
06-07-2022 05:56 AM
Yes, your understanding is correct. I need a column that is either created dynamically on the fly or pre-existing in the DataFrame. If that column is present, the function should return it as output; if it is newly created, it should return null as its value.
06-07-2022 06:32 AM
OK, so let us keep it simple:
df1 = spark.read.format("csv").option("header","true").load("file:///home/cloudera/data/a.csv")
if "col" in df1.columns:
    df2 = df1
else:
    df2 = df1.withColumn("col", lit(None).cast(TypeYouWant))
Next you can use your CASE etc. on df2.
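For a date-kept-as-text column like ref_date in this thread, TypeYouWant could for instance be StringType() (just one possible choice; use whatever the downstream code expects):

from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

df2 = df1.withColumn("col", lit(None).cast(StringType()))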
06-07-2022 07:29 AM
Thanks, I modified my code as per your suggestion and it worked perfectly. Thanks again for all your inputs.
dflist = spark.createDataFrame(list(a.columns), "string").toDF("Name")
dfg = dflist.filter(col('Name').isin('ref_date')).count()
if dfg == 1:
    a = a.withColumn('ref_date', expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' ELSE ref_date END"))
else:
    a = a.withColumn('ref_date', expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' END"))
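For what it's worth, the same check also works directly against the column list, without building the dflist DataFrame (a simplification in the spirit of the suggestions above):

if 'ref_date' in a.columns:
    a = a.withColumn('ref_date', expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' ELSE ref_date END"))
else:
    a = a.withColumn('ref_date', expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' END"))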