Validate the schema of JSON in a column

Braxx
Contributor II

I have a dataframe like below with col2 containing key-value pairs. I would like to filter it down to only the rows where col2 has a valid schema.

There can be many pairs, sometimes fewer, sometimes more, and that is fine as long as the structure is fine. Nulls in col2 are also allowed.

The invalid values are like in cases 4 and 5, where one of "name" or "value" is missing or the brackets [] are absent.

schema:

[
  {
    "name": "aa",
    "value": "abc"
  },
  {
    "name": "bb",
    "value": "12"
  },
  {
    "name": "cc",
    "value": "3"
  }
]

data sample:

col1	col2
1	        [{"name":"aaa","value":"5"},{"name":"bbb","value":"500"},{"name":"ccc","value":"300"}]
2	        [{"name":"aaa","value":"5"},{"name":"bbb","value":"500"}]
3	
4	        {"name":"aaa","value":"5"},{"name":"bbb","value":"500"}
5	        [{"name":"aaa"},{"name":"bbb","value":"500"}]
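
For reference, here is a minimal sketch that builds the sample above as a DataFrame (the variable name data and an existing spark session are assumptions), so the snippets further down can be run against it:

# hypothetical reconstruction of the sample data shown above
data = spark.createDataFrame(
    [
        (1, '[{"name":"aaa","value":"5"},{"name":"bbb","value":"500"},{"name":"ccc","value":"300"}]'),
        (2, '[{"name":"aaa","value":"5"},{"name":"bbb","value":"500"}]'),
        (3, None),
        (4, '{"name":"aaa","value":"5"},{"name":"bbb","value":"500"}'),
        (5, '[{"name":"aaa"},{"name":"bbb","value":"500"}]'),
    ],
    ["col1", "col2"],
)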


Kaniz
Community Manager

Hi @Bartosz Wachocki, let's see if your peers in the community have an answer to your question first. Otherwise I will get back to you soon. Thanks.

-werners-
Esteemed Contributor III

Thanks.

As I read it, this method requires CSV or JSON as a source and is applied while reading from a file. The schema must be defined for the whole file, not for a particular column. In my case, I have a dataframe with a JSON column, so it looks like the case is different.

-werners-
Esteemed Contributor III

So you have this data already written in parquet?

The data are from a Hive table created by another process. I could have it in Parquet if necessary.

-werners-
Esteemed Contributor III

Not a literal answer:

https://docs.databricks.com/spark/latest/dataframes-datasets/complex-nested-data.html

https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spar...

You can try to achieve this using a defined schema and then reading the table, or a more advanced technique like regex, etc.

There are quite a few options when working with nested data, as you will see.

You could even work with Delta Lake and check constraints.

(https://docs.microsoft.com/en-us/azure/databricks/delta/delta-constraints)
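
As a rough sketch of that last option (the table name my_table is an assumption, and whether from_json is accepted inside a constraint expression should be checked against the Delta constraints doc above), a CHECK constraint could reject malformed rows at write time:

# hypothetical Delta CHECK constraint on an existing Delta table named my_table,
# treating a null col2 as valid and anything from_json cannot parse as invalid
spark.sql("""
  ALTER TABLE my_table
  ADD CONSTRAINT col2_is_valid_json
  CHECK (col2 IS NULL OR from_json(col2, 'array<struct<name:string,value:string>>') IS NOT NULL)
""")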

Braxx
Contributor II

To verify whether the structure is correct, I tried json.loads. It should raise an error when the structure is wrong and return True when it is correct. Then I would only keep the True rows. Unfortunately, I am getting an error with the code below, which could work for me once solved.

"TypeError: Column is not iterable"

def is_json(myjson):
  try:
    for x in myjson:
      json.loads(x)
  except ValueError as e:
    return False
  return True
 
df = data\
  .withColumn("Col3", is_json(col("Col2")))\
  .filter(col("Col3") == True)

-werners-
Esteemed Contributor III

You want to loop over a DataFrame column; that is the issue, as a Column is not iterable.

If you give the column name instead of the column itself, it should work.

But json.loads might not know what to do with it; in that case you could also add the dataframe as a function parameter.

Braxx
Contributor II

Thanks. I don't really understand what you mean by "add the dataframe as a function parameter". Were you able to draft it? I could then test it.

-werners-
Esteemed Contributor III

What I was trying to say is that json.loads will not work on a Spark column (I was not very clear).

It is not just a list of values, so the for-loop will not work.

Instead you should use a Spark function to check the validity of the JSON string, e.g. from_json.

And what I mean by passing in the df as a function parameter is just def is_json(df, ...).

It is sometimes necessary to work with column names (so not the column itself, only the name) and also with the column itself (the actual DataFrame column with the values).

If that is the case, you also have to bring the DataFrame into the picture.
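
To illustrate the point being made here, a minimal sketch (function and column names are assumptions, and this is not the accepted solution below) that wraps json.loads in a UDF so the check runs on each row's string value rather than on the Column object itself:

# hypothetical UDF-based check, applied per value instead of iterating over the Column
import json
from pyspark.sql.functions import col, udf
from pyspark.sql.types import BooleanType
 
@udf(returnType=BooleanType())
def is_valid_json(value):
    # nulls in col2 are allowed, so treat them as valid
    if value is None:
        return True
    try:
        parsed = json.loads(value)
    except ValueError:
        return False
    # require a list of objects that each contain "name" and "value"
    return isinstance(parsed, list) and all(
        isinstance(item, dict) and "name" in item and "value" in item for item in parsed
    )
 
df_checked = data.withColumn("col2_is_valid", is_valid_json(col("col2")))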

Braxx
Contributor II

Finally have this resolved.

Corrupted rows are flagged with 1 and can then be easily filtered out.

# define a schema for col2
from pyspark.sql.types import ArrayType, StructType, StructField, StringType
from pyspark.sql.functions import from_json, col, when

json_schema = ArrayType(StructType([
    StructField("name", StringType(), nullable=True),
    StructField("value", StringType(), nullable=True)
]))
 
# from_json is used to validate whether col2 has a valid schema. If yes -> correct_json = col2, if no -> correct_json = null
# null is the default value returned by from_json when a valid json could not be created
# rows with corrupted jsons are flagged with 1 by comparing the result before and after validation:
# if col2 was not null and became null after validation, the json is corrupted
df = data\
  .withColumn("correct_json", from_json(col("col2"), json_schema))\
  .withColumn("json_flag", when(col("col2").isNotNull() & col("correct_json").isNull(), 1).otherwise(0))\
  .drop("correct_json")
 
display(df)
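
With the flag in place, filtering out the corrupted rows is then a one-liner (column names follow the snippet above):

# keep only the rows whose col2 is either null or valid JSON for the given schema
clean_df = df.filter(col("json_flag") == 0)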

Anonymous
Not applicable

@Bartosz Wachocki - Thank you for sharing your solution and marking it as best.
