Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Process a single dataset with different JSON schema rows using PySpark in Databricks

ahmed_zarar
New Contributor III

 

Hi,
I am getting data from Event Hub and storing it in a Delta table as raw rows. The data arrives as JSON, but each row can have a different schema. The code I use infers a JSON schema from the first row only and applies it to every row, so it breaks on the others. I am stuck on how to handle this; can anyone guide me?

from pyspark.sql.functions import explode, from_json, col, schema_of_json

# Infer a schema from the FIRST row only -- this is the source of the
# problem, since later rows may have a completely different structure
sample_json = df.select("Value").first()[0]
json_schema = schema_of_json(sample_json)
print(json_schema)

# Parse every row with the single inferred schema; rows that do not
# match it come back with null fields
value_df = (
    df.select(
        from_json(col("Value").cast("string"), json_schema).alias("analytics")
    )
    .select(explode(col("analytics")).alias("Value"))
    .select("Value.customer_id")
)
value_df.display()
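For context on why this fails: schema_of_json infers a schema from a single sample, so every other row is parsed against the first row's structure and mismatched fields silently come back as null. A minimal sketch demonstrating the effect (the two JSON payloads are hypothetical stand-ins for the Event Hub messages; spark is the session Databricks provides):

from pyspark.sql.functions import from_json, col, schema_of_json

# Two rows with different JSON schemas (hypothetical sample payloads)
rows = [
    ('[{"customer_id": "c1", "session_id": "s1"}]',),
    ('[{"current_screen": "home", "sequence": 1}]',),
]
demo_df = spark.createDataFrame(rows, ["Value"])

# Infer the schema from the first row only, as the code above does
inferred = schema_of_json(demo_df.select("Value").first()[0])

# The second row parses, but every field comes back null because it
# does not match the first row's structure
demo_df.select(from_json(col("Value"), inferred).alias("parsed")).show(truncate=False)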

 


 


2 REPLIES

Edthehead
Contributor II

Since each row has a different schema, you cannot store these as separate columns in one Delta table. You will need to split the messages into multiple tables based on some information within each message. In a real scenario, messages with different schemas should be sent to different partitions, or to completely separate event hubs, so they are easy to segregate. Since you have all the messages in one event hub, however, you will need to validate each message against a schema. You can of course use Spark to do this, but you need to tell Spark what your expected schemas are and which tables the different types of messages should go to.

Example code is shown below.

from pyspark.sql.functions import explode, from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

# Define the schemas of the different types of messages you can receive
screen_schema = ArrayType(StructType([
        StructField('current_screen', StringType(), True),
        StructField('sequence', IntegerType(), True),
        StructField('screen_id', StringType(), True)
]))

cust_schema = ArrayType(StructType([
        StructField('customer_id', StringType(), True),
        StructField('session_id', StringType(), True),
        StructField('app_ver', StringType(), True),
        StructField('screen_id', StringType(), True)
]))

# First set: parse every row against the screen schema and keep only
# the rows where all expected fields are present
df1 = df.withColumn("json", explode(from_json(col("value"), screen_schema)))

screen_df = df1.select(
    df1.partition, df1.json.current_screen, df1.json.sequence, df1.json.screen_id
).where(
    df1.json.current_screen.isNotNull()
    & df1.json.sequence.isNotNull()
    & df1.json.screen_id.isNotNull()
)
screen_df.write.format("delta").mode("append").saveAsTable("Screen_table")

# Second set: same approach with the customer schema
df2 = df.withColumn("json", explode(from_json(col("value"), cust_schema)))

cust_df = df2.select(
    df2.partition, df2.json.customer_id, df2.json.session_id, df2.json.app_ver, df2.json.screen_id
).where(
    df2.json.customer_id.isNotNull()
    & df2.json.session_id.isNotNull()
    & df2.json.app_ver.isNotNull()
    & df2.json.screen_id.isNotNull()
)
cust_df.write.format("delta").mode("append").saveAsTable("Cust_table")
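Since the data is coming from Event Hubs, the same split can also be applied on a streaming read using foreachBatch. This is only a sketch of that variant, assuming a streaming DataFrame stream_df with a string value column and a partition column, the two schemas defined above, and a hypothetical checkpoint path:

from pyspark.sql.functions import explode, from_json, col

# Sketch only: `stream_df` is an assumed streaming DataFrame read from
# Event Hubs, with string `value` and `partition` columns as above
def route_batch(batch_df, batch_id):
    # Parse each micro-batch against both expected schemas and append
    # the matching rows to their respective Delta tables
    screen = (batch_df
        .withColumn("json", explode(from_json(col("value"), screen_schema)))
        .select("partition", "json.current_screen", "json.sequence", "json.screen_id")
        .where(col("current_screen").isNotNull() & col("sequence").isNotNull()))
    screen.write.format("delta").mode("append").saveAsTable("Screen_table")

    cust = (batch_df
        .withColumn("json", explode(from_json(col("value"), cust_schema)))
        .select("partition", "json.customer_id", "json.session_id", "json.app_ver", "json.screen_id")
        .where(col("customer_id").isNotNull() & col("session_id").isNotNull()))
    cust.write.format("delta").mode("append").saveAsTable("Cust_table")

(stream_df.writeStream
    .foreachBatch(route_batch)
    .option("checkpointLocation", "/tmp/checkpoints/route_json")  # hypothetical path
    .start())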

 

ahmed_zarar
New Contributor III

Thank you , I got it.
