
Spark Streaming - only process new files in streaming path?

Michael_Galli
Contributor II

In our streaming jobs, we currently run streaming (cloudFiles format) on a directory with sales transactions coming every 5 minutes.

In this directory, the transactions are organized in the following directory structure:

<streaming-checkpoint-root>/<transaction_date>/<transaction_hour>/transaction_x_y.json

Only the transactions of TODAY are of interest, all others are already obsolete.

When I start the streaming job, it will process all the historical transactions, which I don't want.

Is it somehow possible to process only NEW files coming in after the streaming job has started?
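For context, a minimal sketch of the kind of read described above, assuming JSON input and using the names schema and streaming_path as they appear in the replies below:

streaming_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(schema)
    # By default Auto Loader also backfills files that already exist under the path
    # (cloudFiles.includeExistingFiles defaults to true), which is the behaviour in question.
    .load(streaming_path)
)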


3 REPLIES

Michael_Galli
Contributor II

Seems that "maxFileAge" solves the problem.

streaming_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("maxFilesPerTrigger", 20)
    .option("multiLine", True)
    .option("maxFileAge", 1)
    .schema(schema)
    .load(streaming_path)
)

This ignores files older than 1 week.

But how to ignore files older than 1 day?

Hubert-Dudek
Esteemed Contributor III

Yes, exactly: cloudFiles.maxFileAge. Please select your answer as the best one 🙂
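For reference, a hedged sketch of what that could look like, assuming cloudFiles.maxFileAge accepts a duration string such as "1 day":

streaming_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    # only consider files newer than one day (assumed duration-string syntax)
    .option("cloudFiles.maxFileAge", "1 day")
    .option("multiLine", True)
    .schema(schema)
    .load(streaming_path)
)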

Michael_Galli
Contributor II

Update:

Seems that maxFileAge was not a good idea. The following, with the option "cloudFiles.includeExistingFiles" = False, solved my problem:

streaming_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", extension)
    .option("cloudFiles.maxFilesPerTrigger", 20)
    .option("cloudFiles.includeExistingFiles", False)
    .option("multiLine", True)
    .option("pathGlobFilter", "*." + extension)
    .schema(schema)
    .load(streaming_path)
)
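As a follow-up note: cloudFiles.includeExistingFiles is only evaluated when a stream is started for the first time, so it takes effect with a fresh checkpoint location. A minimal sketch of starting the query, assuming a Delta sink and the hypothetical names checkpoint_path and sales_transactions:

(
    streaming_df.writeStream
    .format("delta")
    # use a new checkpoint location so includeExistingFiles is honoured on the first start
    .option("checkpointLocation", checkpoint_path)
    .outputMode("append")
    .toTable("sales_transactions")
)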
