05-06-2022 04:19 AM
In our streaming jobs, we currently run a stream (cloudFiles format) over a directory where sales transactions arrive every 5 minutes.
In this directory, the transactions are organized as follows:
<streaming-checkpoint-root>/<transaction_date>/<transaction_hour>/transaction_x_y.json
Only the transactions of TODAY are of interest; all others are already obsolete.
When I start the streaming job, it processes all the historical transactions, which I don't want.
Is it somehow possible to process only NEW files that arrive after the streaming job has started?
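For reference, the reader is currently set up roughly like this (schema and streaming_path stand in for our actual schema and input directory):
streaming_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(schema)
    .load(streaming_path)
)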
Labels: CloudFiles, TODAY
Accepted Solutions
05-09-2022 11:00 PM
Update:
It turns out maxFileAge was not a good idea. The following, with the option "cloudFiles.includeExistingFiles" set to False, solved my problem:
streaming_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", extension)
    .option("cloudFiles.maxFilesPerTrigger", 20)
    .option("cloudFiles.includeExistingFiles", False)
    .option("multiLine", True)
    .option("pathGlobFilter", "*." + extension)
    .schema(schema)
    .load(streaming_path)
)
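Note that cloudFiles.includeExistingFiles is evaluated only when the stream is started for the first time, so if the stream has already run, the existing checkpoint has to be cleared (or a new checkpoint location used) for the option to take effect. A minimal write side for this stream could look roughly like the following (output_path and checkpoint_path are placeholder locations, not my actual paths):
(streaming_df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)  # fresh checkpoint so includeExistingFiles=False is honored
    .outputMode("append")
    .start(output_path)
)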
05-06-2022 06:28 AM
It seems that "maxFileAge" solves the problem.
streaming_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("maxFilesPerTrigger", 20)
    .option("multiLine", True)
    .option("maxFileAge", 1)
    .schema(schema)
    .load(streaming_path)
)
This ignores files older than 1 week.
But how do I ignore files older than 1 day?
05-06-2022 08:46 AM
Yes, exactly: cloudFiles.maxFileAge. Please select your answer as the best one.
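If I remember correctly, the option takes an interval string rather than a bare number, so something along these lines should limit it to one day (untested sketch, reusing your schema and streaming_path):
streaming_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.maxFileAge", "1 day")  # interval string, e.g. "1 day" or "14 days"
    .option("multiLine", True)
    .schema(schema)
    .load(streaming_path)
)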