Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Streaming- Results not getting updated on arrival of new files

AanchalSoni
New Contributor II

Hi!

I'm trying to stream some files using spark.readStream.format("cloudFiles"). However, when new files arrive, the subsequent SQL query and monitoring graphs are not getting updated. Please suggest.

9 REPLIES

BS_THE_ANALYST
Esteemed Contributor II

Hi @AanchalSoni , 

How have you set up your stream? Could you provide the code? 😊 Perhaps you haven't set up the stream trigger to behave the way you want: https://docs.databricks.com/aws/en/structured-streaming/triggers
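
For reference, a minimal sketch of where a trigger goes, assuming a streaming DataFrame has already been created with spark.readStream (the variable, table name, and checkpoint path below are placeholders, not from the original post):

# Minimal sketch: the trigger is set on the write side of the stream.
# processingTime polls the source on a fixed interval; availableNow=True
# processes everything currently available and then stops.
query = (
    streaming_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/Volumes/<catalog>/<schema>/<volume>/checkpoint/")  # placeholder path
    .trigger(processingTime="1 minute")  # or .trigger(availableNow=True)
    .toTable("my_target_table")  # hypothetical table name; toTable() starts the query
)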

All the best,
BS

AanchalSoni
New Contributor II

Please check the attachments

(Virus scan in progress ...)
(Virus scan in progress ...)

szymon_dybczak
Esteemed Contributor III

It seems there is a problem with attachments on the community in recent days. All of them are stuck with this "Virus scan in progress" message. Could you try copying those images directly into the text box?

Hi, sorry for calling on you directly, @Advika, @Sujitha, but maybe you know if there have been any changes to the attachment-adding system recently?

df_super = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.validateOptions", "false")
    .option(
        "cloudFiles.schemaHints",
        "ROW_ID int, Order_Date date, Ship_Date date, Postal_Code int, Sales double, Quantity double, Discount double, Profit double",
    )
    .option("cloudFiles.schemaLocation", "/Volumes/dlt/default/data/Schema2/")
    .load("/Volumes/dlt/default/data/Superstore/")
)
 --------
spark.sql("select * from vw_superstore").display(checkpointLocation = "/Volumes/dlt/default/data/Check_super13/")

saurabh18cs
Honored Contributor II

Hi @AanchalSoni, thanks for sharing your code. Your job isn't scheduled and is running continuously, right?

+

You need to write the streaming DataFrame out to a table or view so that vw_superstore actually exists.

example:

df_super.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/Volumes/dlt/default/data/Check_super13/") \
    .outputMode("append") \
    .toTable("vw_superstore")  # toTable() starts the streaming query, no separate .start() is needed

Thanks for tagging, @szymon_dybczak! We'll check and get back on this.

saurabh18cs
Honored Contributor II

Hi @AanchalSoni, if you are using .readStream, make sure you have set a trigger interval (e.g., .trigger(processingTime='1 minute')).

szymon_dybczak
Esteemed Contributor III

If you don't define a trigger, by default it will run a micro-batch every 0.5 seconds, so I guess this is not the issue here.
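
For what it's worth, a quick way to verify whether the stream is actually producing micro-batches is to inspect the active queries' progress (a generic sketch, assuming the notebook has at least one running streaming query):

# Minimal sketch: list active streaming queries and their last progress.
# numInputRows staying at 0 across batches suggests no new files are being
# picked up from the source directory.
for q in spark.streams.active:
    print(q.name, q.status)
    print(q.lastProgress)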

Hi Saurabh!

If I'm not explicitly specifying the trigger, then by default it should be 500 ms and there should be a quick check for new files. However, even after a few minutes there is no expected activity.