Hi @SuMiT1,
Here's one approach you can use:
1. First, let's create a DataFrame with sample data that matches your table:
from pyspark.sql.functions import col, explode, explode_outer, from_json
from pyspark.sql.types import *
# Raw JSON for the `metadata` column: a single object describing the bot.
metadata_json = '''
{"BotId":"487d-bff-f3-eb4-a649285","AADTenantId":"46c88-e34-4e4-496-4e5d","BotName":"agent1"}
'''
# Raw JSON for the `content` column: an array of activities, here one
# "message" activity followed by one "event" activity.
content_json = '''
[
{
"id": "91c1dc-69b-e11-9c0-feb6d8b07",
"type": "message",
"timestamp": 17589303,
"from": {"id": "a089e3-b4c-0aad-262-51e13eedd", "role": 1},
"channelId": "pudio",
"text": "chenty",
"attachments": [],
"channelData": {
"enableDiagnostics": true,
"testMode": "Text",
"clientActivityID": "tzi5p"
}
},
{
"id": "32e34-817-4b8-8a8-bdc0058fb",
"type": "event",
"timestamp": 1758305,
"from": {"id": "e23b-0ec-3b-926-2cabb5cb3", "role": 0},
"name": "Dynamceived",
"value": {
"steps": ["agent1.topic.Checnty"],
"isFinalPlan": false,
"planIdentifier": "73435b-39-4ceb-a77-b7344f5e8"
}
}
]
'''
# One-row DataFrame with two string columns holding the unparsed JSON text,
# mirroring the shape of the source table.
df = spark.createDataFrame(
    data=[(metadata_json, content_json)],
    schema=["metadata", "content"],
)
2. Next, let's define a schema for each of the two JSON string columns:
# Schema for the `metadata` column: a flat object whose three fields are all strings.
metadata_schema = StructType(
    [StructField(field_name, StringType()) for field_name in ("BotId", "AADTenantId", "BotName")]
)

# Sender of an activity (`from`).
_from_schema = StructType([
    StructField("id", StringType()),
    StructField("role", IntegerType()),
])

# Payload of an event activity (`value`); only populated for some activity types.
_value_schema = StructType([
    StructField("steps", ArrayType(StringType())),
    StructField("isFinalPlan", BooleanType()),
    StructField("planIdentifier", StringType()),
    StructField("summary", StringType()),
    StructField("ask", StringType()),
    StructField("stepId", StringType()),
    StructField("taskDialogId", StringType()),
    StructField("thought", StringType()),
    StructField("state", IntegerType()),
    StructField("hasRecommendations", BooleanType()),
    StructField("type", StringType()),
])

# Client/channel diagnostics attached to message activities (`channelData`).
_channel_data_schema = StructType([
    StructField("enableDiagnostics", BooleanType()),
    StructField("testMode", StringType()),
    StructField("clientActivityID", StringType()),
])

# Schema for the `content` column: a JSON array where each element is one
# activity. Fields that only some activity types carry (e.g. `text`, `name`,
# `value`) are all declared here so one schema covers every element.
content_schema = ArrayType(StructType([
    StructField("id", StringType()),
    StructField("type", StringType()),
    StructField("timestamp", LongType()),
    StructField("from", _from_schema),
    StructField("name", StringType()),
    StructField("channelId", StringType()),
    StructField("text", StringType()),
    StructField("attachments", ArrayType(StringType())),
    StructField("replyToId", StringType()),
    StructField("value", _value_schema),
    StructField("channelData", _channel_data_schema),
]))
3. Finally, we use the `from_json` function, passing it our JSON string columns along with the schemas defined in the step above. We then explode the content array into one row per activity and select the fields we need:
# Parse both JSON string columns with the schemas defined above, then expand
# the activity array so that each activity becomes its own row.
#
# explode_outer is used instead of explode: explode drops every source row
# whose parsed content is null or an empty array (for example when the content
# string is missing or cannot be parsed), which would silently lose that row's
# metadata. explode_outer keeps such rows and leaves the activity columns null.
df_parsed = (
    df
    .withColumn("metadata_parsed", from_json(col("metadata"), metadata_schema))
    .withColumn("content_parsed", from_json(col("content"), content_schema))
    .withColumn("content_exploded", explode_outer(col("content_parsed")))
)

# Flatten the nested structs into top-level columns with readable names:
# bot-level fields come from the metadata object and repeat on every activity
# row; the remaining columns come from the individual activity.
df_flat = df_parsed.select(
    col("metadata_parsed.BotId").alias("BotId"),
    col("metadata_parsed.AADTenantId").alias("AADTenantId"),
    col("metadata_parsed.BotName").alias("BotName"),
    col("content_exploded.id").alias("ContentId"),
    col("content_exploded.type").alias("ContentType"),
    col("content_exploded.timestamp").alias("Timestamp"),
    col("content_exploded.channelId").alias("ChannelId"),
    col("content_exploded.text").alias("Text"),
    col("content_exploded.name").alias("EventName"),
    col("content_exploded.from.id").alias("FromId"),
    col("content_exploded.from.role").alias("FromRole"),
    col("content_exploded.value.planIdentifier").alias("PlanIdentifier"),
    col("content_exploded.value.isFinalPlan").alias("IsFinalPlan"),
    col("content_exploded.value.steps").alias("Steps"),
    col("content_exploded.channelData.testMode").alias("TestMode"),
)
display(df_flat)