Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-09-2025 01:57 AM - edited 10-09-2025 02:01 AM
Hi @SuMiT1 ,
You can try the approach below:
1. First, prepare sample data that reflects your JSON content:
from pyspark.sql.functions import col, explode, from_json, get_json_object
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType
# Two sample conversations. "content" and "metadata" are deliberately kept as
# raw JSON *strings* so that they can be parsed with from_json later on.
sample_data = [
    dict(
        conversation_id="conv_001",
        session_id="sess_12345",
        user_id="user_001",
        timestamp="2024-10-09T10:30:00",
        content='{"activities": [{"valueType": "ConversationInfo", "type": "trace", "timestamp": 1758964295, "from": {"id": "", "role": 0}, "value": {"lastSessionOutcome": "Abandoned", "lastSessionOutcomeReason": "UserExit", "isDesignMode": true, "locale": "en-US"}}, {"id": "a96558ddb", "type": "event", "timestamp": 1758964295, "from": {"id": "a0899dd8e7", "role": 1}, "name": "startConversation", "channelId": "pio"}]}',
        metadata='{"source": "web", "channel": "chat", "region": "US", "deviceType": "desktop", "browserInfo": {"name": "Chrome", "version": "120"}}',
    ),
    dict(
        conversation_id="conv_002",
        session_id="sess_67890",
        user_id="user_002",
        timestamp="2024-10-09T11:15:00",
        content='{"activities": [{"valueType": "VariableAssignment", "id": "7ce594904b4af48", "type": "trace", "timestamp": 1758964296, "from": {"id": "e234528bb5cb3", "role": 0}, "value": {"name": "LoggedInUserEmail", "id": "Global.LoggedInUserEmail", "newValue": "test@xyz.com", "type": "global"}}]}',
        metadata='{"source": "mobile", "channel": "app", "region": "EU", "deviceType": "smartphone", "appVersion": "2.5.1"}',
    ),
]
# Turn the sample rows into a DataFrame. No explicit schema is supplied, so
# the column types are left to Spark's inference from the dictionaries.
df = spark.createDataFrame(data=sample_data)
2. Define the schemas required by the from_json function:
# Schema of one element of the "activities" array. Every field is declared
# nullable.
# NOTE(review): the sample timestamps fit in a 32-bit IntegerType; confirm the
# real data does too, otherwise a wider type such as LongType is needed.
activity_schema = (
    StructType()
    .add("valueType", StringType(), True)
    .add("id", StringType(), True)
    .add("type", StringType(), True)
    .add("timestamp", IntegerType(), True)
    .add(
        "from",
        StructType()
        .add("id", StringType(), True)
        .add("role", IntegerType(), True),
        True,
    )
    .add("name", StringType(), True)
    .add("channelId", StringType(), True)
    .add("text", StringType(), True)
    # Declared as a plain string here; it can be parsed further if you want.
    .add("value", StringType(), True)
)
# Schema of the "content" JSON string: a single nullable "activities" field
# holding an array of activity structs.
content_schema = StructType().add(
    "activities", ArrayType(activity_schema), True
)
# Schema of the "metadata" JSON string. Every field is declared nullable; in
# the sample rows, only the web record carries "browserInfo" and only the
# mobile record carries "appVersion".
metadata_schema = StructType([
    StructField("source", StringType(), True),
    StructField("channel", StringType(), True),
    StructField("region", StringType(), True),
    StructField("deviceType", StringType(), True),
    StructField("browserInfo", StructType([
        StructField("name", StringType(), True),
        StructField("version", StringType(), True),
    ]), True),
    StructField("appVersion", StringType(), True),
])

# 3. Parse the data using the from_json function and the schemas defined in the step above:
# Flatten the conversations: parse both JSON string columns, produce one row
# per activity, then project everything into flat, aliased columns.
df_complete = (
    df
    .withColumn("content_parsed", from_json(col("content"), content_schema))
    .withColumn("metadata_parsed", from_json(col("metadata"), metadata_schema))
    # explode() yields one output row per element of the activities array.
    .withColumn("activity", explode(col("content_parsed.activities")))
    .select(
        # Conversation-level columns
        col("conversation_id"),
        col("session_id"),
        col("user_id"),
        col("timestamp").alias("conversation_timestamp"),
        # Metadata columns
        *[
            col(source_path).alias(output_name)
            for source_path, output_name in (
                ("metadata_parsed.source", "meta_source"),
                ("metadata_parsed.channel", "meta_channel"),
                ("metadata_parsed.region", "meta_region"),
                ("metadata_parsed.deviceType", "meta_deviceType"),
                ("metadata_parsed.browserInfo.name", "meta_browser_name"),
                ("metadata_parsed.browserInfo.version", "meta_browser_version"),
                ("metadata_parsed.appVersion", "meta_appVersion"),
            )
        ],
        # Activity columns
        *[
            col(source_path).alias(output_name)
            for source_path, output_name in (
                ("activity.valueType", "activity_valueType"),
                ("activity.id", "activity_id"),
                ("activity.type", "activity_type"),
                ("activity.timestamp", "activity_timestamp"),
                ("activity.from.id", "from_id"),
                ("activity.from.role", "from_role"),
                ("activity.name", "activity_name"),
                ("activity.channelId", "channel_id"),
                ("activity.text", "activity_text"),
                ("activity.value", "activity_value"),
            )
        ],
    )
)