
structured streaming schema inference

israelst
New Contributor II

I want to stream data from Kinesis using DLT. The data is in JSON format. How can I use Structured Streaming to automatically infer the schema? I know Auto Loader has this feature, but it doesn't make sense for me to use Auto Loader since my data is streaming from Kinesis.

5 REPLIES

Priyanka_Biswas
Valued Contributor

Hi @israelst 

When working with Kinesis in Databricks, you can handle various data formats, including JSON, Avro, and raw bytes. The key is to decode the data appropriately in your Spark application.

Before reading your stream, define your data schema. For JSON data, you can do this manually with PySpark's StructType and StructField if you know your data's structure.
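
For instance, a minimal sketch assuming two hypothetical fields, field1 (a string) and event_time (a timestamp); replace them with your actual fields:

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Hypothetical schema -- substitute your real field names and types
schema = StructType([
    StructField("field1", StringType(), True),
    StructField("event_time", TimestampType(), True),
])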

To read the stream, specify the source format as "kinesis" in your Databricks notebook:

df = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "your_stream_name")
    .option("initialPosition", "latest")
    .load()
)

After defining the schema, use it to parse the JSON payload from Kinesis. The Kinesis source exposes each record's payload as a binary data column, so cast it to a string before parsing:

from pyspark.sql.functions import from_json

# Decode the binary payload, parse it as JSON, and flatten the resulting struct
json_df = (
    df.select(from_json(df.data.cast("string"), schema).alias("data"))
      .select("data.*")
)

israelst
New Contributor II

Thanks @Priyanka_Biswas, but as I wrote, I am aiming for automatic inference of the schema. Auto Loader already has this functionality, but it seems the Kinesis Structured Streaming source doesn't.

Hi @israelst, you can use Structured Streaming in Databricks to read data from Kinesis and infer the schema.

 

Read from Kinesis: You can read from Kinesis using the readStream method. For example, in Python:

df = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "<your-stream-name>")
    .load()
)

Define the Schema: If you know the schema of your JSON data, you can define it using the StructType method and set it using the schema method. For example:

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

jsonSchema = StructType([
    StructField("field1", StringType(), True),
    StructField("field2", TimestampType(), True),
    # Add more fields here
])

streamingInputDF = (
    spark
    .readStream
    .schema(jsonSchema)
    .json("/path/to/json/data")
)

 

Infer the Schema: If you don't know the schema of your JSON data, you can infer it by first reading a small batch of data, inferring the schema, and then using that schema for the rest of the stream. Note, however, that if the schema changes after a streaming read begins against the table, the query fails. A sketch of this approach follows.
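
A minimal sketch, assuming the Kinesis records are UTF-8 JSON strings and that a small static sample of the same data is available in cloud storage (all paths and the stream name are placeholders):

# Infer the schema once from a small static sample of the same JSON data
sample_df = spark.read.json("/path/to/sample/json")
inferred_schema = sample_df.schema

from pyspark.sql.functions import from_json

# Read the live stream and reuse the inferred schema to parse the payload
stream_df = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "your_stream_name")
    .option("initialPosition", "latest")
    .load()
)

parsed = (
    stream_df
    .select(from_json(stream_df.data.cast("string"), inferred_schema).alias("json"))
    .select("json.*")
)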

 

Process the Stream: Once you have the DataFrame, you can perform various operations on it, like filtering, aggregating, or joining.
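
For instance, a one-line sketch that keeps only rows matching a hypothetical field1 value, using the parsed DataFrame from the previous step:

# Keep only the rows whose field1 matches a value of interest
filtered = parsed.filter(parsed.field1 == "some_value")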

 

Write to Delta Lake: You can write the processed stream to a Delta Lake table using the writeStream method, as in the sketch below.
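
A minimal sketch, continuing from streamingInputDF above; the checkpoint and table paths are placeholders:

(
    streamingInputDF.writeStream
    .format("delta")
    .option("checkpointLocation", "/path/to/checkpoint")
    .start("/path/to/delta/table")
)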

 

israelst
New Contributor II

I wanted to use Databricks for this. I don't want to depend on AWS Glue; I'd like to do it the same way I could with Auto Loader.
