Hi Databricks Community,
I am facing an issue while exploding nested JSON data.
In the content column, I have dynamic nested JSON, and I am using the approach below to parse and explode it.
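For reference, here is a simplified, purely illustrative sketch of the shape I'm dealing with; the field names match what I select in the code below, but the real payloads are dynamic and much larger:

# Illustrative only: a hand-written, simplified example of one parsed
# "content" payload (the actual column stores this as a JSON string).
sample_content = {
    "activities": [
        {
            "id": "a1",
            "name": "startConversation",
            "type": "event",
            "timestamp": "2025-01-01T00:00:00Z",
            "from": {"id": "u1", "role": "user"},
            "text": "hello",
            # "value" is itself a JSON string, not a nested object
            "value": '{"outcome": "Resolved", "turnCount": 3}',
            "attachments": [{"content": '{"body": [{"text": "card text"}]}'}],
        }
    ]
}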
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, from_json, explode_outer, when, first, substring_index, to_json
)
from pyspark.sql.types import StructType, StructField, ArrayType, StringType
from pyspark.sql.window import Window


def create_silver_layer_raw(df, json_col="content", metadata_col="metadata"):
    try:
        # Infer the schema of the dynamic JSON in the content column from the
        # data itself (spark is the Databricks-provided SparkSession)
        sample_json_rdd = df.select(json_col).rdd.map(lambda r: r[json_col])
        inferred_schema = spark.read.json(sample_json_rdd).schema
        df_parsed = df.withColumn("content_parsed", from_json(col(json_col), inferred_schema))

        # One output row per activity
        df_activities = df_parsed.withColumn("activity", explode_outer("content_parsed.activities"))

        # Same inference approach for the metadata column
        sample_metadata_rdd = df.select(metadata_col).rdd.map(lambda r: r[metadata_col])
        metadata_schema = spark.read.json(sample_metadata_rdd).schema
        df_with_metadata = df_activities.withColumn("metadata_parsed", from_json(col(metadata_col), metadata_schema))

        # activity.value should hold JSON strings; sample only values that
        # look like JSON objects before inferring their schema
        sample_value_rdd = df_with_metadata.select("activity.value").rdd.map(lambda r: r["value"])
        sample_value_rdd_filtered = sample_value_rdd.filter(
            lambda x: x is not None and x.strip().startswith("{")  # the traceback points at this .strip()
        )
        value_schema = StructType()
        if not sample_value_rdd_filtered.isEmpty():
            value_schema = spark.read.json(sample_value_rdd_filtered).schema
        df_final = df_with_metadata.withColumn("value_parsed", from_json(col("activity.value"), value_schema))

        # Fixed schema for attachment content that arrives as a JSON string
        content_schema = StructType([
            StructField("body", ArrayType(StructType([
                StructField("text", StringType())
            ])))
        ])
        df_final = df_final.withColumn(
            "attachment_content_parsed",
            when(
                col("activity.attachments").isNotNull() &
                col("activity.attachments")[0].content.cast("string").startswith("{"),
                from_json(col("activity.attachments")[0].content.cast("string"), content_schema)
            ).otherwise(None)
        )

        # Within each name partition, pick the id of the startConversation activity
        window_spec = Window.partitionBy("name")
        df_final = df_final.withColumn(
            "SessionId_calc",
            first(
                when(col("activity.name") == "startConversation", col("activity.id")),
                ignorenulls=True
            ).over(window_spec)
        )

        df_silver = df_final.select(
            substring_index(col("name"), "_", 1).alias("ConversationId"),
            col("SessionId_calc").alias("SessionId"),
            col("metadata_parsed.botId").alias("BotId"),
            col("metadata_parsed.botName").alias("BotName"),
            col("activity.id").alias("ActivityId"),
            col("activity.name").alias("ActivityName"),
            col("activity.type").alias("ActivityType"),
            col("activity.valueType").alias("ActivityValueType"),
            col("activity.timestamp").alias("ActivityTimestamp"),
            col("activity.channelId").alias("DeploymentInfo"),
            col("activity.from.id").alias("FromId"),
            col("activity.from.role").alias("FromRole"),
            col("activity.text").alias("ActivityText"),
            when(
                col("attachment_content_parsed").isNotNull(),
                col("attachment_content_parsed.body")[0].text
            ).otherwise(
                to_json(col("activity.attachments")[0].content)
            ).alias("ActivityAttachmentText"),
            col("activity.replyToId").alias("ReplyToId"),
            col("value_parsed.isDesignMode").alias("IsDesignMode"),
            col("value_parsed.id").alias("ValueId"),
            col("value_parsed.newValue").alias("newValue"),
            col("value_parsed.startTimeUtc").alias("StartTime"),
            col("value_parsed.endTimeUtc").alias("EndTime"),
            col("value_parsed.outcome").alias("SessionOutcome"),
            col("value_parsed.turnCount").alias("TurnCount"),
            col("value_parsed.targetDialogId").alias("ToTopicId"),
            col("value_parsed.topicDisplayName").alias("TopicName"),
            col("value_parsed.targetDialogType").alias("RedirectType"),
            col("value_parsed.taskDialogId").alias("TaskDialogId"),
            col("value_parsed.actions")[0].topicId.alias("ActionTopicId"),
            col("value_parsed.actions")[0].dialogComponentId.alias("ActionTopicGUID"),
            col("value_parsed.actions")[0].exception.alias("StackTrace")
        )

        df_filtered = df_silver.filter(col("BotId") == "48ec1c7d-bbff-fd23-e5b4-a645da519285")
        return df_filtered
    except Exception as e:
        raise Exception(f"Failed to parse JSON: {str(e)}")


df_silver_layer = create_silver_layer_raw(df, json_col="content", metadata_col="metadata")
display(df_silver_layer)
This code was working fine, but two days later it suddenly stopped working and I'm not sure why.
I am now getting the following error:
error - org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 73.0 failed 4 times, most recent failure: Lost task 0.3 in stage 73.0 (TID 226) (10.164.98.36 executor driver): org.apache.spark.api.python.PythonException: 'AttributeError: strip', from /root/.ipykernel/913/command-5241568264767544-3525431851, line 21. Full traceback below:
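One thing I noticed while digging: the message is the bare "AttributeError: strip" rather than Python's usual "'NoneType' object has no attribute 'strip'", and that bare form is how PySpark's Row raises missing attributes. So my guess (unconfirmed) is that the inferred schema now types activity.value as a struct instead of a string, so the sampling lambda receives Row objects. A minimal sketch that reproduces the same error under that assumption:

# Minimal repro sketch (my assumption, not confirmed): if the sampled values
# are Rows/structs instead of strings, x.strip triggers Row.__getattr__,
# which raises a bare AttributeError("strip") -- matching the message above.
from pyspark.sql import Row

rdd = spark.sparkContext.parallelize([Row(outcome="Resolved")])
rdd.filter(lambda x: x is not None and x.strip().startswith("{")).count()

The only guard I've thought of so far (not sure it's the right fix) is to sample only values that really are strings:

sample_value_rdd_filtered = sample_value_rdd.filter(
    lambda x: isinstance(x, str) and x.strip().startswith("{")
)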
Has anyone faced this issue, or can anyone point me to the correct way to explode dynamic JSON fields?
Thanks!