szymon_dybczak
Esteemed Contributor III

Hi @SuMiT1 ,

Here's one approach you can use:

1. First, let's create a dataframe with a sample data that matches your table:

from pyspark.sql.functions import from_json, explode, col
from pyspark.sql.types import *


metadata_json = '''
{"BotId":"487d-bff-f3-eb4-a649285","AADTenantId":"46c88-e34-4e4-496-4e5d","BotName":"agent1"}
'''

content_json = '''
[
  {
    "id": "91c1dc-69b-e11-9c0-feb6d8b07",
    "type": "message",
    "timestamp": 17589303,
    "from": {"id": "a089e3-b4c-0aad-262-51e13eedd", "role": 1},
    "channelId": "pudio",
    "text": "chenty",
    "attachments": [],
    "channelData": {
      "enableDiagnostics": true,
      "testMode": "Text",
      "clientActivityID": "tzi5p"
    }
  },
  {
    "id": "32e34-817-4b8-8a8-bdc0058fb",
    "type": "event",
    "timestamp": 1758305,
    "from": {"id": "e23b-0ec-3b-926-2cabb5cb3", "role": 0},
    "name": "Dynamceived",
    "value": {
      "steps": ["agent1.topic.Checnty"],
      "isFinalPlan": false,
      "planIdentifier": "73435b-39-4ceb-a77-b7344f5e8"
    }
  }
]
'''

df = spark.createDataFrame(
    [(metadata_json, content_json)],
    ["metadata", "content"]
)

2. Next, we will define schema for both JSON string columns:

metadata_schema = StructType([
    StructField("BotId", StringType()),
    StructField("AADTenantId", StringType()),
    StructField("BotName", StringType())
])

content_schema = ArrayType(StructType([
    StructField("id", StringType()),
    StructField("type", StringType()),
    StructField("timestamp", LongType()),
    StructField("from", StructType([
        StructField("id", StringType()),
        StructField("role", IntegerType())
    ])),
    StructField("name", StringType()),
    StructField("channelId", StringType()),
    StructField("text", StringType()),
    StructField("attachments", ArrayType(StringType())),
    StructField("replyToId", StringType()),
    StructField("value", StructType([
        StructField("steps", ArrayType(StringType())),
        StructField("isFinalPlan", BooleanType()),
        StructField("planIdentifier", StringType()),
        StructField("summary", StringType()),
        StructField("ask", StringType()),
        StructField("stepId", StringType()),
        StructField("taskDialogId", StringType()),
        StructField("thought", StringType()),
        StructField("state", IntegerType()),
        StructField("hasRecommendations", BooleanType()),
        StructField("type", StringType())
    ])),
    StructField("channelData", StructType([
        StructField("enableDiagnostics", BooleanType()),
        StructField("testMode", StringType()),
        StructField("clientActivityID", StringType())
    ]))
]))

 

3. Next, we will use parse_json function and we provide as argument to that function or json string columns along with schemas we defined at step above:

df_parsed = (
    df
    .withColumn("metadata_parsed", from_json(col("metadata"), metadata_schema))
    .withColumn("content_parsed", from_json(col("content"), content_schema))
    .withColumn("content_exploded", explode(col("content_parsed")))
)

df_flat = (
    df_parsed
    .select(
        col("metadata_parsed.BotId").alias("BotId"),
        col("metadata_parsed.AADTenantId").alias("AADTenantId"),
        col("metadata_parsed.BotName").alias("BotName"),
        col("content_exploded.id").alias("ContentId"),
        col("content_exploded.type").alias("ContentType"),
        col("content_exploded.timestamp").alias("Timestamp"),
        col("content_exploded.channelId").alias("ChannelId"),
        col("content_exploded.text").alias("Text"),
        col("content_exploded.name").alias("EventName"),
        col("content_exploded.from.id").alias("FromId"),
        col("content_exploded.from.role").alias("FromRole"),
        col("content_exploded.value.planIdentifier").alias("PlanIdentifier"),
        col("content_exploded.value.isFinalPlan").alias("IsFinalPlan"),
        col("content_exploded.value.steps").alias("Steps"),
        col("content_exploded.channelData.testMode").alias("TestMode")
    )
)

display(df_flat)