06-06-2022 08:17 AM
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
# Load the CSV (first row is the header) into df1.
df1 = spark.read.format("csv").option("header","true").load("file:///home/cloudera/data/a.csv")
# Intended purpose: return column `col` of `df` when it exists, otherwise NULL.
def func_udf(df,col):
column = list(df.columns)
if (col in column):
# NOTE(review): `df.col` looks up an attribute literally named "col",
# not the column whose name is held in the `col` argument.
return df.col
else:
# NOTE(review): `NULL` is not defined anywhere in the visible code;
# Python's null value is `None`.
return NULL
# Registers the Python function under the SQL name "columncheck".
spark.udf.register("columncheck",func_udf)
# Inside the SQL expression string, `df1` is parsed as a column reference, not
# as the Python DataFrame variable; the AnalysisException quoted below reports
# exactly that ("cannot resolve '`df1`' given input columns").
resultdf=df1.withColumn("ref_date",expr("CASE WHEN Flag = 3 THEN '06MAY2022' ELSE columncheck(df1,ref_date) END"))
resultdf.show()
This is the code I am using to check whether a column exists in a DataFrame. If it does not exist, I need to return NULL; if it does, I need to return the column itself, using a UDF. The UDF is throwing an error and I am not sure what I am doing wrong. Kindly help me resolve it; the resultdf DataFrame throws the error below:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/spark/python/pyspark/sql/dataframe.py", line 1849, in withColumn
return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/local/spark/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: "cannot resolve '`df1`' given input columns: [ref_date, Salary_qtr4, Salary_qtr3, Flag, Name, Salary_qtr1, Salary, Std, Salary_qtr2, Id, Mean]; line 1 pos 53;\n'Project [Id#10, Name#11, Salary#12, CASE WHEN (cast(Flag#20 as int) = 3) THEN 06MAY2022 ELSE 'columncheck('df1, ref_date#13) END AS ref_date#35, Salary_qtr1#14, Salary_qtr2#15, Salary_qtr3#16, Salary_qtr4#17, Mean#18, Std#19, Flag#20]\n+- AnalysisBarrier\n +- Relation
06-07-2022 06:32 AM
OK, so let us keep it simple:
df1 = spark.read.format("csv").option("header","true").load("file:///home/cloudera/data/a.csv")
# Make sure the column exists before it is referenced in a CASE expression.
if "col" in df1.columns:
    # Column already present: use the DataFrame unchanged.
    df2 = df1
else:
    # Column missing: add it as an all-NULL column. `TypeYouWant` is a
    # placeholder for the desired Spark data type (for example StringType()).
    df2 = df1.withColumn("col", lit(None).cast(TypeYouWant))
