Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Spark Streaming - only process new files in streaming path?

Michael_Galli
Contributor III

In our streaming jobs, we currently run a streaming query (cloudFiles format, i.e. Auto Loader) on a directory where sales transactions arrive every 5 minutes.

In this directory, the transactions are organized in the following layout:

<streaming-checkpoint-root>/<transaction_date>/<transaction_hour>/transaction_x_y.json

Only today's transactions are of interest; all others are already obsolete.

When I start the streaming job, it processes all the historical transactions, which I don't want.

Is it somehow possible to process only NEW files coming in after the streaming job has started?
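For context, a minimal sketch of the kind of Auto Loader read described above (schema and streaming_path are assumed to be defined elsewhere, as in the replies below). With default options, the first run of such a stream also backfills every file that already exists under the path, which is exactly the behaviour in question:

# Baseline Auto Loader (cloudFiles) read. By default, files already present
# under streaming_path are included in the first micro-batches, so all
# historical transactions get processed as well.
streaming_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(schema)
    .load(streaming_path)
)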


3 REPLIES

Michael_Galli
Contributor III

Seems that "maxFileAge" solves the problem.

streaming_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("maxFilesPerTrigger", 20)
    .option("multiLine", True)
    .option("maxFileAge", 1)
    .schema(schema)
    .load(streaming_path)
)

This ignores files older than 1 week.

But how do I ignore files older than 1 day?

Hubert-Dudek
Esteemed Contributor III

Yes, exactly: cloudFiles.maxFileAge. Please select your answer as the best one 🙂
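For completeness, a sketch of what that suggestion could look like, assuming cloudFiles.maxFileAge accepts a duration string such as "1 day" (the value and its effect here are illustrative; note that the update below ultimately moves away from this approach):

streaming_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    # assumption: a duration string bounding how old a file may be and still be picked up
    .option("cloudFiles.maxFileAge", "1 day")
    .schema(schema)
    .load(streaming_path)
)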

Michael_Galli
Contributor III

Update (accepted solution):

It turned out that maxFileAge was not a good idea after all. The following, with the option "cloudFiles.includeExistingFiles" set to False, solved my problem:

streaming_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", extension)
    .option("cloudFiles.maxFilesPerTrigger", 20)
    .option("cloudFiles.includeExistingFiles", False)
    .option("multiLine", True)
    .option("pathGlobfilter", "*." + extension)
    .schema(schema)
    .load(streaming_path)
)
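As a usage note, the read above still needs a sink to actually run. A minimal sketch, assuming a Delta target and a checkpoint location (both paths below are hypothetical):

# Hypothetical sink for the stream above: append new transactions to a Delta
# table, with progress tracked in a checkpoint directory so each file is
# processed exactly once.
query = (
    streaming_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/sales_transactions")  # hypothetical path
    .outputMode("append")
    .trigger(processingTime="5 minutes")
    .start("/mnt/delta/sales_transactions")  # hypothetical path
)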
