05-09-2023 01:19 PM
I'm working in the Google Cloud environment. I have an Autoloader job that uses the cloud files notifications to load data into a delta table. I want to filter the files from the PubSub topic based on the path in GCS where the files are located, not just the file name. I can successfully filter files based on the file name, but if I try to filter on the path, I get an empty DataSet.
path_of_file = "gs://my_bucket/dir1/dir2/test_data.json"
glob_filter1 = "*.json"
glob_filter2 = "*dir2*.json"
glob_filter3 = "**dir2**.json"
glob_filter4 = "*/dir2/*.json"
(
    spark.readStream.schema(schema)
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.projectId", "<MY PROJECT ID>")
    .option("cloudFiles.useNotifications", "true")
    .option("checkpointLocation", check_point_location)
    .option("cloudFiles.includeExistingFiles", "true")
    .option("cloudFiles.subscription", "<MY SUBSCRIPTION ID>")
    .option("pathGlobFilter", <GLOB FILTER>)
    .load()
)
When I use `glob_filter1` as the `pathGlobFilter` option, the autoloader successfully runs and loads the expected file. When I use `glob_filter2`, `glob_filter3`, or `glob_filter4`, autoloader runs but filters out the expected file. I always confirm that the expected notification is in the PubSub topic before running the test and that it has been acked on the topic after the test.
The documentation refers to it as a glob filter, and everywhere else in the documentation a glob filter can match against the full path. Am I doing something wrong, or does `pathGlobFilter` only match the file name and not the full path?
05-10-2023 04:04 AM
`pathGlobFilter` is used to include only files whose file name matches the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`. It does not change the behavior of partition discovery.
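This explains the results above: the filter is applied to the file name alone, not to the directories leading up to it. A small plain-Python sketch illustrates the idea, using `fnmatch` as a simplified stand-in for Hadoop's glob matcher (the semantics are not identical; for example, a real glob `*` does not cross `/`, while `fnmatch`'s does):

```python
from fnmatch import fnmatch  # simplified stand-in for org.apache.hadoop.fs.GlobFilter

path = "gs://my_bucket/dir1/dir2/test_data.json"
name = path.rsplit("/", 1)[-1]  # the filter only ever sees "test_data.json"

print(fnmatch(name, "*.json"))       # True  - matches the file name
print(fnmatch(name, "*dir2*.json"))  # False - the directory is not part of the name
print(fnmatch(path, "*dir2*.json"))  # True  - would match only if applied to the full path
```

So `*dir2*.json` filters out every file, because no file *name* contains `dir2`.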
To load files with paths matching a given glob pattern while keeping the behavior of partition discovery, you can use:
val testGlobFilterDF = spark.read.format("parquet")
.option("pathGlobFilter", "*.parquet") // json file should be filtered out
.load("examples/src/main/resources/dir1")
testGlobFilterDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
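If the goal is to keep only files under a particular directory while still using notifications, one possible workaround (a sketch, not something the Auto Loader documentation promises) is to let the notifications through and filter the resulting stream on the full path yourself, e.g. with `input_file_name()`. This assumes a Databricks notebook where `spark`, `schema`, and the other option values are already defined, as in the question:

```python
from pyspark.sql.functions import input_file_name

# Sketch only: pathGlobFilter matches the file name alone, so apply the
# directory condition on the full GCS path after loading instead.
df = (
    spark.readStream.schema(schema)
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.projectId", "<MY PROJECT ID>")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.subscription", "<MY SUBSCRIPTION ID>")
    .option("pathGlobFilter", "*.json")  # file-name filter only
    .load()
    .withColumn("source_file", input_file_name())
    .filter("source_file LIKE '%/dir2/%'")  # path filter applied downstream
)
```

Note that this filters rows after ingestion rather than skipping the files entirely, so every notified file is still read.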
05-10-2023 06:37 AM
Thank you for confirming what I observed that differed from the documentation.