cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

readStream query throws exception if there's no data in delta location.

pranathisg97
New Contributor III

Hi,

I have a scenario where writeStream query writes the stream data to bronze location and I have to read from bronze, do some processing and finally write it to silver. I use S3 location for delta tables

But for the very first execution , readStream is giving an exception: Table schema is not set. Write data into it or use CREATE TABLE to set the schema.

My readStream query:

(spark

.readStream.format("delta")

.option("ignoreChanges","true")

.option("initialPosition","latest")

.load("/mnt/tmp/bronze")

)

Is there any option that can be set with readStream query to check if there's any data in delta location and if not , it will wait until the data is available?

2 REPLIES 2

Anonymous
Not applicable

@Pranathi Girish​ :

The error message you're seeing suggests that the schema for the Delta table you're trying to read hasn't been set yet. This can happen if you haven't written any data to the table yet, or if the schema hasn't been explicitly set using the

CREATE TABLE command.

To address the issue, you can try explicitly setting the schema for the Delta table using the CREATE TABLE command before running your readStream query. For example:

(spark.sql("CREATE TABLE delta_table_name (col1 STRING, col2 INT) USING DELTA LOCATION '/mnt/tmp/silver'")
.readStream
.format("delta")
.option("ignoreChanges","true")
.option("initialPosition","latest")
.load("/mnt/tmp/bronze"))

Regarding the question of waiting for data to be available in the Delta table, you can try setting the

failOnDataLoss option to false in your readStream query. This tells Spark to wait for new data to arrive in the stream instead of throwing an error if data is missing. For example:

(spark
  .readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .option("initialPosition", "latest")
  .option("failOnDataLoss", "false")
  .load("/mnt/tmp/bronze")
)

With this option set, Spark will wait for new data to arrive in the stream and continue processing once it's available. However, note that this can result in slower performance if there are long gaps between data arriving in the stream.

Hope this helps you to find solution to your answer!

Vartika
Moderator
Moderator

Hi @Pranathi Girish​,

Hope all is well!

Checking in. If @Suteja Kanuri​'s answer helped, would you let us know and mark the answer as best? If not, would you be happy to give us more information?

We'd love to hear from you.

Thanks!