
How to filter files in Databricks Autoloader stream

kaslan
New Contributor II

I want to set up an S3 stream using Databricks Auto Loader. I have managed to set up the stream, but my S3 bucket contains different types of JSON files. I want to filter which files are loaded, preferably in the stream itself rather than with a filter operation afterwards.

According to the docs I should be able to filter using a glob pattern. However, I can't get this to work; it loads everything anyway.

This is what I have:

from pyspark.sql import functions as F

df = (
  spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.inferColumnTypes", "true")
  .option("cloudFiles.schemaInference.sampleSize.numFiles", 1000)
  .option("cloudFiles.schemaLocation", "dbfs:/auto-loader/schemas/")
  .option("cloudFiles.includeExistingFiles", "true")
  .option("multiLine", "true")
  .option("inferSchema", "true")
#   .option("cloudFiles.schemaHints", schemaHints)
#  .load("s3://<BUCKET>/qualifier/**/*_INPUT")
  .load("s3://<BUCKET>/qualifier")
  .withColumn("filePath", F.input_file_name())
  .withColumn("date_ingested", F.current_timestamp())
)

My files have keys structured as

qualifier/version/YYYY-MM/DD/<NAME>_INPUT.json

so I want to filter for files whose names contain _INPUT. This seems to load everything:

.load("s3://<BUCKET>/qualifier")

and this is what I want to do, but it doesn't work:

.load("s3://<BUCKET>/qualifier/**/*_INPUT")

Is my glob pattern incorrect, or is there something else I am missing?

5 REPLIES

-werners-
Esteemed Contributor III

According to the docs you linked, the glob filter on the input path only works on directories, not on the files themselves.

So if you want to filter for certain files within those directories, you can apply an additional filter through the pathGlobFilter option:

.option("pathGlobFilter", "*_INPUT")

https://docs.databricks.com/spark/latest/structured-streaming/auto-loader-s3.html#use-cloudfiles-sou...

kaslan
New Contributor II

Ah yeah, I forgot to mention that I tried that as well. It still picks up other files when I do that.

-werners-
Esteemed Contributor III

Strange. Maybe it's because of this, from the docs (about the file path):

"The glob pattern will have * appended to it"

Or use *_INPUT* as the file filter.
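
For instance, a minimal sketch of the question's stream with that file filter applied (the bucket path and schema location are the placeholders from the question; assumes a Databricks spark session):

df = (
  spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  # directory-level filtering happens via the load path;
  # file-level filtering happens via pathGlobFilter
  .option("pathGlobFilter", "*_INPUT*")
  .option("cloudFiles.schemaLocation", "dbfs:/auto-loader/schemas/")
  .load("s3://<BUCKET>/qualifier")
)

The trailing * lets the pattern match past the name, so <NAME>_INPUT.json is picked up, while the leading *_ still requires an underscore before INPUT.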

kaslan
New Contributor II

Yeah, maybe. But that would mean that all files containing INPUT would still be included, right?

-werners-
Esteemed Contributor III

No, if you explicitly include the underscore, a plain INPUT without it will not be selected.
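
As a quick illustration (a sketch using Python's fnmatch, whose * behaves like the Hadoop-style glob behind pathGlobFilter for a simple pattern like this; the file names are hypothetical):

from fnmatch import fnmatchcase

# check some hypothetical file names against the suggested pattern
for name in ["FOO_INPUT.json", "FOO_OUTPUT.json", "INPUT.json", "FOOINPUT.json"]:
    print(name, fnmatchcase(name, "*_INPUT*"))

# FOO_INPUT.json True    <- underscore before INPUT, matched
# FOO_OUTPUT.json False
# INPUT.json False       <- no underscore before INPUT, not matched
# FOOINPUT.json False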
