cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Receiving Null values from Eventhub streaming.

vkumar
New Contributor

Hi, I am new to PySpark, and facing an issue while consuming data from the Azure eventhub. I am unable to deserialize the consumed data. I see only null values upon deserializing data using the schema. Please find the below schema, eventhub message, and code that I am using to consume data and let me know how can I resolve this issue. Thanks in advance.

Sample Eventhub message:
EventHubOverrideMessage(gtin13=00******0010, sourceId=0******5, lastUpdateTimestamp=2024-07-09T12:45:00.009805, lastUpdatedUser=null, inStore=ModalityOverride(reason=Other, override=true, startDate=2023-10-23T00:00, endDate=null), pickup=ModalityOverride(reason=Other, override=true, startDate=2023-10-23T00:00, endDate=null), delivery=ModalityOverride(reason=Other, override=true, startDate=2023-10-23T00:00, endDate=null), ship=null)

Deserialization Schema: (Tried replacingTimestampType with stringtype)

StructType([
StructField("gtin13", StringType(), True),
StructField("sourceId", StringType(), True),
StructField("lastUpdateTimestamp", TimestampType(), True),
StructField("lastUpdatedUser", StringType(), True),
StructField("inStore", StructType([
StructField("reason", StringType(), True),
StructField("override", BooleanType(), True),
StructField("startDate", TimestampType(), True),
StructField("endDate", TimestampType(), True)
]), True),
StructField("pickup", StructType([
StructField("reason", StringType(), True),
StructField("override", BooleanType(), True),
StructField("startDate", TimestampType(), True),
StructField("endDate", TimestampType(), True)
]), True),
StructField("delivery", StructType([
StructField("reason", StringType(), True),
StructField("override", BooleanType(), True),
StructField("startDate", TimestampType(), True),
StructField("endDate", TimestampType(), True)
]), True),
StructField("ship", StructType([
StructField("reason", StringType(), True),
StructField("override", BooleanType(), True),
StructField("startDate", TimestampType(), True),
StructField("endDate", TimestampType(), True)
]), True)
])

Actual Schema:
{
"title": "schema",
"type": "object",
"properties": {
"gtin13": {
"type": "string",
"pattern": "^[0-9]{13}$"
},
"sourceId": {
"type": "string",
"pattern": "^[0-9]{8}$"
},
"lastUpdateTimestamp": {
"type": "string",
"format": "date-time"
},
"inStore": {
"type": [
"object",
"null"
],
"properties": {
"reason": {
"type": [
"string",
"null"
]
},
"override": {
"type": [
"boolean",
"null"
]
},
"startDate": {
"type": [
"string",
"null"
],
"format": "date-time"
},
"endDate": {
"type": [
"string",
"null"
],
"format": "date-time"
}
},
"required": [
"reason",
"override",
"startDate",
"endDate"
]
},
"pickup": {
"type": [
"object",
"null"
],
"properties": {
"reason": {
"type": "string"
},
"override": {
"type": [
"boolean",
"null"
]
},
"startDate": {
"type": [
"string",
"null"
],
"format": "date-time"
},
"endDate": {
"type": [
"string",
"null"
],
"format": "date-time"
}
},
"required": [
"reason",
"override",
"startDate",
"endDate"
]
},
"delivery": {
"type": [
"object",
"null"
],
"properties": {
"reason": {
"type": [
"string",
"null"
]
},
"override": {
"type": [
"boolean",
"null"
]
},
"startDate": {
"type": [
"string",
"null"
],
"format": "date-time"
},
"endDate": {
"type": [
"string",
"null"
],
"format": "date-time"
}
},
"required": [
"reason",
"override",
"startDate",
"endDate"
]
},
"ship": {
"type": [
"object",
"null"
],
"properties": {
"reason": {
"type": [
"string",
"null"
]
},
"override": {
"type": [
"boolean",
"null"
]
},
"startDate": {
"type": [
"string",
"null"
],
"format": "date-time"
},
"endDate": {
"type": [
"string",
"null"
],
"format": "date-time"
}
},
"required": [
"reason",
"override",
"startDate",
"endDate"
]
}
},
"required": [
"gtin13",
"sourceId",
"lastUpdateTimestamp"
]
}


CODE:
kafkaDF = (
spark.readStream.format("kafka")
.option("subscribe", topic)
.option(
"kafka.bootstrap.servers",
bootstrap_server,
)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config",EVENTHUB_CONNECTION_STRING)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("startingOffsets", "earliest")
.option("mode", "PERMISSIVE")
.load()
.withColumn("value", col("value").cast(StringType()))
)

print("Schema of Kafka DataFrame:")
kafkaDF.printSchema()

# Deserialize JSON data
deserialized_stream = kafkaDF.withColumn(
"data", F.from_json(col("value").cast("string"), json_schema)
)
 
parsed_df = deserialized_stream.select("data.*")

# Display the parsed DataFrame
display(deserialized_stream)


PRINT SCHEMA OUTPUT:
Schema of Kafka DataFrame: root |-- key: binary (nullable = true) |-- value: string (nullable = true) |-- topic: string (nullable = true) |-- partition: integer (nullable = true) |-- offset: long (nullable = true) |-- timestamp: timestamp (nullable = true) |-- timestampType: integer (nullable = true)

Deserialized Output:
{"gtin13":null,"sourceId":null,"lastUpdateTimestamp":null,"inStore":null,"pickup":null,"delivery":null,"ship":null}
0 REPLIES 0

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group