Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Flattening the JSON in Databricks

SuMiT1
New Contributor II

I have chatbot data. I read an ADLS JSON file in Databricks and stored the output in a DataFrame.

In that table, two columns contain JSON data, but their data type is string:

1. content

2. metadata

 

Now I have to flatten that data, but I am not sure how to do it.

Sample data:

1. metadata

{"BotId":"487d-bff-f3-eb4-a649285","AADTenantId":"46c88-e34-4e4-496-4e5d","BotName":"agent1"}

2. content

{ "id": "91c1dc-69b-e11-9c0-feb6d8b07", "type": "message", "timestamp": 17589303, "from": { "id": "a089e3-b4c-0aad-262-51e13eedd", "role": 1 }, "channelId": "pudio", "textFormat": "plain", "text": "chenty", "attachments": [ ], "channelData": { "attachmentSizes": [ ], "enableDiagnostics": true, "testMode": "Text", "clientActivityID": "tzi5p" } }, { "valueType": "DynamicPceived", "id": "32e34-817-4b8-8a8-bdc0058fb", "type": "event", "timestamp": 1758305, "from": { "id": "e23b-0ec-3b-926-2cabb5cb3", "role": 0 }, "name": "Dynamceived", "channelId": "pvdio", "attachments": [ ], "replyToId": "91cdc-9b-41-00-febd8b07", "value": { "steps": [ "agent1.topic.Checnty" ], "isFinalPlan": false, "planIdentifier": "73435b-39-4ceb-a77-b7344f5e8" } }, { "id": "23bd3b-80d-421-62-14f86ffe1", "type": "event", "timestamp": 17584305, "from": { "id": "e2348b-0ec-3b4-92b6-bd6bb5cb3", "role": 0 }, "name": "DynamdDebug", "channelId": "pudio", "attachments": [ ], "replyToId": "91c1dc-6b9b-4e11-9c00-fe8b07", "value": { "summary": "", "ask": "chenty", "planIdentifier": "7348a-3589-4ceb-a747-b7315e8", "isFinalPlan": false } }, { "valueType": "Dynam\ered", "id": "6e3e4-db12-414b-ae37-9f496f3", "type": "event", "timestamp": 1758305, "from": { "id": "e28b-0e3c-3b41-92b6-2ca5cb3", "role": 0 }, "name": "Dynered", "channelId": "pvio", "attachments": [ ], "replyToId": "91dc-6b9b-4e11-9c00-feb636b07", "value": { "planIdentifier": "734b8a-3589-4ceb-a747-b7312f5e8", "stepId": "6418-3656-4e70-aba7-ca93dd4", "taskDialogId": "agent1.topic.Chenty", "thought": "This action ne.", "state": 1, "hasRecommendations": false, "type": "CustomTopic" } },

 

5 REPLIES

szymon_dybczak
Esteemed Contributor III

Hi @SuMiT1 ,

Here's one approach you can use:

1. First, let's create a DataFrame with sample data that matches your table:

from pyspark.sql.functions import from_json, explode, col
from pyspark.sql.types import *


metadata_json = '''
{"BotId":"487d-bff-f3-eb4-a649285","AADTenantId":"46c88-e34-4e4-496-4e5d","BotName":"agent1"}
'''

content_json = '''
[
  {
    "id": "91c1dc-69b-e11-9c0-feb6d8b07",
    "type": "message",
    "timestamp": 17589303,
    "from": {"id": "a089e3-b4c-0aad-262-51e13eedd", "role": 1},
    "channelId": "pudio",
    "text": "chenty",
    "attachments": [],
    "channelData": {
      "enableDiagnostics": true,
      "testMode": "Text",
      "clientActivityID": "tzi5p"
    }
  },
  {
    "id": "32e34-817-4b8-8a8-bdc0058fb",
    "type": "event",
    "timestamp": 1758305,
    "from": {"id": "e23b-0ec-3b-926-2cabb5cb3", "role": 0},
    "name": "Dynamceived",
    "value": {
      "steps": ["agent1.topic.Checnty"],
      "isFinalPlan": false,
      "planIdentifier": "73435b-39-4ceb-a77-b7344f5e8"
    }
  }
]
'''

df = spark.createDataFrame(
    [(metadata_json, content_json)],
    ["metadata", "content"]
)

2. Next, we define a schema for each of the JSON string columns:

metadata_schema = StructType([
    StructField("BotId", StringType()),
    StructField("AADTenantId", StringType()),
    StructField("BotName", StringType())
])

content_schema = ArrayType(StructType([
    StructField("id", StringType()),
    StructField("type", StringType()),
    StructField("timestamp", LongType()),
    StructField("from", StructType([
        StructField("id", StringType()),
        StructField("role", IntegerType())
    ])),
    StructField("name", StringType()),
    StructField("channelId", StringType()),
    StructField("text", StringType()),
    StructField("attachments", ArrayType(StringType())),
    StructField("replyToId", StringType()),
    StructField("value", StructType([
        StructField("steps", ArrayType(StringType())),
        StructField("isFinalPlan", BooleanType()),
        StructField("planIdentifier", StringType()),
        StructField("summary", StringType()),
        StructField("ask", StringType()),
        StructField("stepId", StringType()),
        StructField("taskDialogId", StringType()),
        StructField("thought", StringType()),
        StructField("state", IntegerType()),
        StructField("hasRecommendations", BooleanType()),
        StructField("type", StringType())
    ])),
    StructField("channelData", StructType([
        StructField("enableDiagnostics", BooleanType()),
        StructField("testMode", StringType()),
        StructField("clientActivityID", StringType())
    ]))
]))

 

3. Next, we use the from_json function, passing each JSON string column together with the schema defined in the step above. We then explode the parsed content array and select the fields we need:

# Parse both JSON string columns and explode the content array into one row per activity
df_parsed = (
    df
    .withColumn("metadata_parsed", from_json(col("metadata"), metadata_schema))
    .withColumn("content_parsed", from_json(col("content"), content_schema))
    .withColumn("content_exploded", explode(col("content_parsed")))
)

# Flatten the nested fields into top-level columns
df_flat = (
    df_parsed
    .select(
        col("metadata_parsed.BotId").alias("BotId"),
        col("metadata_parsed.AADTenantId").alias("AADTenantId"),
        col("metadata_parsed.BotName").alias("BotName"),
        col("content_exploded.id").alias("ContentId"),
        col("content_exploded.type").alias("ContentType"),
        col("content_exploded.timestamp").alias("Timestamp"),
        col("content_exploded.channelId").alias("ChannelId"),
        col("content_exploded.text").alias("Text"),
        col("content_exploded.name").alias("EventName"),
        col("content_exploded.from.id").alias("FromId"),
        col("content_exploded.from.role").alias("FromRole"),
        col("content_exploded.value.planIdentifier").alias("PlanIdentifier"),
        col("content_exploded.value.isFinalPlan").alias("IsFinalPlan"),
        col("content_exploded.value.steps").alias("Steps"),
        col("content_exploded.channelData.testMode").alias("TestMode")
    )
)

display(df_flat)

Hi @szymon_dybczak 

I have already read the conversationtranscript.json file into a Databricks DataFrame df.

Below are the columns in that DataFrame:

bot_conversationtranscriptid: string
bot_conversationtranscriptidname: string
content: string
conversationstarttime: string
conversationtranscriptid: string
createdby: string
createdbyname: string
createdbyyominame: string
createdon: string
createdonbehalfby: string
createdonbehalfbyname: string
createdonbehalfbyyominame: string
importsequencenumber: string
metadata: string
modifiedby: string
modifiedbyname: string
modifiedbyyominame: string
modifiedon: string
modifiedonbehalfby: string
modifiedonbehalfbyname: string
modifiedonbehalfbyyominame: string
name: string

 

Only the content and metadata columns contain JSON values.

Can you please tell me the code I need to flatten those JSON columns?

 

 

szymon_dybczak
Esteemed Contributor III

In the answer above I showed you one way to flatten the data. You can use the from_json function to achieve what you want.
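
For reference, here is a minimal sketch of that idea applied to the columns you listed. It assumes your DataFrame is named df and uses schema_of_json to infer a schema from one sample row, so treat it as illustrative rather than a drop-in implementation:

from pyspark.sql.functions import from_json, col, schema_of_json, lit

# Take one sample value of each JSON string column to infer a schema from.
# This assumes the first row is representative of the rest of the data.
sample = df.select("metadata", "content").first()

df_parsed = (
    df
    .withColumn("metadata_parsed",
                from_json(col("metadata"), schema_of_json(lit(sample["metadata"]))))
    .withColumn("content_parsed",
                from_json(col("content"), schema_of_json(lit(sample["content"]))))
)

# Promote the parsed metadata struct fields to top-level columns
df_flat = df_parsed.select("*", "metadata_parsed.*")
display(df_flat)

If schema inference doesn't match your data, define the schemas explicitly as in the example above.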

SuMiT1
New Contributor II

Hi @szymon_dybczak 

I gave the wrong content JSON value.

Here is the updated one. You already gave me the code, but I am getting confused, so could you please show me how to handle this one? It would be very helpful.

{ "activities": [ { "valueType": "ConversationInfo", "type": "trace", "timestamp": 1758964295, "from": { "id": "", "role": 0 }, "value": { "lastSessionOutcome": "Abandoned", "lastSessionOutcomeReason": "UserExit", "isDesignMode": true, "locale": "en-US" } }, { "id": "a96558ddb", "type": "event", "timestamp": 1758964295, "from": { "id": "a0899dd8e7", "role": 1 }, "name": "startConversation", "channelId": "pio", "attachments": [ ] }, { "valueType": "VariableAssignment", "id": "7ce594904b4af48", "type": "trace", "timestamp": 1758964296, "from": { "id": "e234528bb5cb3", "role": 0 }, "value": { "name": "LoggedInUserEmail", "id": "Global.LoggedInUserEmail", "newValue": "ma251@xyz.com", "type": "global" } }, { "id": "291fb2589935", "type": "event", "timestamp": 1758964296, "from": { "id": "e23d6bb5cb3", "role": 0 }, "name": "DialogTracing", "channelId": "pio", "attachments": [ ], "replyToId": "a28ddb", "value": { "actions": [ { "actionId": "setVariable_HL0GLM", "topicId": "agent1.topic.ConversationStart", "triggerId": "main", "dialogComponentId": "045ba8ea", "actionType": "SetVariable", "conditionItemExit": [ ], "variableState": { "dialogState": { }, "globalState": { "LoggedInUserEmail": "ma251@xyz.com" } }, "exception": "", "resultTrace": { } } ] } }, { "id": "54d086b8b1", "type": "message", "timestamp": 1758964296, "from": { "id": "e234525cb3", "role": 0 }, "channelId": "pio", "textFormat": "markdown", "text": "Hello virtual assistant. Just so you are aware, I sometimes use AI to answer your questions. If you provided a website during creation, try asking me about it! Next try giving me some more knowledge by setting up generative AI.", "attachments": [ ], "replyToId": "a96556b628ddb", "speak": "Hello and thank yoPlease note that some responses are generated by AI and may require verification for accuracy. How may I help you today?", "channelData": { "feedbackLoop": { "type": "default" } } }, { "id": "c2e9a0f2710f6", "type": "event", "timestamp": 1758964296, "from": { "id": "e23456bb5cb3", "role": 0 }, "name": "DialogTracing", "channelId": "pio", "attachments": [ ], "replyToId": "a98ddb", "value": { "actions": [ { "actionId": "sendMessage_M0LuhV", "topicId": "agent1.topic.ConversationStart", "triggerId": "main", "dialogComponentId": "045ba8ea", "actionType": "SendActivity", "conditionItemExit": [ ], "variableState": { "dialogState": { }, "globalState": { } }, "exception": "", "resultTrace": { } } ] } }, { "id": "918b07", "type": "message", "timestamp": 1758964303, "from": { "id": "a088e7", "role": 1 }, "channelId": "pio", "textFormat": "plain", "text": "check warranty", "attachments": [ ], "channelData": { "attachmentSizes": [ ], "enableDiagnostics": true, "testMode": "Text", "clientActivityID": "tziwgv745p" } }, { "valueType": "DynamicPlanReceived", "id": "32e20058fb", "type": "event", "timestamp": 1758964305, "from": { "id": "e23456bb5cb3", "role": 0 }, "name": "DynamicPlanReceived", "channelId": "pio", "attachments": [ ], "replyToId": "91cae4d8b07", "value": { "steps": [ "agent1.topic.Checkwarranty" ], "isFinalPlan": false, "planIdentifier": "73435b4f5e8" } }, { "id": "236ffe1", "type": "event", "timestamp": 1758964305, "from": { "id": "e23452cb3", "role": 0 }, "name": "DynamicPlanReceivedDebug", "channelId": "pio", "attachments": [ ], "replyToId": "91caed8b07", "value": { "summary": "", "ask": "check warranty", "planIdentifier": "7343544f5e8", "isFinalPlan": false } },

 

szymon_dybczak
Esteemed Contributor III

Hi @SuMiT1 ,

You can try the approach below:

1. First, prepare sample data that reflects your JSON content:

from pyspark.sql.functions import col, explode, from_json, get_json_object
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType


sample_data = [
    {
        "conversation_id": "conv_001",
        "session_id": "sess_12345",
        "user_id": "user_001",
        "timestamp": "2024-10-09T10:30:00",
        "content": '{"activities": [{"valueType": "ConversationInfo", "type": "trace", "timestamp": 1758964295, "from": {"id": "", "role": 0}, "value": {"lastSessionOutcome": "Abandoned", "lastSessionOutcomeReason": "UserExit", "isDesignMode": true, "locale": "en-US"}}, {"id": "a96558ddb", "type": "event", "timestamp": 1758964295, "from": {"id": "a0899dd8e7", "role": 1}, "name": "startConversation", "channelId": "pio"}]}',
        "metadata": '{"source": "web", "channel": "chat", "region": "US", "deviceType": "desktop", "browserInfo": {"name": "Chrome", "version": "120"}}'
    },
    {
        "conversation_id": "conv_002",
        "session_id": "sess_67890",
        "user_id": "user_002",
        "timestamp": "2024-10-09T11:15:00",
        "content": '{"activities": [{"valueType": "VariableAssignment", "id": "7ce594904b4af48", "type": "trace", "timestamp": 1758964296, "from": {"id": "e234528bb5cb3", "role": 0}, "value": {"name": "LoggedInUserEmail", "id": "Global.LoggedInUserEmail", "newValue": "test@xyz.com", "type": "global"}}]}',
        "metadata": '{"source": "mobile", "channel": "app", "region": "EU", "deviceType": "smartphone", "appVersion": "2.5.1"}'
    }
]

df = spark.createDataFrame(sample_data)


2. Define the schemas needed for the from_json function:

activity_schema = StructType([
    StructField("valueType", StringType(), True),
    StructField("id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("timestamp", IntegerType(), True),
    StructField("from", StructType([
        StructField("id", StringType(), True),
        StructField("role", IntegerType(), True)
    ]), True),
    StructField("name", StringType(), True),
    StructField("channelId", StringType(), True),
    StructField("text", StringType(), True),
    StructField("value", StringType(), True)  #This can be parser further if you want
])

content_schema = StructType([
    StructField("activities", ArrayType(activity_schema), True)
])


metadata_schema = StructType([
    StructField("source", StringType(), True),
    StructField("channel", StringType(), True),
    StructField("region", StringType(), True),
    StructField("deviceType", StringType(), True),
    StructField("browserInfo", StructType([
        StructField("name", StringType(), True),
        StructField("version", StringType(), True)
    ]), True),
    StructField("appVersion", StringType(), True)
])

3. Parse the data using the from_json function and the schemas defined in the step above:

df_complete = (df
    .withColumn("content_parsed", from_json(col("content"), content_schema))
    .withColumn("metadata_parsed", from_json(col("metadata"), metadata_schema))
    .withColumn("activity", explode(col("content_parsed.activities")))
    .select(
        # Conversation columns
        col("conversation_id"),
        col("session_id"),
        col("user_id"),
        col("timestamp").alias("conversation_timestamp"),
        
        # Metadata columns
        col("metadata_parsed.source").alias("meta_source"),
        col("metadata_parsed.channel").alias("meta_channel"),
        col("metadata_parsed.region").alias("meta_region"),
        col("metadata_parsed.deviceType").alias("meta_deviceType"),
        col("metadata_parsed.browserInfo.name").alias("meta_browser_name"),
        col("metadata_parsed.browserInfo.version").alias("meta_browser_version"),
        col("metadata_parsed.appVersion").alias("meta_appVersion"),
        
        # Activity columns
        col("activity.valueType").alias("activity_valueType"),
        col("activity.id").alias("activity_id"),
        col("activity.type").alias("activity_type"),
        col("activity.timestamp").alias("activity_timestamp"),
        col("activity.from.id").alias("from_id"),
        col("activity.from.role").alias("from_role"),
        col("activity.name").alias("activity_name"),
        col("activity.channelId").alias("channel_id"),
        col("activity.text").alias("activity_text"),
        col("activity.value").alias("activity_value")
    )
)
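
Since the value field was kept as a raw JSON string in activity_schema, here is a minimal follow-up sketch showing one way to pull individual attributes out of it with get_json_object (imported above). The JSON paths are only examples taken from your sample payload and won't be present on every activity:

# Illustrative only: extract a couple of fields from the raw JSON string kept in
# activity_value. Activities without a matching path simply get null here.
df_values = (df_complete
    .withColumn("plan_identifier", get_json_object(col("activity_value"), "$.planIdentifier"))
    .withColumn("is_final_plan", get_json_object(col("activity_value"), "$.isFinalPlan"))
)

display(df_values)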

 
