
Want to split JSON data into multiple rows

Pravin08
New Contributor III

Hi,

This is my sample JSON data, generated from an API response. It all arrives in a single row. I want to split it into multiple rows and store it in a DataFrame.

[{"transaction_id":"F6001EC5-528196D1","corrects_transaction_id":null,"transaction_type":"Call","original_order_id":"F6001EC5-528196D1","advertiser_campaign_id":"9528415"}, {"transaction_id":"119F26CF-2304AAE6","corrects_transaction_id":null,"transaction_type":"Call","original_order_id":"119F26CF-2304AAE6","advertiser_campaign_id":"9528009"}]

I want to store it in multiple rows like the ones below, keeping both the keys and the values:

{"transaction_id":"F6001EC5-528196D1","corrects_transaction_id":null,"transaction_type":"Call","original_order_id":"F6001EC5-528196D1","advertiser_campaign_id":"9528415"}

{"transaction_id":"119F26CF-2304AAE6","corrects_transaction_id":null,"transaction_type":"Call","original_order_id":"119F26CF-2304AAE6","advertiser_campaign_id":"9528009"}

Appreciate your help.

 

 

 

 

Accepted Solution

feiyun0112
Honored Contributor

 

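from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Sample API response: a JSON array held in a single string.
json="""[{"transaction_id":"F6001EC5-528196D1","corrects_transaction_id":null,"transaction_type":"Call","original_order_id":"F6001EC5-528196D1","advertiser_campaign_id":"9528415"}, {"transaction_id":"119F26CF-2304AAE6","corrects_transaction_id":null,"transaction_type":"Call","original_order_id":"119F26CF-2304AAE6","advertiser_campaign_id":"9528009"}]"""

# One row with one column that holds the raw JSON string.
df=spark.createDataFrame([[json]],['json_str'])

# Schema of the array elements; add one StructField per key you want to keep.
json_schema=ArrayType(StructType([ StructField("transaction_id", StringType())]))

# from_json parses the string into an array of structs; explode emits one row per element.
df2 = df.withColumn("json",F.explode(F.from_json("json_str",json_schema)))
df2.display()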
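
As a cross-check that needs no cluster, the same split can be sketched in plain Python with the standard `json` module. This only illustrates what "one row per array element" means and is not the Spark implementation. The helper name `split_json_array` is hypothetical, and the sample payload is trimmed to two fields.

```python
import json


def split_json_array(payload: str) -> list:
    """Return one compact JSON string per element of a top-level JSON array."""
    records = json.loads(payload)
    if not isinstance(records, list):
        raise ValueError("expected a top-level JSON array")
    # separators=(",", ":") keeps the compact style used in the sample data.
    return [json.dumps(record, separators=(",", ":")) for record in records]


# Trimmed stand-in for the API response (assumption: two fields per record).
sample = (
    '[{"transaction_id":"F6001EC5-528196D1","corrects_transaction_id":null},'
    ' {"transaction_id":"119F26CF-2304AAE6","corrects_transaction_id":null}]'
)

rows = split_json_array(sample)
for row in rows:
    print(row)
```

Each element of `rows` is a standalone JSON object string, which is the shape asked for in the question.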

 



Pravin08
New Contributor III

Thanks a lot for your prompt response. The solution recommended worked and produced the desired results. Once again, I appreciate your help !!!

Pravin08
New Contributor III

This solution works fine on the sample data. But when I apply it to the actual API response, the resulting DataFrame has no data, even though I can print response.text and see the data.

This is how I am creating the DataFrame:

response_text = response.text  # store the API response in a variable
unparsed_df = spark.createDataFrame([[response_text]],['json_str'])
print(response_text)  # this prints the data
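
Before handing the text to Spark, it can help to confirm that the payload really is a JSON array and to count its elements. Below is a minimal sketch in plain Python. `summarize_payload` is a hypothetical helper, and the literal stands in for `response.text`.

```python
import json


def summarize_payload(text: str) -> dict:
    """Describe the top-level shape of a JSON payload without using Spark."""
    parsed = json.loads(text)  # raises json.JSONDecodeError on invalid JSON
    return {
        "top_level_type": type(parsed).__name__,
        "record_count": len(parsed) if isinstance(parsed, list) else None,
    }


# Stand-in for response.text (assumption: the API returns a JSON array).
response_text = '[{"transaction_id": "A"}, {"transaction_id": "B"}]'
summary = summarize_payload(response_text)
print(summary)
```

If the payload parses here and the record count is non-zero but the DataFrame is still empty, the schema is the more likely place to look.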

 

 

feiyun0112
Honored Contributor

Please share a sample of response_text.

Pravin08
New Contributor III

Here's a sample row:

 

[{"transaction_id":"F6001EC5-528196D1","corrects_transaction_id":null,"transaction_type":"Call","original_order_id":"F6001EC5-528196D1","advertiser_campaign_id":"9528415","advertiser_campaign_id_from_network":"9528415","advertiser_campaign_name":"xxxxx","media_type":"Online: Other","call_source_description":"386-749-8349","syndicated_ident":"","promo_line_description":"DAY: OBE xxxx Business Listing","virtual_line_id":"22949255","call_result_description_detail_managed_advertiser":"Profile Promo Number","city":"xxxxx","region":"FL","qualified_regions":"any","repeat_calling_phone_number":"No","calling_phone_number":"xxx-289-xxxx","mobile":"Mobile","duration":1,"connect_duration":0,"ivr_duration":1,"keypresses":"","keypress_1":"","keypress_2":"","keypress_3":"","keypress_4":"","dynamic_number_pool_referrer_search_engine":null,"dynamic_number_pool_referrer_search_keywords_id":"","dynamic_number_pool_referrer_search_keywords":"","dynamic_number_pool_referrer_ad":"","dynamic_number_pool_referrer_ad_id":"","dynamic_number_pool_referrer_ad_group":"","dynamic_number_pool_referrer_ad_group_id":"","dynamic_number_pool_referrer_referrer_campaign":"","dynamic_number_pool_referrer_referrer_campaign_id":"","dynamic_number_pool_referrer_keyword_match_type":null,"dynamic_number_pool_referrer_param1_name":null,"dynamic_number_pool_referrer_param1_value":null,"dynamic_number_pool_referrer_param2_name":null,"dynamic_number_pool_referrer_param2_value":null,"dynamic_number_pool_referrer_param3_name":null,"dynamic_number_pool_referrer_param3_value":null,"dynamic_number_pool_referrer_param4_name":null,"dynamic_number_pool_referrer_param4_value":null,"dynamic_number_pool_referrer_param5_name":null,"dynamic_number_pool_referrer_param5_value":null,"dynamic_number_pool_referrer_param6_name":null,"dynamic_number_pool_referrer_param6_value":null,"dynamic_number_pool_referrer_param7_name":null,"dynamic_number_pool_referrer_param7_value":null,"dynamic_number_pool_referrer_param8_name":null,"dynamic_number_pool_referrer_param8_value":null,"dynamic_number_pool_referrer_param9_name":null,"dynamic_number_pool_referrer_param9_value":null,"dynamic_number_pool_referrer_param10_name":null,"dynamic_number_pool_referrer_param10_value":null,"dynamic_number_pool_referrer_search_type":null,"dynamic_number_pool_pool_type":null,"dynamic_number_pool_id":null,"start_time_local":"2024-02-13 22:21:18 -05:00","start_time_xml":"2024-02-13T22:21:18","start_time_utc":1707880878627,"start_time_network_timezone":"2024-02-13 22:21:18 -05:00","start_time_network_timezone_xml":"2024-02-13T22:21:18","recording":null,"corrected_at":null,"opt_in_SMS":0,"complete_call_id":"F300-1E5E5784C609","transfer_from_type":"Profile Direct","notes":"","verified_zip":"","hangup_cause":"Caller: Hang-up","Answered by Agent":0,"Not Answered by Agent":1,"Answered by Voicemail":0,"Voicemail Left":0,"English Existing Patient":null,"English Prospect":null,"Spanish Existing Patient":null,"Spanish Prospect":null,"Likely New Patient Appointment":0,"Repeat Caller":0,"CWHH Likely Referral":null,"CWHH Ortho Likely Referral":null,"Closed - Converted":null,"CWHH Ortho Call (web page calls)":null,"New Lead":null,"Outreach - Attempted":null,"Outreach - Engaged":null,"Tour Scheduled":null,"PCP Appointment Scheduled":null,"Closed - Not Converted":null,"Destination: Number Not in Service":null,"invoca_id":null,"invoca_detected_destination":null,"gclid":null,"g_cid":null,"wbraid":null,"msclkid":null,"mcid":null,"existing_destination_english":"xxx-506-xxxx","new_destination_spanish":"xxx-388-xxxx","invoca_campaign_name":"xxxxxx","existing_destination_spanish":"xxx-506-xxxx","new_destination_english":"xxx-388-xxxx","call_flow":"GENESYS Daytona: Ormond Beach ENG/SPA Generic Call Menu","direct_dial":null,"timing":"Evergreen","language":"English/Spanish","audience_type":"Prospects","asset_type":"Tracked Number","year":null,"hh_destination":null,"sfcid":null,"report_suite":null,"customer_id":null,"adobe_org_id":null,"landing_page":null,"utm_medium":"Business Listings","utm_source":"GMB","utm_campaign":null,"utm_content":null,"calling_page":null,"cm_mmc":null,"kc":null,"e_dins":null,"state":"xxxxxx","center_entity":"3638","center_code":"OBE","center_name":null,"market":"xxxxx, Daytona","funding_type":"Acquisition","sub_funding_type":"Engagement","brand":"xxxxx Care Center","signal_name":"Not Answered by Agent","signal_partner_unique_id":"","signal_occurred_at":"2024-02-13 22:21:18 -05:00","signal_source":"","revenue":null,"sale_amount":null,"destination_phone_number":""}]

If I pass this sample value and create a DataFrame from it, the rest of the steps run without errors, but the result contains no data. Sharing the code snippet below:

unparsed_df = spark.createDataFrame([[json]],['json_str'])

unparsed_df.display()

from pyspark.sql.types import *
from pyspark.sql import functions as F
from datetime import datetime

json_schema = ArrayType(
              StructType([ StructField("transaction_id", StringType()),
                           StructField("corrects_transaction_id", StringType()),
                           StructField("transaction_type", StringType()),
                           StructField("original_order_id", StringType()),
                           StructField("advertiser_campaign_id", StringType()),
                           StructField("advertiser_campaign_id_from_network", StringType()),
                           StructField("advertiser_campaign_name", StringType()),
                           StructField("media_type", StringType()),
                           StructField("call_source_description", StringType()),
                           StructField("syndicated_ident", StringType()),
                           StructField("promo_line_description", StringType()),
                           StructField("virtual_line_id", StringType()),
                           StructField("call_result_description_detail_managed_advertiser", StringType()),
                           StructField("city", StringType()),
                           StructField("region", StringType()),
                           StructField("qualified_regions", StringType()),
                           StructField("repeat_calling_phone_number", StringType()),
                           StructField("calling_phone_number", StringType()),
                           StructField("mobile", StringType()),
                           StructField("duration", IntegerType()),
                           StructField("connect_duration", IntegerType()),
                           StructField("ivr_duration",IntegerType()),
                           StructField("keypresses", StringType()),
                           StructField("keypress_1", StringType()),
                           StructField("keypress_2", StringType()),
                           StructField("keypress_3", StringType()),
                           StructField("keypress_4", StringType()),
                           StructField("dynamic_number_pool_referrer_search_engine", StringType()),
                           StructField("dynamic_number_pool_referrer_search_keywords", StringType()),
                           StructField("dynamic_number_pool_referrer_param1_name", StringType()),
                           StructField("dynamic_number_pool_referrer_param1_value", StringType()),
                           StructField("dynamic_number_pool_referrer_param2_name", StringType()),
                           StructField("dynamic_number_pool_referrer_param2_value", StringType()),
                           StructField("dynamic_number_pool_referrer_param3_name", StringType()),
                           StructField("dynamic_number_pool_referrer_param3_value", StringType()),
                           StructField("dynamic_number_pool_referrer_param4_name", StringType()),
                           StructField("dynamic_number_pool_referrer_param4_value", StringType()),
                           StructField("dynamic_number_pool_referrer_param5_name", StringType()),
                           StructField("dynamic_number_pool_referrer_param5_value", StringType()),
                           StructField("dynamic_number_pool_referrer_param6_name", StringType()),
                           StructField("dynamic_number_pool_referrer_param6_value", StringType()),
                           StructField("dynamic_number_pool_referrer_param7_name", StringType()),
                           StructField("dynamic_number_pool_referrer_param7_value", StringType()),
                           StructField("dynamic_number_pool_referrer_param8_name", StringType()),
                           StructField("dynamic_number_pool_referrer_param8_value", StringType()),
                           StructField("dynamic_number_pool_referrer_param9_name", StringType()),
                           StructField("dynamic_number_pool_referrer_param9_value", StringType()),
                           StructField("dynamic_number_pool_referrer_param10_name", StringType()),
                           StructField("dynamic_number_pool_referrer_param10_value", StringType()),
                           StructField("dynamic_number_pool_referrer_search_type", StringType()),
                           StructField("dynamic_number_pool_pool_type", StringType()),
                           StructField("dynamic_number_pool_id", FloatType()),
                           StructField("start_time_local", StringType()),
                           StructField("start_time_xml", StringType()),
                           StructField("start_time_utc", IntegerType()),
                           StructField("start_time_network_timezone", StringType()),
                           StructField("start_time_network_timezone_xml", StringType()),
                           StructField("recording", StringType()),
                           StructField("corrected_at", StringType()),
                           StructField("opt_in_SMS", IntegerType()),
                           StructField("complete_call_id", StringType()),
                           StructField("transfer_from_type", StringType()),
                           StructField("notes", StringType()),
                           StructField("verified_zip", StringType()),
                           StructField("hangup_cause", StringType()),
                           StructField("Answered by Agent", IntegerType()),
                           StructField("Not Answered by Agent", IntegerType()),
                           StructField("Answered by Voicemail", IntegerType()),
                           StructField("Voicemail Left", IntegerType()),
                           StructField("English Existing Patient", FloatType()),
                           StructField("English Prospect", FloatType()),
                           StructField("Spanish Existing Patient", StringType()),
                           StructField("Spanish Prospect", FloatType()),
                           StructField("Likely New Patient Appointment", FloatType()),
                           StructField("Repeat Caller",FloatType()),
                           StructField("invoca_id", StringType()),
                           StructField("invoca_detected_destination", StringType()),
                           StructField("gclid", StringType()),
                           StructField("g_cid", StringType()),
                           StructField("wbraid", StringType()),
                           StructField("msclkid", StringType()),
                           StructField("mcid", StringType()),
                           StructField("existing_destination_english", StringType()),
                           StructField("new_destination_spanish", StringType()),
                           StructField("invoca_campaign_name", StringType()),
                           StructField("existing_destination_spanish", StringType()),
                           StructField("new_destination_english", StringType()),
                           StructField("call_flow", StringType()),
                           StructField("direct_dial", StringType()),
                           StructField("timing", StringType()),
                           StructField("language", StringType()),
                           StructField("audience_type", StringType()),
                           StructField("asset_type", StringType()),
                           StructField("year", StringType()),
                           StructField("hh_destination", StringType()),
                           StructField("sfcid", StringType()),
                           StructField("landing_page", StringType()),
                           StructField("utm_medium", StringType()),
                           StructField("utm_source", StringType()),
                           StructField("utm_campaign", StringType()),
                           StructField("utm_content", StringType()),
                           StructField("calling_page", StringType()),
                           StructField("cm_mmc", StringType()),
                           StructField("kc", StringType()),
                           StructField("e_dins", StringType()),
                           StructField("state", StringType()),
                           StructField("center_entity", StringType()),
                           StructField("center_code", StringType()),
                           StructField("center_name", StringType()),
                           StructField("market", StringType()),
                           StructField("funding_type", StringType()),
                           StructField("sub_funding_type", StringType()),
                           StructField("brand", StringType()),
                           StructField("signal_name", StringType()),
                           StructField("signal_partner_unique_id", StringType()),
                           StructField("signal_occurred_at", StringType()),
                           StructField("signal_source", StringType()),
                           StructField("revenue", StringType()),
                           StructField("sale_amount", StringType()),
                           StructField("destination_phone_number", StringType())                          
                           ]))

invoca_api_transactions_pco_df = unparsed_df.withColumn("filename",F.lit((f"invoca_api_transactions_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"))) \
                                          .withColumn("contents",F.explode(F.from_json("json_str",json_schema))) \
                                          .withColumn("load_ts",F.lit(datetime.now()))
                                         

invoca_api_transactions_pco_df.display()

feiyun0112
Honored Contributor

I tested the sample and it works fine. Please check your unparsed_df and parsed_df.

Pravin08
New Contributor III

Yeah, I am working on the same. It is not throwing any error but also not producing any data in the dataframe.

unparsed_df is created from the sample json data 

unparsed_df = spark.createDataFrame([[json]],['json_str'])

Pravin08
New Contributor III

Can the admin please delete the post in which I shared the sample data? I don't want it in the public domain. Admin, please help with this.

Pravin08
New Contributor III

[Screenshot: Pravin08_0-1709784934763.png]

Sharing a code snippet where I am working with just part of the data and a few fields. As you can see, display shows "query returned no results".

feiyun0112
Honored Contributor

[Screenshot: feiyun0112_0-1709785558848.png]

It's not valid JSON.
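
One way to locate such a problem is to parse the text with Python's standard `json` module and inspect the `JSONDecodeError`. The sketch below is illustrative only. `find_json_error` is a hypothetical helper, and the broken payload is made up to show an object that is never closed.

```python
import json
from typing import Optional


def find_json_error(text: str, context: int = 20) -> Optional[dict]:
    """Return details of the first JSON syntax error, or None if text parses."""
    try:
        json.loads(text)
    except json.JSONDecodeError as err:
        start = max(err.pos - context, 0)
        return {
            "message": err.msg,
            "position": err.pos,  # 0-based character offset of the error
            "near": text[start:err.pos + context],
        }
    return None


# Hypothetical broken payload: the first object is never closed with "}".
broken = '[{"transaction_id":"A", {"transaction_id":"B"}]'
error = find_json_error(broken)
print(error)
```

The reported position points at the character where the parser gave up, which is usually at or just after the actual mistake.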

Pravin08
New Contributor III

Yeah, that was an error while copying the data. I have fixed that and it seems to work, but when I use the full JSON data and the JSON schema shared earlier, it still produces no data. Thanks once again for your help.

feiyun0112
Honored Contributor

Your data has

"start_time_utc": 1707880878627,

but your schema declares

StructField("start_time_utc", IntegerType()),

These do not match. The maximum IntegerType value is 2147483647, so this value is out of range and the JSON cannot be parsed with this schema.
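
The range check can be sketched in plain Python: scan the parsed records for integers that do not fit in a signed 32-bit range, since those are the fields that cannot be declared as IntegerType. `fields_exceeding_int32` is a hypothetical helper, and the two-field record is a trimmed stand-in for the real payload.

```python
import json

INT32_MIN, INT32_MAX = -2**31, 2**31 - 1


def fields_exceeding_int32(records: list) -> list:
    """Return sorted names of fields holding integers outside the 32-bit range."""
    too_big = set()
    for record in records:
        for name, value in record.items():
            # bool is a subclass of int in Python, so exclude it explicitly.
            if isinstance(value, int) and not isinstance(value, bool):
                if not INT32_MIN <= value <= INT32_MAX:
                    too_big.add(name)
    return sorted(too_big)


# Trimmed stand-in for the API response (assumption: two fields per record).
sample = json.loads('[{"duration": 1, "start_time_utc": 1707880878627}]')
print(fields_exceeding_int32(sample))
```

Any field name this returns needs a 64-bit type in the schema.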

Pravin08
New Contributor III

Yes indeed, it was a data type issue. After changing it to LongType in the schema definition, it is working now. Thanks once again for all your input and time. Much appreciated!
