01-15-2024 02:28 AM
I want to stream data from Kinesis using DLT. The data is in JSON format. How can I use Structured Streaming to automatically infer the schema? I know Auto Loader has this feature, but it doesn't make sense for me to use Auto Loader since my data is streaming from Kinesis...
01-16-2024 03:57 PM
Hi @israelst
When working with Kinesis in Databricks, you can handle various data formats, including JSON, Avro, and raw bytes. The key is to decode the record payload appropriately in your Spark application.
Before reading your stream, define your data schema. For JSON data, this can be done manually using PySpark's StructType and StructField if you are aware of your data structure.
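For example, a minimal schema for a hypothetical two-field payload (the field names here are placeholders; replace them with your actual structure):

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Placeholder schema; adjust fields to match your payload
schema = StructType([
    StructField("field1", StringType(), True),
    StructField("field2", TimestampType(), True),
])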
To read the stream, specify the source format as "kinesis" in your Databricks notebook.
df = (spark.readStream
    .format("kinesis")
    .option("streamName", "your_stream_name")
    .option("region", "us-east-1")  # adjust to your stream's AWS region
    .option("initialPosition", "latest")
    .load())
After defining the schema, use it to process the JSON data from Kinesis:
from pyspark.sql.functions import from_json

# The Kinesis source delivers the record payload in a binary "data" column
json_df = df.select(from_json(df.data.cast("string"), schema).alias("data")).select("data.*")
01-17-2024 12:44 AM
Thanks @Priyanka_Biswas, but as I wrote, I am aiming for automatic inference of the schema. Auto Loader already has this functionality... but it seems the Kinesis Structured Streaming source doesn't...
01-18-2024 01:26 AM
Hi @israelst, You can use Structured Streaming in Databricks to read data from Kinesis and infer the schema.
Read from Kinesis: You can read from Kinesis using the readStream method.
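For example (the stream name and region below are placeholders):

kinesis_df = (spark.readStream
    .format("kinesis")
    .option("streamName", "your_stream_name")
    .option("region", "us-east-1")
    .option("initialPosition", "latest")
    .load())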
Define the Schema: If you know the schema of your JSON data, you can define it using a StructType and set it with the schema method.
For example:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

jsonSchema = StructType([
    StructField("field1", StringType(), True),
    StructField("field2", TimestampType(), True),
    # Add more fields here
])

streamingInputDF = (
    spark
    .readStream
    .schema(jsonSchema)
    .json("/path/to/json/data")
)
Infer the Schema: If you don't know the schema of your JSON data, you can infer it by first reading a small batch of data, inferring the schema, and then using that schema for the rest of the stream. However, please note that if the schema changes after a streaming read begins against the table, the query fails.
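A minimal sketch of that pattern, assuming you can collect a few sample payload strings up front (for example with boto3 or the AWS CLI; the sample record here is a placeholder):

from pyspark.sql.functions import from_json

# Placeholder sample records, pulled from the stream out of band
sample_payloads = ['{"field1": "a", "field2": "2024-01-15T02:28:00Z"}']

# Let Spark infer the schema from the static sample
inferred_schema = spark.read.json(
    spark.sparkContext.parallelize(sample_payloads)
).schema

# Apply the inferred schema to the live Kinesis stream
parsed = (kinesis_df
    .select(from_json(kinesis_df.data.cast("string"), inferred_schema).alias("data"))
    .select("data.*"))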
Write to Delta Lake: You can write the processed stream to a Delta Lake table using the writeStream method. For example:

(streamingInputDF.writeStream
    .format("delta")
    .option("checkpointLocation", "/path/to/checkpoint")
    .start("/path/to/delta/table"))
01-18-2024 12:57 AM
I wanted to use Databricks for this. I don't want to depend on AWS Glue; I'd like to do it the same way I could with Auto Loader...