Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

IoT Hub with Kafka connector - how to decode the enqueued timestamp and device ID

Pálmi
New Contributor II

I'm reading data from the default endpoint of an IoT hub in Azure using the Kafka connector in Databricks. Most data items are straightforward, but I haven't been able to properly decode the device ID and the timestamp.

For example, the key-value map of the headers {"key": "iothub-enqueuedtime", "value": "gwAAAZAMsGjg"} should be a recent timestamp. Any ideas on how to decode this using PySpark?

 

 

3 REPLIES

Kaniz_Fatma
Community Manager

Hi @Pálmi

  1. First, read the data stream from your IoT hub using the Kafka connector. You can define a DataFrame that reads the data as a stream from your Event Hub or IoT Hub.
  2. To extract the timestamp, you'll need to define the structure of the data.
  3. The timestamp provided in the iothub-enqueuedtime header is in a base64-encoded format. You can decode it and convert it to a readable timestamp.
  4. Finally, select the relevant columns (including the decoded timestamp) from the DataFrame.

For more information, you can refer to the Microsoft Learn guide on the Azure IoT Hub message format.
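As a rough illustration of step 3, here is a minimal plain-Python sketch. It assumes (this is not confirmed anywhere in the thread) that the header value is an AMQP 1.0 encoded timestamp: one type-code byte 0x83 followed by an 8-byte big-endian count of milliseconds since the Unix epoch. The function name `decode_enqueued_time` is made up for this example.

```python
import base64
import struct
from datetime import datetime, timedelta, timezone

AMQP_TIMESTAMP_CODE = 0x83  # assumption: AMQP 1.0 type code for a timestamp
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def decode_enqueued_time(b64_value: str) -> datetime:
    """Decode a base64 header value into a UTC datetime (hypothetical helper)."""
    raw = base64.b64decode(b64_value)
    if len(raw) != 9 or raw[0] != AMQP_TIMESTAMP_CODE:
        raise ValueError(f"not an AMQP timestamp: {raw.hex()}")
    # The 8 bytes after the type code: signed big-endian milliseconds since the epoch
    (millis,) = struct.unpack(">q", raw[1:])
    return EPOCH + timedelta(milliseconds=millis)


# Value taken from the question
print(decode_enqueued_time("gwAAAZAMsGjg"))  # 2024-06-12 13:43:26.176000+00:00
```

In a notebook, a function like this could probably be wrapped with `pyspark.sql.functions.udf`. If the header column already holds binary data rather than base64 text, the `b64decode` step would not be needed.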

Pálmi
New Contributor II

Hi @Kaniz_Fatma, thanks for your reply. The iothub-enqueuedtime value does not cast directly into a timestamp, but a Unix timestamp in milliseconds is somewhere in there.

 

from pyspark.sql.functions import col, explode, expr

# `spark` is the session that Databricks notebooks provide
df = spark.read.format("delta").table("iot_ps2")
df.select("headers").display()

# Explode the array of header structs into one row per header
df_exploded = df.withColumn("json_item", explode(col("headers")))

# Keep only the 'iothub-enqueuedtime' header
df_filtered = df_exploded.filter(col("json_item.key") == "iothub-enqueuedtime")

# Show the value as a string and as hex to inspect the raw bytes
df3 = (
    df_filtered.select("json_item.key", "json_item.value")
    .withColumn("str_value", expr("cast(value as STRING)"))
    .withColumn("hex", expr("hex(str_value)"))
)
df3.display()
 
Looking at the hex output, the 6 rightmost bytes "01 90 36 4C 1B 5C" turn out to be a Unix timestamp in milliseconds. That leaves 3 unknown bytes.
I'm hoping that a more straightforward way is available.
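One possible reading of the three leading bytes, offered as an assumption rather than a confirmed answer: base64-decoding the value from the question gives `83 00 00 01 90 0C B0 68 E0`, which would fit an AMQP 1.0 timestamp, where 0x83 is the type code and the two zero bytes are simply the high bytes of an 8-byte big-endian integer. The sketch below splits that sample and converts the six bytes quoted above; `millis_to_utc` is a made-up helper name.

```python
from datetime import datetime, timedelta, timezone

# Bytes obtained by base64-decoding "gwAAAZAMsGjg" from the question
sample = bytes.fromhex("83 00 00 01 90 0C B0 68 E0")
type_code, padding, low_six = sample[0], sample[1:3], sample[3:]


def millis_to_utc(millis: int) -> datetime:
    """Convert milliseconds since the Unix epoch to a UTC datetime (hypothetical helper)."""
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(milliseconds=millis)


# The six rightmost bytes quoted in the post above
reply_millis = int.from_bytes(bytes.fromhex("01 90 36 4C 1B 5C"), byteorder="big")

print(hex(type_code), padding.hex(), reply_millis, millis_to_utc(reply_millis))
# 0x83 0000 1718897875804 2024-06-20 15:37:55.804000+00:00
```

If this reading holds, a pure Spark SQL expression along the lines of `conv(hex(substring(value, 2, 8)), 16, 10)` might extract the milliseconds from the binary value without a UDF. That expression is untested here.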

 

Erik
Valued Contributor II