################################################ New Code as per UC ################################################
from pyspark.sql import functions as F  # `spark` is the active SparkSession (notebook global)


def parse_json(df, *cols, clean=True):
    res = df
    for i in cols:
        if clean:
            # Wrap the raw JSON so the parsed result lands under a single "data" field.
            res = res.withColumn(
                i,
                F.concat(F.lit('{"data": '), i, F.lit('}'))
            )
        # Schema is inferred from the first of (at most) 100 collected rows.
        data_str = res.limit(100).collect()[0].asDict()[i]
        schema = spark.sql(f"select schema_of_json('{data_str}')").collect()[0][0]
        res = res.withColumn(i, F.from_json(F.col(i), schema))
        if clean:
            # Unwrap the helper "data" field added above.
            res = res.withColumn(i, F.col(i).data)
    return res
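
# Minimal repro sketch for the function above; the column name "payload" and the sample
# rows are made up for illustration only.
sample = [
    ('{"a": 1, "b": {"x": "one"}}',),
    ('{"a": 2, "c": [1, 2, 3]}',),  # has a field ("c") that the first row does not
]
df = spark.createDataFrame(sample, ["payload"])
parsed = parse_json(df, "payload")
parsed.printSchema()
# Because the schema comes from the first sampled row only, fields that appear only in
# later rows (like "c") are dropped, and fields missing from a row (like "b") show as null.
parsed.show(truncate=False)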
################################################ OLD Code as per Non-UC ################################################
# def parse_json(df, *cols, clean=True):
#     res = df
#     for i in cols:
#         if clean:
#             res = (
#                 res.withColumn(
#                     i,
#                     F.concat(F.lit('{"data": '), i, F.lit('}'))
#                 )
#             )
#         schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema
#         res = res.withColumn(i, F.from_json(F.col(i), schema))
#         if clean:
#             res = res.withColumn(i, F.col(i).data)
#     return res
Previously we inferred the schema with an RDD-based approach (`res.rdd.map`, see the old code above), but RDD APIs are not supported in Unity Catalog (UC), so we modified the code. In the new code we added `limit(100)` before collecting the JSON data, because without the limit the collect throws a memory error. But with the limit, many elements come back as null, which shouldn't happen; the JSON is nested. Please help.
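
For context on why the nulls appear: the new code infers the schema from only the first collected row (`collect()[0]`), so any fields that occur only in later rows are missing from the inferred schema and come back null or get dropped by `from_json`. Below is a minimal sketch (not a drop-in fix) of one possible direction: infer a schema from a bounded sample of rows without touching RDDs. The helper name, the sample size, and the "widest DDL wins" heuristic are assumptions for illustration; a robust solution would properly merge the sampled StructTypes.

from pyspark.sql import functions as F

def infer_json_schema(df, col_name, sample_rows=100):
    # Select only the JSON column and cap the rows pulled to the driver, keeping
    # memory bounded (the reason for the original limit(100)).
    samples = [r[0] for r in df.select(col_name).limit(sample_rows).collect() if r[0] is not None]
    # schema_of_json() needs a foldable (literal) string; using F.lit also avoids the
    # quote-escaping problems of interpolating the JSON into a SQL string.
    ddls = [
        spark.range(1).select(F.schema_of_json(F.lit(s)).alias("ddl")).first()["ddl"]
        for s in samples
    ]
    # Heuristic: keep the widest schema seen in the sample. Rows whose fields are not
    # covered by the chosen schema will still come back as null in from_json().
    return max(ddls, key=len)

The returned DDL string can be passed straight to `from_json(F.col(i), ddl)` in place of the single-row inference; whether 100 sampled rows are representative of the full nested JSON is still an open question for this data.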