Next, you can use your CASE expression etc. on df2.
06-07-2022 01:18 AM
Try to return a column whose content is null instead of a plain null.
So:
if (col in column):
return df
else:
df.withColumn("col", lit(null))
06-07-2022 01:48 AM
Hi Werners, thanks for the input, but it is still throwing an error:
AnalysisException: Column 'a' does not exist. Did you mean one of the following? [a.id, a.std, a.Dept, a.flag, a.mean, a.salary, a.neg_std, a.new_var, a.ref_date, a.mean20perc, a.neg_mean20perc]; line 1 pos 57;
This is my updated code, which I am using in the CASE statement. Kindly help.
# Second attempt at the column-existence helper.
def func_udf(df,col):
column =list(df.columns)
# NOTE(review): the loop variable reuses the name `col`, so the `col` argument
# is overwritten; as pasted (indentation lost) the `return` appears to exit on
# the first column name.
for col in column:
return col
else:
# NOTE(review): this expression is not returned, and `NULL` is not defined
# anywhere in the visible code.
lit(NULL)
# Registers the Python function under the SQL name "ColumnChecker".
spark.udf.register("ColumnChecker",func_udf)
# Inside the SQL expression string, `a` is parsed as a column name, not as the
# Python DataFrame variable; this matches the "Column 'a' does not exist" error
# quoted above.
a = a.withColumn('ref_date',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' ELSE ColumnChecker(a,ref_date) END"))
06-07-2022 02:19 AM
OK, so you have this list of columns (df.columns).
If you then do:
if "columnName" in columns: return df
else return df.withColumn("col", lit(null))
You do not need to loop.
In your loop you return col, not a DataFrame. You want to return the complete DataFrame with an optional extra column.
06-07-2022 03:58 AM
from pyspark.sql.functions import *
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from datetime import date,datetime
import time
from dateutil.relativedelta import relativedelta
from dateutil import parser
from pyspark.sql.window import Window
from pyspark.sql.types import *
import locale
# ---- merge statements ----
# Load the two input CSV files (first row is the header) and display them.
df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/a.csv")
df1.show()
df2 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/b.csv")
df2.show()
# ---- merge statements ----
sf_start_dt = '02MAY2022'
# `a`: rows of the first file whose ref_date equals sf_start_dt, reduced to
# the columns id, salary, ref_date, std and mean.
a=df1
a = a.filter(col("ref_date") == f"{sf_start_dt}")
a = a.drop('name')
a = a.select('id','salary','ref_date','std','mean',)
b = df2
# The temp views "a" and "b" are registered BEFORE the outer join below, so
# the three SQL queries that follow read the pre-join DataFrames.
a.createOrReplaceTempView("a")
b.createOrReplaceTempView("b")
a = a.join(b,'id',"outer")
# NOTE: df1 and df2 are rebound here; from this point on they no longer refer
# to the loaded CSV DataFrames.
# tempCol1 = 1 for ids found in both views (inner join).
df1 = spark.sql("select tbl1.id,(select 1) as tempCol1 from a tbl1 inner join b tbl2 on tbl1.id = tbl2.id")
# tempCol2 = 2 for ids in view "a" that have no match in view "b".
df2 = spark.sql("select tbl1.id,(select 2) as tempCol2 from a tbl1 left join b tbl2 on tbl1.id = tbl2.id where tbl2.id is null")
# tempCol3 = 3 for ids in view "b" that have no match in view "a".
df3 = spark.sql("select tbl1.id,(select 3) as tempCol3 from b tbl1 left join a tbl2 on tbl1.id = tbl2.id where tbl2.id is null")
# Attach the three marker columns by id, then replace missing markers with 0
# so they can be summed.
a = a.join(df1,'id',"outer").join(df2,'id',"outer").join(df3,'id',"outer")
a = a.na.fill(0,'tempCol1')
a = a.na.fill(0,'tempCol2')
a = a.na.fill(0,'tempCol3')
# flag = sum of the three markers (coalesce is called with a single argument
# here, i.e. just the sum expression).
a = a.withColumn('flag', coalesce(col('tempCol1')+col('tempCol2')+col('tempCol3')) )
a = a.drop('tempCol1')
a = a.drop('tempCol2')
a = a.drop('tempCol3')
# Derived columns: negated std, 20% of mean, negated 20% of mean, and a
# constant column holding sf_start_dt as a string literal.
a = a\
.withColumn("neg_std",F.expr(f"(std*(-1))"))
a = a\
.withColumn("mean20perc",F.expr(f"(0.20*mean)"))
a = a\
.withColumn("neg_mean20perc",F.expr(f"(mean20perc*(-1))"))
a = a\
.withColumn("new_var",F.expr(f"'{sf_start_dt}'"))
# These two names are assigned but not used anywhere in the visible code.
columnsToDrop = []
selectClause = ''
# Re-register the view "a" with the enriched DataFrame and read it back.
a.createOrReplaceTempView("a")
a = spark.sql("select * from a")
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
# Print the current column names of `a`.
column =list(a.columns)
print(column)
# Third attempt at the column-existence helper.
def func_udf(df,col):
column =list(df.columns)
if col in column:
return df
else:
# NOTE(review): the result of withColumn is not returned, `null` is not a
# Python name (Python uses `None`), and the column added is literally "col"
# rather than the name held in the `col` argument.
df.withColumn("col", lit(null))
# Registers the Python function under the SQL name "ColumnChecker".
spark.udf.register("ColumnChecker",func_udf)
# Inside the SQL expression string, `a` is parsed as a column name, not as the
# Python DataFrame variable.
a = a.withColumn('ref_date',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' ELSE ColumnChecker(a,ref_date) END"))
# balance: 0 where flag = 3; the CASE has no ELSE branch.
a = a.withColumn('balance',expr(f"CASE WHEN flag = 3 THEN 0 END"))
a.show()
# new_col: sf_start_dt where flag = 3; the CASE has no ELSE branch.
a = a.withColumn('new_col',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' END"))
a.show()
work_ppcin_bal2_2019_1 = a
work_ppcin_bal2_2019_1.show()
# ---- end of merge statements ----
This is the full-fledged code.
06-07-2022 03:59 AM
Please find the files attached.
06-07-2022 04:07 AM
Why don't you first make sure the column is present in your df, and then use the CASE?
f.e. df2 = func_udf(df1, col) #add missing col
df2 = df.withColumn( CASE...)
06-07-2022 04:54 AM
AttributeError: 'DataFrame' object has no attribute 'ColumnChecker'
# Fourth attempt at the column-existence helper.
def func_udf(df,col):
column =list(df.columns)
if col in column:
# NOTE(review): `df.col` looks up an attribute literally named "col",
# not the column whose name is held in the `col` argument.
return df.col
else:
# NOTE(review): the result of withColumn is not returned, and `null` is not a
# Python name (Python uses `None`).
df.withColumn("col", lit(null))
# Registers the Python function under the SQL name "ColumnChecker".
spark.udf.register("ColumnChecker",func_udf)
# `ColumnChecker` is accessed here as an attribute of the DataFrame `a`; the
# AttributeError quoted above reports that no such attribute exists.
# NOTE(review): the bare name `ref_date` is not defined as a Python variable
# in the visible code.
dfg=a.ColumnChecker(a,ref_date)
dfg.show()
This is the code I am running.
06-07-2022 05:31 AM
Let us set aside what you have already written.
If I understand correctly, you need a certain column for a case statement. But that column may or may not be present, correct?
If so, the proposed approach should work, no need for a function.
If not, can you explain what you try to do?
Because I have the impression that we are not aligned.
06-07-2022 05:56 AM
Yes, your understanding is correct. I need a column that may either already exist in the DataFrame or be created dynamically on the fly. If the column is present, the function should return that column as output; if it is being newly created, it should return NULL as its value.
06-07-2022 06:32 AM
OK, so let us keep it simple:
df1 = spark.read.format("csv").option("header","true").load("file:///home/cloudera/data/a.csv")
# Make sure the column exists before it is referenced in a CASE expression.
if "col" in df1.columns:
    # Column already present: use the DataFrame unchanged.
    df2 = df1
else:
    # Column missing: add it as an all-NULL column. `TypeYouWant` is a
    # placeholder for the desired Spark data type (for example StringType()).
    df2 = df1.withColumn("col", lit(None).cast(TypeYouWant))
Next, you can use your CASE expression etc. on df2.
06-07-2022 07:29 AM
Thanks, I modified my code as per your suggestion and it worked perfectly. Thanks again for all your inputs.
# Count how many columns of `a` are named exactly 'ref_date'. `a.columns` is a
# plain Python list, so this needs no helper DataFrame and no Spark job.
dfg = a.columns.count('ref_date')
if dfg == 1:
    # Column exists: keep its current value on rows where flag is not 3.
    a = a.withColumn('ref_date',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' ELSE ref_date END"))
else:
    # Column absent: there is nothing to fall back to, so the CASE has no ELSE
    # branch and rows where flag is not 3 get NULL.
    a = a.withColumn('ref_date',expr(f"CASE WHEN flag = 3 THEN '{sf_start_dt}' END"))
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won't want to miss the chance to attend and share knowledge.
If there isn't a group near you, start one and help create a community that brings people together.
Request a New Group