I'm trying to build an ETL pipeline in which I read JSONL files from Azure Blob Storage, transform them, and load them into Delta tables in Databricks. I have created the schema below for loading my data:
from pyspark.sql.types import (
    StructType, StructField, IntegerType, StringType, DoubleType, TimestampType
)

schema = StructType([
    StructField("restaurantId", IntegerType(), nullable=False),
    StructField("reviewId", IntegerType(), nullable=False),
    StructField("text", StringType(), nullable=False),
    StructField("rating", DoubleType(), nullable=False),
    StructField("publishedAt", TimestampType(), nullable=False),
    StructField("_corrupt_record", StringType(), nullable=True)
])
I'm reading the JSONL files with the code below:
df = spark.read \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .schema(schema) \
    .json(raw_file_location).cache()
But I get no results from the code below:
from pyspark.sql.functions import col

display(df.select(col("_corrupt_record")).where(col("_corrupt_record").isNotNull()))
There are many columns that are null in my raw data; those rows are inserted into the table with nulls, and the _corrupt_record column is null for them as well. Please let me know how to resolve this issue.
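To illustrate, a made-up record like the one below (rating and publishedAt are null) loads without any complaint and _corrupt_record stays null for that row; the dbfs:/tmp path is just a scratch location I used for this test:

# Made-up sample record written to a scratch path to reproduce the behaviour I'm seeing.
sample = '{"restaurantId": 101, "reviewId": 5001, "text": "Great food", "rating": null, "publishedAt": null}\n'
dbutils.fs.put("dbfs:/tmp/sample_review.jsonl", sample, True)  # overwrite=True

test_df = (
    spark.read
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .schema(schema)
    .json("dbfs:/tmp/sample_review.jsonl")
)
display(test_df)  # rating and publishedAt come back as NULL, and _corrupt_record is NULL as well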
My expectation is to see _corrupt_record populated for the failed records (i.e. records that do not match the defined schema). I have also tried manually creating the schema and loading the data with SQL queries, roughly as shown below, but that still doesn't work.
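This is roughly what the SQL attempt looked like; the table name and the abfss path are placeholders for my actual ones:

spark.sql("""
    CREATE TABLE IF NOT EXISTS reviews_raw (
        restaurantId INT,
        reviewId INT,
        text STRING,
        rating DOUBLE,
        publishedAt TIMESTAMP,
        _corrupt_record STRING
    )
    USING JSON
    OPTIONS (
        'path' 'abfss://<container>@<storage-account>.dfs.core.windows.net/raw/reviews/',
        'mode' 'PERMISSIVE',
        'columnNameOfCorruptRecord' '_corrupt_record'
    )
""")

# Still returns no rows, even for records I expect to fail the schema.
display(spark.sql("""
    SELECT reviewId, _corrupt_record
    FROM reviews_raw
    WHERE _corrupt_record IS NOT NULL
"""))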