Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Receiving Null values from Eventhub streaming.

vkumar
New Contributor

Hi, I am new to PySpark and am facing an issue while consuming data from Azure Event Hubs. I cannot deserialize the consumed data: after applying the schema, every field comes back as null. Below are a sample Event Hub message, the schema, and the code I am using to consume the data. How can I resolve this issue? Thanks in advance.

Sample Eventhub message:
EventHubOverrideMessage(gtin13=00******0010, sourceId=0******5, lastUpdateTimestamp=2024-07-09T12:45:00.009805, lastUpdatedUser=null, inStore=ModalityOverride(reason=Other, override=true, startDate=2023-10-23T00:00, endDate=null), pickup=ModalityOverride(reason=Other, override=true, startDate=2023-10-23T00:00, endDate=null), delivery=ModalityOverride(reason=Other, override=true, startDate=2023-10-23T00:00, endDate=null), ship=null)

Deserialization Schema (I also tried replacing TimestampType with StringType):

from pyspark.sql.types import (
    BooleanType, StringType, StructField, StructType, TimestampType,
)

json_schema = StructType([
    StructField("gtin13", StringType(), True),
    StructField("sourceId", StringType(), True),
    StructField("lastUpdateTimestamp", TimestampType(), True),
    StructField("lastUpdatedUser", StringType(), True),
    StructField("inStore", StructType([
        StructField("reason", StringType(), True),
        StructField("override", BooleanType(), True),
        StructField("startDate", TimestampType(), True),
        StructField("endDate", TimestampType(), True)
    ]), True),
    StructField("pickup", StructType([
        StructField("reason", StringType(), True),
        StructField("override", BooleanType(), True),
        StructField("startDate", TimestampType(), True),
        StructField("endDate", TimestampType(), True)
    ]), True),
    StructField("delivery", StructType([
        StructField("reason", StringType(), True),
        StructField("override", BooleanType(), True),
        StructField("startDate", TimestampType(), True),
        StructField("endDate", TimestampType(), True)
    ]), True),
    StructField("ship", StructType([
        StructField("reason", StringType(), True),
        StructField("override", BooleanType(), True),
        StructField("startDate", TimestampType(), True),
        StructField("endDate", TimestampType(), True)
    ]), True)
])

Actual Schema:
{
"title": "schema",
"type": "object",
"properties": {
"gtin13": {
"type": "string",
"pattern": "^[0-9]{13}$"
},
"sourceId": {
"type": "string",
"pattern": "^[0-9]{8}$"
},
"lastUpdateTimestamp": {
"type": "string",
"format": "date-time"
},
"inStore": {
"type": [
"object",
"null"
],
"properties": {
"reason": {
"type": [
"string",
"null"
]
},
"override": {
"type": [
"boolean",
"null"
]
},
"startDate": {
"type": [
"string",
"null"
],
"format": "date-time"
},
"endDate": {
"type": [
"string",
"null"
],
"format": "date-time"
}
},
"required": [
"reason",
"override",
"startDate",
"endDate"
]
},
"pickup": {
"type": [
"object",
"null"
],
"properties": {
"reason": {
"type": "string"
},
"override": {
"type": [
"boolean",
"null"
]
},
"startDate": {
"type": [
"string",
"null"
],
"format": "date-time"
},
"endDate": {
"type": [
"string",
"null"
],
"format": "date-time"
}
},
"required": [
"reason",
"override",
"startDate",
"endDate"
]
},
"delivery": {
"type": [
"object",
"null"
],
"properties": {
"reason": {
"type": [
"string",
"null"
]
},
"override": {
"type": [
"boolean",
"null"
]
},
"startDate": {
"type": [
"string",
"null"
],
"format": "date-time"
},
"endDate": {
"type": [
"string",
"null"
],
"format": "date-time"
}
},
"required": [
"reason",
"override",
"startDate",
"endDate"
]
},
"ship": {
"type": [
"object",
"null"
],
"properties": {
"reason": {
"type": [
"string",
"null"
]
},
"override": {
"type": [
"boolean",
"null"
]
},
"startDate": {
"type": [
"string",
"null"
],
"format": "date-time"
},
"endDate": {
"type": [
"string",
"null"
],
"format": "date-time"
}
},
"required": [
"reason",
"override",
"startDate",
"endDate"
]
}
},
"required": [
"gtin13",
"sourceId",
"lastUpdateTimestamp"
]
}


CODE:
kafkaDF = (
    spark.readStream.format("kafka")
    .option("subscribe", topic)
    .option(
        "kafka.bootstrap.servers",
        bootstrap_server,
    )
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EVENTHUB_CONNECTION_STRING)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("startingOffsets", "earliest")
    .option("mode", "PERMISSIVE")
    .load()
    .withColumn("value", col("value").cast(StringType()))
)

print("Schema of Kafka DataFrame:")
kafkaDF.printSchema()

# Deserialize JSON data
deserialized_stream = kafkaDF.withColumn(
    "data", F.from_json(col("value").cast("string"), json_schema)
)

parsed_df = deserialized_stream.select("data.*")

# Display the parsed DataFrame
display(deserialized_stream)


PRINT SCHEMA OUTPUT:
Schema of Kafka DataFrame:
root
 |-- key: binary (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

Deserialized Output:
{"gtin13":null,"sourceId":null,"lastUpdateTimestamp":null,"inStore":null,"pickup":null,"delivery":null,"ship":null}
1 REPLY

Kaniz_Fatma
Community Manager

Hi @vkumar, the issue you are facing is most likely caused by how the data is serialized before it is sent to Azure Event Hubs.

  • The sample Event Hub message you provided is not JSON. It looks like the string representation of a custom class, EventHubOverrideMessage, with nested ModalityOverride objects. Because from_json cannot parse such a string as JSON, every field in the result is null.
  • Since the payload is not standard JSON, you will need custom deserialization that handles the specific structure of the EventHubOverrideMessage class and its nested objects.
  • Several fields in the sample message can be null (for example lastUpdatedUser, endDate, and ship). Make sure your schema and deserialization logic handle these null values correctly.
  • After implementing the custom deserialization, inspect the deserialized_stream DataFrame to confirm that the data is parsed correctly and that no fields are unexpectedly null.
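
As a rough illustration of what such custom deserialization could look like, here is a minimal sketch in plain Python that converts the ClassName(key=value, ...) text into a JSON string. The function names (parse_value, parse_fields, tostring_to_json) are hypothetical, and the parser assumes the format seen in the sample message: values contain no commas, parentheses, or equals signs, and null/true/false are the only literals that need converting. Treat it as a starting point, not a complete parser.

```python
import json


def parse_value(text, pos):
    """Parse one value starting at text[pos]; return (value, next_pos)."""
    start = pos
    # Scan to the delimiter that ends a scalar or opens a nested object.
    while pos < len(text) and text[pos] not in "(,)":
        pos += 1
    token = text[start:pos].strip()
    if pos < len(text) and text[pos] == "(":
        # token is a class name (e.g. ModalityOverride); parse its fields.
        return parse_fields(text, pos + 1)
    if token == "null":
        return None, pos
    if token in ("true", "false"):
        return token == "true", pos
    # Everything else (ids, timestamps, reasons) is kept as a string.
    return token, pos


def parse_fields(text, pos):
    """Parse 'k=v, k=v)' following an opening paren; return (dict, next_pos)."""
    fields = {}
    while pos < len(text) and text[pos] != ")":
        eq = text.index("=", pos)
        key = text[pos:eq].strip()
        value, pos = parse_value(text, eq + 1)
        fields[key] = value
        if pos < len(text) and text[pos] == ",":
            pos += 1  # move past the separator to the next field
    return fields, pos + 1  # skip the closing paren


def tostring_to_dict(message):
    """Convert a 'ClassName(k=v, ...)' string into a nested dict."""
    value, _ = parse_value(message.strip(), 0)
    return value


def tostring_to_json(message):
    """Convert the same string into JSON text that from_json can parse."""
    return json.dumps(tostring_to_dict(message))
```

One way to use this, assuming PySpark is available, is to wrap tostring_to_json in a UDF (F.udf(tostring_to_json, StringType())), apply it to the value column, and pass the result to from_json with your existing schema. If you control the producer, having it send real JSON instead is likely simpler and more robust than parsing this text format on the consumer side.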
