Spark Streaming - only process new files in streaming path?

Michael_Galli
Contributor III

In our streaming jobs, we currently run streaming queries (cloudFiles format) on a directory with sales transactions arriving every 5 minutes.

In this directory, the transactions are organized in the following format:

<streaming-checkpoint-root>/<transaction_date>/<transaction_hour>/transaction_x_y.json

Only TODAY's transactions are of interest; all others are already obsolete.

When I start the streaming job, it will process all the historical transactions, which I don't want.

Is it somehow possible to process only NEW files coming in after the streaming job has started?

Michael_Galli
Contributor III

Seems that "maxFileAge" solves the problem.

streaming_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("maxFilesPerTrigger", 20)
    .option("multiLine", True)
    .option("maxFileAge", 1)
    .schema(schema)
    .load(streaming_path)
)

This ignores files older than 1 week.

But how do I ignore files older than 1 day?

Yes, exactly: cloudFiles.maxFileAge. Please select your answer as the best one πŸ™‚
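
A minimal sketch of that, assuming cloudFiles.maxFileAge accepts a duration string such as "1 day" (schema and streaming_path as in your snippet above):

streaming_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    # assumption: a duration string limits file discovery to files newer than 1 day
    .option("cloudFiles.maxFileAge", "1 day")
    .schema(schema)
    .load(streaming_path)
)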


My blog: https://databrickster.medium.com/

Michael_Galli
Contributor III

Update:

It seems that maxFileAge was not a good idea after all. The following, with the option "cloudFiles.includeExistingFiles" set to False, solved my problem:

streaming_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", extension)
    .option("cloudFiles.maxFilesPerTrigger", 20)
    .option("cloudFiles.includeExistingFiles", False)
    .option("multiLine", True)
    .option("pathGlobfilter", "*." + extension)
    .schema(schema)
    .load(streaming_path)
)
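
Note that cloudFiles.includeExistingFiles is only honored when the stream is started for the first time; after that, the checkpoint determines which files have been processed, so changing the option on restart has no effect. For completeness, a hedged sketch of writing this stream out (the checkpoint_path variable and target table name are illustrative, not from this thread):

query = (
    streaming_df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)  # hypothetical checkpoint location
    .trigger(processingTime="5 minutes")  # matches the 5-minute arrival cadence
    .toTable("sales_transactions")  # hypothetical target table
)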
