szymon_dybczak
Esteemed Contributor III

Hi @SuMiT1,

You can try the approach below:

1. First, prepare sample data that reflects your JSON content:

from pyspark.sql.functions import col, explode, from_json, get_json_object
from pyspark.sql.types import (
    ArrayType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)


# Two example rows that mimic the raw table: plain conversation columns plus
# two columns ("content" and "metadata") holding JSON documents as strings.
# Every value is a Python str, so every inferred DataFrame column is a string.
sample_data = [
    dict(
        conversation_id="conv_001",
        session_id="sess_12345",
        user_id="user_001",
        timestamp="2024-10-09T10:30:00",
        content='{"activities": [{"valueType": "ConversationInfo", "type": "trace", "timestamp": 1758964295, "from": {"id": "", "role": 0}, "value": {"lastSessionOutcome": "Abandoned", "lastSessionOutcomeReason": "UserExit", "isDesignMode": true, "locale": "en-US"}}, {"id": "a96558ddb", "type": "event", "timestamp": 1758964295, "from": {"id": "a0899dd8e7", "role": 1}, "name": "startConversation", "channelId": "pio"}]}',
        metadata='{"source": "web", "channel": "chat", "region": "US", "deviceType": "desktop", "browserInfo": {"name": "Chrome", "version": "120"}}',
    ),
    dict(
        conversation_id="conv_002",
        session_id="sess_67890",
        user_id="user_002",
        timestamp="2024-10-09T11:15:00",
        content='{"activities": [{"valueType": "VariableAssignment", "id": "7ce594904b4af48", "type": "trace", "timestamp": 1758964296, "from": {"id": "e234528bb5cb3", "role": 0}, "value": {"name": "LoggedInUserEmail", "id": "Global.LoggedInUserEmail", "newValue": "test@xyz.com", "type": "global"}}]}',
        metadata='{"source": "mobile", "channel": "app", "region": "EU", "deviceType": "smartphone", "appVersion": "2.5.1"}',
    ),
]

# `spark` is the SparkSession provided by the notebook environment; the schema
# is inferred from the dicts above.
df = spark.createDataFrame(sample_data)


2. Define the schemas required by the from_json function:

# Schema for one element of the "activities" array inside the `content` JSON.
# All fields are nullable because activities are heterogeneous: in the sample
# data a "trace" activity carries valueType/value, while an "event" activity
# carries name/channelId instead.
activity_schema = StructType([
    StructField("valueType", StringType(), True),
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    # Epoch timestamp (seconds in the sample data). LongType instead of
    # IntegerType: a 32-bit int overflows for second-precision values after
    # January 2038 and for any millisecond-precision value.
    StructField("timestamp", LongType(), True),
    StructField("from", StructType([
        StructField("id", StringType(), True),
        StructField("role", IntegerType(), True),
    ]), True),
    StructField("name", StringType(), True),
    StructField("channelId", StringType(), True),
    StructField("text", StringType(), True),
    # "value" has a different shape per valueType, so it is kept as a raw JSON
    # string here; it can be parsed further (from_json / get_json_object) once
    # the shape for a given valueType is known.
    StructField("value", StringType(), True),
])

# Top-level shape of the `content` column: an object wrapping the activities.
content_schema = StructType([
    StructField("activities", ArrayType(activity_schema), True),
])

# Shape of the `metadata` column. browserInfo appears only for web sessions
# and appVersion only for mobile sessions in the sample data, so both stay
# nullable.
metadata_schema = StructType([
    StructField("source", StringType(), True),
    StructField("channel", StringType(), True),
    StructField("region", StringType(), True),
    StructField("deviceType", StringType(), True),
    StructField("browserInfo", StructType([
        StructField("name", StringType(), True),
        StructField("version", StringType(), True),
    ]), True),
    StructField("appVersion", StringType(), True),
])

3. Parse the data using the from_json function and the schemas defined in the step above:

# Parse both JSON string columns, then emit one output row per activity with
# conversation, metadata and activity fields flattened into top-level columns.
# NOTE: explode() drops any conversation whose "activities" array is null or
# empty (e.g. when `content` is malformed and parses to nulls). Use
# explode_outer() instead if such conversations must be kept with null
# activity columns.
df_complete = (
    df
    .withColumn("content_parsed", from_json(col("content"), content_schema))
    .withColumn("metadata_parsed", from_json(col("metadata"), metadata_schema))
    .withColumn("activity", explode(col("content_parsed.activities")))
    .select(
        # Conversation columns (repeated on every activity row)
        col("conversation_id"),
        col("session_id"),
        col("user_id"),
        # Renamed so it cannot be confused with the per-activity timestamp
        col("timestamp").alias("conversation_timestamp"),

        # Metadata columns
        col("metadata_parsed.source").alias("meta_source"),
        col("metadata_parsed.channel").alias("meta_channel"),
        col("metadata_parsed.region").alias("meta_region"),
        col("metadata_parsed.deviceType").alias("meta_deviceType"),
        col("metadata_parsed.browserInfo.name").alias("meta_browser_name"),
        col("metadata_parsed.browserInfo.version").alias("meta_browser_version"),
        col("metadata_parsed.appVersion").alias("meta_appVersion"),

        # Activity columns
        col("activity.valueType").alias("activity_valueType"),
        col("activity.id").alias("activity_id"),
        col("activity.type").alias("activity_type"),
        col("activity.timestamp").alias("activity_timestamp"),
        col("activity.from.id").alias("from_id"),
        col("activity.from.role").alias("from_role"),
        col("activity.name").alias("activity_name"),
        col("activity.channelId").alias("channel_id"),
        col("activity.text").alias("activity_text"),
        col("activity.value").alias("activity_value"),
    )
)