
structured streaming schema inference

israelst
New Contributor II

I want to stream data from Kinesis using DLT. The data is in JSON format. How can I use Structured Streaming to automatically infer the schema? I know Auto Loader has this feature, but it doesn't make sense for me to use Auto Loader since my data is streaming from Kinesis...

5 REPLIES

Priyanka_Biswas
Esteemed Contributor III

Hi @israelst 

When working with Kinesis in Databricks, you can effectively handle various data formats including JSON, Avro, or bytes. The key is to appropriately decode the data in your Spark application.

Before reading your stream, define your data schema. For JSON data, this can be done manually with PySpark's StructType and StructField if you know your data's structure.
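
For instance, a minimal sketch (the field names here are illustrative placeholders, not from the original question):

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical fields; replace with the actual structure of your JSON records.
schema = StructType([
    StructField("id", StringType(), True),
    StructField("event_type", StringType(), True),
])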

To read the stream, specify the source format as "kinesis" in your Databricks notebook.

df = (spark.readStream
    .format("kinesis")
    .option("streamName", "your_stream_name")
    .option("initialPosition", "latest")
    .load())

After defining the schema, use it to process the JSON data from Kinesis:

from pyspark.sql.functions import col, from_json

# The Kinesis source exposes each record's payload in a binary "data" column,
# so cast it to a string before parsing it with the schema defined above.
json_df = df.select(from_json(col("data").cast("string"), schema).alias("data")).select("data.*")

israelst
New Contributor II

Thanks @Priyanka_Biswas, but as I wrote, I am aiming for automatic inference of the schema. Auto Loader already has this functionality, but it seems the Kinesis structured streaming source doesn't...

Kaniz_Fatma
Community Manager

Hi @israelst, you can use Structured Streaming in Databricks to read data from Kinesis and infer the schema.

 

Read from Kinesis: You can read from Kinesis using the readStream method. For example, in Python, you can use spark.readStream.format("kinesis").option("streamName", "<your-stream-name>").

 

Define the Schema: If you know the schema of your JSON data, you can define it with StructType and pass it to the schema method.

For example:

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

jsonSchema = StructType([
    StructField("field1", StringType(), True),
    StructField("field2", TimestampType(), True),
    # Add more fields here
])

streamingInputDF = (
    spark
    .readStream
    .schema(jsonSchema)
    .json("/path/to/json/data")
)

 

Infer the Schema: If you don't know the schema of your JSON data, you can infer it by first reading a small batch of data, inferring the schema, and then using that schema for the rest of the stream. However, please note that if the schema changes after a streaming read begins against the table, the query fails.
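
One way to implement that sampling step is to run a short-lived streaming query into an in-memory table and let the batch JSON reader infer the schema from the collected strings. This is a sketch only; the stream name, query name, and wait time are illustrative:

import time
from pyspark.sql.functions import col

# Capture a small sample of raw JSON strings from the Kinesis stream.
sample_query = (spark.readStream.format("kinesis")
    .option("streamName", "your_stream_name")
    .option("initialPosition", "latest")
    .load()
    .select(col("data").cast("string").alias("json"))
    .writeStream
    .format("memory")
    .queryName("kinesis_sample")
    .start())

time.sleep(30)  # let a few records arrive; tune for your traffic
sample_query.stop()

# Infer a schema from the sampled strings, then reuse it with from_json
# in the real streaming query.
sample_json = [row.json for row in spark.table("kinesis_sample").collect()]
inferred_schema = spark.read.json(spark.sparkContext.parallelize(sample_json)).schema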

 

Process the Stream: Once you have the DataFrame, you can perform various operations on it, like filtering, transformations, and aggregations.
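
As a small illustration (the column name is a placeholder), filtering the stream from the schema step might look like:

from pyspark.sql.functions import col

# "field1" is a hypothetical column from the parsed JSON.
filtered_df = streamingInputDF.filter(col("field1").isNotNull())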

 

Write to Delta Lake: You can write the processed stream to a Delta Lake table using the writeStream method. For example, streamingInputDF.writeStream.format("delta").option("checkpointLocation", "/path/to/checkpoint").
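
Spelled out in runnable form (a sketch; the checkpoint location and output path are placeholders):

# Write the processed stream to Delta; all paths are placeholders.
(streamingInputDF.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/path/to/checkpoint")
    .start("/path/to/delta-table"))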

 


israelst
New Contributor II

I wanted to use Databricks for this. I don't want to depend on AWS Glue; I want to do it the same way I could with Auto Loader...
