
How to filter files in Databricks Autoloader stream

kaslan
New Contributor II

I want to set up an S3 stream using Databricks Auto Loader. I have managed to set up the stream, but my S3 bucket contains different types of JSON files. I want to filter them out, preferably in the stream itself rather than using a filter operation.

According to the docs, I should be able to filter using a glob pattern. However, I can't seem to get this to work; it loads everything anyway.

This is what I have:

from pyspark.sql import functions as F

df = (
  spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.inferColumnTypes", "true")
  .option("cloudFiles.schemaInference.sampleSize.numFiles", 1000)
  .option("cloudFiles.schemaLocation", "dbfs:/auto-loader/schemas/")
  .option("cloudFiles.includeExistingFiles", "true")
  .option("multiLine", "true")
  .option("inferSchema", "true")
#   .option("cloudFiles.schemaHints", schemaHints)
#  .load("s3://<BUCKET>/qualifier/**/*_INPUT")
  .load("s3://<BUCKET>/qualifier")
  .withColumn("filePath", F.input_file_name())        # source file for each record
  .withColumn("date_ingested", F.current_timestamp())
)

My files have a key that is structured as

qualifier/version/YYYY-MM/DD/<NAME>_INPUT.json

so I want to filter for files whose name contains INPUT. This seems to load everything:

.load("s3://<BUCKET>/qualifier")

and

.load("s3://<BUCKET>/qualifier/**/*_INPUT")

is what I want to do, but that doesn't work. Is my glob pattern incorrect, or is there something else I am missing?

6 REPLIES

Kaniz
Community Manager

Hi @kaslan! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers in the community have an answer to your question first; otherwise, I will get back to you soon. Thanks.

-werners-
Esteemed Contributor III

According to the docs you linked, the glob filter on the input path only works on directories, not on the files themselves.

So if you want to filter on certain files in those directories, you can add an additional filter through the pathGlobFilter option:

.option("pathGlobFilter", "*_INPUT")

https://docs.databricks.com/spark/latest/structured-streaming/auto-loader-s3.html#use-cloudfiles-sou...
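
For context, a minimal sketch of where that option would sit in the stream from the question (the bucket path and schema location are the question's own placeholders; the other options are trimmed for brevity):

df = (
  spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "dbfs:/auto-loader/schemas/")
  .option("pathGlobFilter", "*_INPUT")  # filters on file names within the matched directories
  .load("s3://<BUCKET>/qualifier")
)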

kaslan
New Contributor II

Ah yeah, forgot to mention that I tried that as well. It still picks up other files when I do that.

-werners-
Esteemed Contributor III

Strange, maybe because of this (for the file path):

"The glob pattern will have * appended to it"

Or use *_INPUT* as the file filter.
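
That detail matters here, since the files end in .json: *_INPUT alone would not cover the extension, while *_INPUT* would. A quick local sanity check with plain Python fnmatch (assuming these simple * patterns behave the same way as the cloudFiles glob filter):

from fnmatch import fnmatchcase

name = "NAME_INPUT.json"
print(fnmatchcase(name, "*_INPUT"))   # False: the pattern does not cover the .json suffix
print(fnmatchcase(name, "*_INPUT*"))  # True: the trailing * absorbs the extension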

kaslan
New Contributor II

Yeah, maybe. But that would mean that all files that contain INPUT would still be included, right?

-werners-
Esteemed Contributor III

No, if you explicitly include the underscore, plain INPUT will not be selected.
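
A quick check of that claim with the same local glob stand-in (again assuming fnmatch-style * semantics; the file names are made up for illustration):

from fnmatch import fnmatchcase

print(fnmatchcase("REPORT_INPUT.json", "*_INPUT*"))  # True: INPUT preceded by an underscore
print(fnmatchcase("REPORTINPUT.json", "*_INPUT*"))   # False: no underscore before INPUT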
