11-23-2021 06:50 AM
I have a dataframe like below, with col2 containing key-value pairs. I would like to filter col2 to only the rows with a valid schema.
There could be many pairs, sometimes fewer, sometimes more, and this is fine as long as the structure is fine. Nulls in col2 are also allowed.
The wrong values are like in cases 4 and 5, where one of "name" or "value" is missing, or the brackets [] are missing.
schema:
[
  {
    "name": "aa",
    "value": "abc"
  },
  {
    "name": "bb",
    "value": "12"
  },
  {
    "name": "cc",
    "value": "3"
  }
]
data sample:
col1 col2
1 [{"name":"aaa","value":"5"},{"name":"bbb","value":"500"},{"name":"ccc","value":"300"}]
2 [{"name":"aaa","value":"5"},{"name":"bbb","value":"500"}]
3
4 {"name":"aaa","value":"5"},{"name":"bbb","value":"500"}
5 [{"name":"aaa"},{"name":"bbb","value":"500"}]
Labels: Dataframe
Accepted Solutions
12-01-2021 02:41 AM
Finally resolved this.
Corrupted rows are flagged with 1 and can then easily be filtered out.
# define a schema for col2
from pyspark.sql.types import ArrayType, StructType, StructField, StringType
from pyspark.sql.functions import from_json, col, when

json_schema = ArrayType(StructType([
    StructField("name", StringType(), nullable=True),
    StructField("value", StringType(), nullable=True)
]))

# from_json validates whether col2 matches the schema. If yes -> correct_json = col2, if no -> correct_json = null
# (null is the default value returned by from_json when a valid json could not be parsed).
# Rows with corrupted jsons are flagged with 1 by comparing the column before and after validation:
# if col2 was not null but became null after validation, the json is corrupted.
df = data\
    .withColumn("correct_json", from_json(col("col2"), json_schema))\
    .withColumn("json_flag", when(col("col2").isNotNull() & col("correct_json").isNull(), 1).otherwise(0))\
    .drop("correct_json")
display(df)
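A quick follow-up sketch, reusing the names from the snippet above, showing how the flagged rows could then be dropped:

# keep only rows whose col2 matched the schema (or was null to begin with)
clean_df = df.filter(col("json_flag") == 0)
display(clean_df)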
11-23-2021 08:12 AM
When reading file sources you can set a 'badRecordsPath':
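For reference, a minimal sketch of that option, assuming a JSON file source and made-up paths:

# records that fail to parse are written to badRecordsPath instead of failing the read
raw = (spark.read
       .format("json")
       .option("badRecordsPath", "/tmp/badRecordsPath")  # hypothetical location for bad records
       .load("/mnt/data/input.json"))                    # hypothetical input path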
11-24-2021 12:53 AM
Thanks.
From what I read, this method requires CSV or JSON as a source and is applied while reading from a file. The schema must be defined for the whole file, not for a particular column. In my case, I have a dataframe with a JSON column, so it looks like the case is different.
11-24-2021 01:04 AM
So you have this data already written in Parquet?
11-24-2021 01:36 AM
The data are from a Hive table created by another process. I could have it in Parquet if necessary.
11-24-2021 02:25 AM
Not a literal answer:
https://docs.databricks.com/spark/latest/dataframes-datasets/complex-nested-data.html
You can try to achieve this using a defined schema and then read the table, or a more advanced technique like regex, etc.
There are quite a few options for working with nested data, as you will see.
You could even work with Delta Lake and check constraints (sketched below).
(https://docs.microsoft.com/en-us/azure/databricks/delta/delta-constraints)
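To illustrate the check-constraint idea, a sketch only, using a hypothetical Delta table name and a very rough condition (constraints check column values, not the nested JSON structure itself):

# hypothetical table my_delta_table; rejects rows where col2 is present but does not even start like a JSON array
spark.sql("""
  ALTER TABLE my_delta_table
  ADD CONSTRAINT col2_looks_like_array
  CHECK (col2 IS NULL OR col2 LIKE '[%')
""")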
11-24-2021 06:12 AM
To verify whether the structure is correct, I tried json.loads. It should raise an error if the structure is wrong, or return True if it is correct. Then I would only keep the True ones. Unfortunately, I am getting the error below with this code, which, once solved, could work for me.
"TypeError: Column is not iterable"
def is_json(myjson):
    try:
        for x in myjson:
            json.loads(x)
    except ValueError as e:
        return False
    return True

df = data\
    .withColumn("Col3", is_json(col("Col2")))\
    .filter(col("Col3") == True)
11-24-2021 07:37 AM
You want to loop over a DataFrame column, and that is the issue, as Column is not iterable.
If you pass the column name instead of the column itself, it should work.
But json.loads might not know what to do with that either; in that case you could also add the dataframe as a function parameter.
11-24-2021 09:52 AM
Thanks. I don't quite understand what you mean by "add the dataframe as a function parameter". Were you able to draft it? I could then test it.
11-25-2021 12:02 AM
What I was trying to say is that json.loads will not work on a Spark column (I was not very clear).
A column is not just a list of values, so the for-loop will not work.
Instead, you should use a Spark function to check the validity of the JSON string, e.g. to_json.
And what I mean by passing in the df as a function parameter is just def is_json(df, ...).
It is sometimes necessary to work with column names (so not the column itself but only the name) and also with the column itself (the actual df column with the values).
If that is the case, you also have to bring the DF into the picture.
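As a side note, one way to make the earlier per-row check run is to wrap it in a UDF, so it is applied to each value rather than to the Column object. A rough sketch, assuming Col2 holds JSON strings (this only checks JSON syntax, not the name/value schema):

from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType
import json

@udf(returnType=BooleanType())
def is_json(value):
    # nulls are allowed per the original question
    if value is None:
        return True
    try:
        json.loads(value)
        return True
    except (ValueError, TypeError):
        return False

df = data.withColumn("Col3", is_json(col("Col2"))).filter(col("Col3"))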

12-01-2021 08:41 AM
@Bartosz Wachocki - Thank you for sharing your solution and marking it as best.

