Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Receiving Null values from Eventhub streaming.

vkumar
New Contributor

Hi, I am new to PySpark and am facing an issue while consuming data from Azure Event Hubs. I cannot deserialize the consumed data: after applying the schema with from_json, every field comes back null. Below are a sample Event Hubs message, the schema, and the code I am using to consume the data. How can I resolve this issue? Thanks in advance.

Sample Eventhub message:
EventHubOverrideMessage(gtin13=00******0010, sourceId=0******5, lastUpdateTimestamp=2024-07-09T12:45:00.009805, lastUpdatedUser=null, inStore=ModalityOverride(reason=Other, override=true, startDate=2023-10-23T00:00, endDate=null), pickup=ModalityOverride(reason=Other, override=true, startDate=2023-10-23T00:00, endDate=null), delivery=ModalityOverride(reason=Other, override=true, startDate=2023-10-23T00:00, endDate=null), ship=null)
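
A quick check that may help narrow this down: the message above has the shape of a Java `toString()` rendering (`ClassName(field=value, ...)`) rather than JSON, and a JSON parser rejects text in that form. The snippet below is a minimal sketch in plain Python, not Spark. `looks_like_json` is a hypothetical helper, and both payload strings are shortened stand-ins for the sample message. If the Kafka `value` column really contains text in this form, `from_json` has nothing it can parse, which would be consistent with all fields coming back null.

```python
import json


def looks_like_json(payload: str) -> bool:
    """Return True if the payload parses as a JSON object."""
    try:
        return isinstance(json.loads(payload), dict)
    except json.JSONDecodeError:
        return False


# Shortened stand-in with the same shape as the sample message (toString-style text).
to_string_payload = (
    "EventHubOverrideMessage(gtin13=00******0010, lastUpdatedUser=null, ship=null)"
)

# The same fields rendered as JSON (assumed form, for comparison only).
json_payload = '{"gtin13": "00******0010", "lastUpdatedUser": null, "ship": null}'

print(looks_like_json(to_string_payload))  # False
print(looks_like_json(json_payload))       # True
```

Running the same kind of check on a few raw `value` strings copied from the stream would show whether the producer is sending JSON at all.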

Deserialization schema (I also tried replacing TimestampType with StringType):

from pyspark.sql.types import BooleanType, StringType, StructField, StructType, TimestampType

# Schema passed to from_json; the four modality structs share the same fields.
json_schema = StructType([
    StructField("gtin13", StringType(), True),
    StructField("sourceId", StringType(), True),
    StructField("lastUpdateTimestamp", TimestampType(), True),
    StructField("lastUpdatedUser", StringType(), True),
    StructField("inStore", StructType([
        StructField("reason", StringType(), True),
        StructField("override", BooleanType(), True),
        StructField("startDate", TimestampType(), True),
        StructField("endDate", TimestampType(), True)
    ]), True),
    StructField("pickup", StructType([
        StructField("reason", StringType(), True),
        StructField("override", BooleanType(), True),
        StructField("startDate", TimestampType(), True),
        StructField("endDate", TimestampType(), True)
    ]), True),
    StructField("delivery", StructType([
        StructField("reason", StringType(), True),
        StructField("override", BooleanType(), True),
        StructField("startDate", TimestampType(), True),
        StructField("endDate", TimestampType(), True)
    ]), True),
    StructField("ship", StructType([
        StructField("reason", StringType(), True),
        StructField("override", BooleanType(), True),
        StructField("startDate", TimestampType(), True),
        StructField("endDate", TimestampType(), True)
    ]), True)
])

Actual Schema:
{
  "title": "schema",
  "type": "object",
  "properties": {
    "gtin13": {
      "type": "string",
      "pattern": "^[0-9]{13}$"
    },
    "sourceId": {
      "type": "string",
      "pattern": "^[0-9]{8}$"
    },
    "lastUpdateTimestamp": {
      "type": "string",
      "format": "date-time"
    },
    "inStore": {
      "type": [
        "object",
        "null"
      ],
      "properties": {
        "reason": {
          "type": [
            "string",
            "null"
          ]
        },
        "override": {
          "type": [
            "boolean",
            "null"
          ]
        },
        "startDate": {
          "type": [
            "string",
            "null"
          ],
          "format": "date-time"
        },
        "endDate": {
          "type": [
            "string",
            "null"
          ],
          "format": "date-time"
        }
      },
      "required": [
        "reason",
        "override",
        "startDate",
        "endDate"
      ]
    },
    "pickup": {
      "type": [
        "object",
        "null"
      ],
      "properties": {
        "reason": {
          "type": "string"
        },
        "override": {
          "type": [
            "boolean",
            "null"
          ]
        },
        "startDate": {
          "type": [
            "string",
            "null"
          ],
          "format": "date-time"
        },
        "endDate": {
          "type": [
            "string",
            "null"
          ],
          "format": "date-time"
        }
      },
      "required": [
        "reason",
        "override",
        "startDate",
        "endDate"
      ]
    },
    "delivery": {
      "type": [
        "object",
        "null"
      ],
      "properties": {
        "reason": {
          "type": [
            "string",
            "null"
          ]
        },
        "override": {
          "type": [
            "boolean",
            "null"
          ]
        },
        "startDate": {
          "type": [
            "string",
            "null"
          ],
          "format": "date-time"
        },
        "endDate": {
          "type": [
            "string",
            "null"
          ],
          "format": "date-time"
        }
      },
      "required": [
        "reason",
        "override",
        "startDate",
        "endDate"
      ]
    },
    "ship": {
      "type": [
        "object",
        "null"
      ],
      "properties": {
        "reason": {
          "type": [
            "string",
            "null"
          ]
        },
        "override": {
          "type": [
            "boolean",
            "null"
          ]
        },
        "startDate": {
          "type": [
            "string",
            "null"
          ],
          "format": "date-time"
        },
        "endDate": {
          "type": [
            "string",
            "null"
          ],
          "format": "date-time"
        }
      },
      "required": [
        "reason",
        "override",
        "startDate",
        "endDate"
      ]
    }
  },
  "required": [
    "gtin13",
    "sourceId",
    "lastUpdateTimestamp"
  ]
}
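
To make the top-level constraints of this schema concrete, here is a minimal sketch in plain Python that checks a decoded message against the `required` list and the two `pattern` rules. It is not a full JSON Schema validator and does not look at the nested modality objects. `check_top_level`, `REQUIRED`, and `PATTERNS` are hypothetical names, and the values in `candidate` are placeholders rather than real data (the timestamp is copied from the sample message).

```python
import re
from datetime import datetime

# Taken from the "required" list and "pattern" entries of the schema above.
REQUIRED = ("gtin13", "sourceId", "lastUpdateTimestamp")
PATTERNS = {"gtin13": r"^[0-9]{13}$", "sourceId": r"^[0-9]{8}$"}


def check_top_level(message: dict) -> list:
    """Return a list of problems; an empty list means these checks passed."""
    problems = []
    for name in REQUIRED:
        if name not in message:
            problems.append(f"missing required field: {name}")
    for name, pattern in PATTERNS.items():
        value = message.get(name)
        if isinstance(value, str) and not re.match(pattern, value):
            problems.append(f"{name} does not match {pattern}")
    timestamp = message.get("lastUpdateTimestamp")
    if isinstance(timestamp, str):
        try:
            datetime.fromisoformat(timestamp)
        except ValueError:
            problems.append("lastUpdateTimestamp is not an ISO 8601 timestamp")
    return problems


# Placeholder identifiers, not real data.
candidate = {
    "gtin13": "0000000000000",
    "sourceId": "00000000",
    "lastUpdateTimestamp": "2024-07-09T12:45:00.009805",
}

print(check_top_level(candidate))         # []
print(check_top_level({"gtin13": "123"}))  # three problems reported
```

A message only reaches a check like this once it has been decoded from JSON, so it is useful only after confirming that the payload on the wire is actually JSON.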


CODE:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Read from the Event Hubs Kafka-compatible endpoint.
kafkaDF = (
    spark.readStream.format("kafka")
    .option("subscribe", topic)
    .option("kafka.bootstrap.servers", bootstrap_server)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EVENTHUB_CONNECTION_STRING)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("startingOffsets", "earliest")
    .load()
    # The Kafka source delivers value as binary; cast it to a string once here.
    .withColumn("value", col("value").cast("string"))
)

print("Schema of Kafka DataFrame:")
kafkaDF.printSchema()

# Deserialize JSON data. "mode" is a JSON parser option, so it is passed to
# from_json rather than to the Kafka reader (PERMISSIVE is the default).
deserialized_stream = kafkaDF.withColumn(
    "data", F.from_json(col("value"), json_schema, {"mode": "PERMISSIVE"})
)

parsed_df = deserialized_stream.select("data.*")

# Display the stream, including the raw value and the parsed data column
display(deserialized_stream)


PRINT SCHEMA OUTPUT:
Schema of Kafka DataFrame:
root
 |-- key: binary (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

Deserialized Output:
{"gtin13":null,"sourceId":null,"lastUpdateTimestamp":null,"inStore":null,"pickup":null,"delivery":null,"ship":null}
