Data Engineering

Databricks Nested JSON Flattening

SumitB14
New Contributor

Hi Databricks Community,

I am facing an issue while exploding nested JSON data.

In the content column, I have dynamic nested JSON, and I am using the below approach to parse and explode it.

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
col, from_json, explode_outer, when, first, substring_index, to_json
)
from pyspark.sql.types import StructType, StructField, ArrayType, StringType
from pyspark.sql.window import Window

def create_silver_layer_raw(df, json_col="content", metadata_col="metadata"):
    try:
        # Infer the schema of the dynamic JSON in the content column from the data itself
        sample_json_rdd = df.select(json_col).rdd.map(lambda r: r[json_col])
        inferred_schema = spark.read.json(sample_json_rdd).schema
        df_parsed = df.withColumn("content_parsed", from_json(col(json_col), inferred_schema))

        # Explode the activities array so each activity becomes its own row
        df_activities = df_parsed.withColumn("activity", explode_outer("content_parsed.activities"))

        # Parse the metadata column the same way
        sample_metadata_rdd = df.select(metadata_col).rdd.map(lambda r: r[metadata_col])
        metadata_schema = spark.read.json(sample_metadata_rdd).schema
        df_with_metadata = df_activities.withColumn("metadata_parsed", from_json(col(metadata_col), metadata_schema))

        # Infer a schema for activity.value, keeping only entries that look like JSON objects
        sample_value_rdd = df_with_metadata.select("activity.value").rdd.map(lambda r: r["value"])
        sample_value_rdd_filtered = sample_value_rdd.filter(lambda x: x is not None and x.strip().startswith("{"))
        value_schema = StructType()
        if not sample_value_rdd_filtered.isEmpty():
            value_schema = spark.read.json(sample_value_rdd_filtered).schema

        df_final = df_with_metadata.withColumn("value_parsed", from_json(col("activity.value"), value_schema))

        content_schema = StructType([
            StructField("body", ArrayType(StructType([
                StructField("text", StringType())
            ])))
        ])

        # Parse the first attachment's content only when it looks like a JSON object
        df_final = df_final.withColumn(
            "attachment_content_parsed",
            when(
                col("activity.attachments").isNotNull() &
                col("activity.attachments")[0].content.cast("string").startswith("{"),
                from_json(col("activity.attachments")[0].content.cast("string"), content_schema)
            ).otherwise(None)
        )

        # Derive a session id from the startConversation activity within each conversation
        window_spec = Window.partitionBy("name")
        df_final = df_final.withColumn(
            "SessionId_calc",
            first(
                when(col("activity.name") == "startConversation", col("activity.id")),
                ignorenulls=True
            ).over(window_spec)
        )

        df_silver = df_final.select(
            substring_index(col("name"), "_", 1).alias("ConversationId"),
            col("SessionId_calc").alias("SessionId"),

            col("metadata_parsed.botId").alias("BotId"),
            col("metadata_parsed.botName").alias("BotName"),

            col("activity.id").alias("ActivityId"),
            col("activity.name").alias("ActivityName"),
            col("activity.type").alias("ActivityType"),
            col("activity.valueType").alias("ActivityValueType"),
            col("activity.timestamp").alias("ActivityTimestamp"),
            col("activity.channelId").alias("DeploymentInfo"),
            col("activity.from.id").alias("FromId"),
            col("activity.from.role").alias("FromRole"),
            col("activity.text").alias("ActivityText"),

            when(
                col("attachment_content_parsed").isNotNull(),
                col("attachment_content_parsed.body")[0].text
            ).otherwise(
                to_json(col("activity.attachments")[0].content)
            ).alias("ActivityAttachmentText"),

            col("activity.replyToId").alias("ReplyToId"),

            col("value_parsed.isDesignMode").alias("IsDesignMode"),
            col("value_parsed.id").alias("ValueId"),
            col("value_parsed.newValue").alias("newValue"),
            col("value_parsed.startTimeUtc").alias("StartTime"),
            col("value_parsed.endTimeUtc").alias("EndTime"),
            col("value_parsed.outcome").alias("SessionOutcome"),
            col("value_parsed.turnCount").alias("TurnCount"),
            col("value_parsed.targetDialogId").alias("ToTopicId"),
            col("value_parsed.topicDisplayName").alias("TopicName"),
            col("value_parsed.targetDialogType").alias("RedirectType"),
            col("value_parsed.taskDialogId").alias("TaskDialogId"),
            col("value_parsed.actions")[0].topicId.alias("ActionTopicId"),
            col("value_parsed.actions")[0].dialogComponentId.alias("ActionTopicGUID"),
            col("value_parsed.actions")[0].exception.alias("StackTrace")
        )

        df_filtered = df_silver.filter(col("BotId") == "48ec1c7d-bbff-fd23-e5b4-a645da519285")

        return df_filtered

    except Exception as e:
        raise Exception(f"Failed to parse JSON: {str(e)}")


df_silver_layer = create_silver_layer_raw(df, json_col="content", metadata_col="metadata")
display(df_silver_layer)

This code was working fine earlier, but it suddenly stopped working two days ago and I'm not sure why.

I am now getting the following error:

error - org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 73.0 failed 4 times, most recent failure: Lost task 0.3 in stage 73.0 (TID 226) (10.164.98.36 executor driver): org.apache.spark.api.python.PythonException: 'AttributeError: strip', from /root/.ipykernel/913/command-5241568264767544-3525431851, line 21. Full traceback below:

Has anyone faced this issue or can help me with the correct way to explode dynamic JSON fields?

Thanks!

1 REPLY

mark_ott
Databricks Employee

You are hitting an AttributeError on .strip(), which most likely means that some entries in activity.value are not strings, while your code assumes every non-null entry is a string before calling .strip(). This kind of problem can arise if the JSON structure or field types change unexpectedly upstream, or if null or empty values are introduced into your data.

Problem Analysis

Here’s the problematic line:

sample_value_rdd_filtered = sample_value_rdd.filter(lambda x: x is not None and x.strip().startswith("{"))
  • If x is not a string (for example, a dictionary or a number; None is already caught by the x is not None check), x.strip() will raise an AttributeError.

  • If activity.value in some of your JSON data is numeric, boolean, or an actual dictionary (not a stringified dictionary), this will break, as the minimal repro below shows.
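For illustration, here is a minimal repro of the failure mode in plain Python (the dict value is hypothetical; any non-string that slips past the x is not None guard behaves the same way):

# A non-string value passes the `x is not None` check but has no .strip() method
bad_value = {"key": "value"}
bad_value.strip()  # AttributeError: 'dict' object has no attribute 'strip'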

Why Did This Suddenly Start?

  • The underlying JSON structure in your data changed.

  • Rows were added with nulls or activity.value in a data type other than string.

  • Upstream process change or ingestion of new, unexpected formats.

Solution Approach

1. Make Your Filter More Robust

Make sure to only attempt .strip() on items that are truly strings:

sample_value_rdd_filtered = sample_value_rdd.filter(
    lambda x: isinstance(x, str) and x.strip().startswith("{")
)
  • This guards against None or non-string types and avoids AttributeError.

2. Debug Unexpected Types

To further debug, inspect a few values and their types. For example:

# Collect some values and their types for inspection
print(sample_value_rdd.take(5))
print([type(x) for x in sample_value_rdd.take(20)])

This will show which values are not strings.
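If the sample is large, a compact way to see every type that actually occurs (and how often) is to count by type name; a small sketch against the same sample_value_rdd:

# Tally the Python types present in activity.value across the whole RDD
type_counts = sample_value_rdd.map(lambda x: type(x).__name__).countByValue()
print(type_counts)  # e.g. {'str': 980, 'NoneType': 15, 'dict': 5} (illustrative numbers)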

3. Defensive Schema Inference

You may want to relax this logic further by handling actual dicts in addition to JSON strings, to protect future runs.

def is_json_string(x):
    return isinstance(x, str) and x.strip().startswith("{")

def is_json_dict(x):
    return isinstance(x, dict)

sample_value_rdd_filtered = sample_value_rdd.filter(lambda x: is_json_string(x) or is_json_dict(x))
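Note that spark.read.json expects an RDD of JSON strings, so if dicts make it through this filter you would serialize them before schema inference; a minimal sketch using the standard json module:

import json

# Normalize mixed content: serialize dicts so every element is a JSON string
normalized_value_rdd = sample_value_rdd_filtered.map(
    lambda x: json.dumps(x) if isinstance(x, dict) else x
)
value_schema = spark.read.json(normalized_value_rdd).schema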

4. Update downstream usage

When calling from_json, remember that it only works on string columns; if the column already contains structs or dicts, you may need to cast or convert it accordingly, as sketched below.
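As a sketch of that conversion, assuming activity.value arrives already parsed as a struct in some batches (rather than as a JSON string):

from pyspark.sql.functions import col, from_json, to_json

# Serialize the struct back to a JSON string, then reparse it with the inferred schema
df_final = df_with_metadata.withColumn(
    "value_str", to_json(col("activity.value"))
).withColumn(
    "value_parsed", from_json(col("value_str"), value_schema)
)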

Summary of Fix

  • Change your filter to isinstance(x, str) before calling .strip().

  • Consider checking for dicts too, as some Spark operations may auto-convert JSON fields.

  • Check data source for upstream format changes.

Reference Example (Updated Filter)

sample_value_rdd_filtered = sample_value_rdd.filter(
    lambda x: isinstance(x, str) and x.strip().startswith("{")
)

Additional Suggestions

  • Log or sample problematic data to ensure the root cause is covered.

  • Write a test or assertion to check value types before Spark runs complex JSON parsing.

  • Add try/except in your filter (with a logger) to avoid silent failures; a small sketch follows below.
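A minimal sketch of that last suggestion, assuming the standard logging module (safe_json_candidate is a hypothetical helper name):

import logging

def safe_json_candidate(x):
    # True only for strings that look like JSON objects; log anything unexpected
    try:
        return isinstance(x, str) and x.strip().startswith("{")
    except Exception as e:
        logging.getLogger("json_filter").warning("Unexpected value %r: %s", x, e)
        return False

sample_value_rdd_filtered = sample_value_rdd.filter(safe_json_candidate)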

This should resolve your error and help protect against similar issues with dynamic, nested JSON in production pipelines.
