Hi all,
I am quite new to Databricks. Overall I have enjoyed the experience so far, but now I have run into a problem for which I was not able to find an acceptable solution.
Here is my setup: I have a bunch of S3 buckets and need to ingest the data into Databricks, preferably using a DLT pipeline. Each bucket contains a directory with data files and possibly a directory with error logs produced by an application. The structure looks something like this:
- s3://<bucket-name>/data/...
- s3://<bucket-name>/errors/...
Creating a DLT pipeline to read the data files was not a problem. Now I want to do the same for the errors. The problem is that some of the buckets do not contain the /errors directory. Therefore the straightforward
import dlt

@dlt.table
def error_table():
    return spark.readStream.options(**options).load("s3://<bucket-name>/errors/")
does not work, and the complete pipeline fails on initialization, so even the other tables in the pipeline will not update. What I want instead is the following: if the path does not exist, create an empty table (the schema is known).
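For reference, the schema referenced as error_schema below is along these lines (the field names here are just placeholders, not my real columns):

from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Placeholder schema; my real error logs have different fields.
error_schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("error_message", StringType(), True),
])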
What I tried so far:
@dlt.table
def error_table():
    try:
        return spark.readStream.options(**options).load("s3://<bucket-name>/errors")
    except Exception:
        # Fall back to an empty table with the known schema.
        return spark.createDataFrame(data=[], schema=error_schema)
Due to the lazy nature of Spark, this does not work (at least for me): the missing path apparently only surfaces once the stream is actually started, not inside the function, so the except branch is never taken and the pipeline still fails.
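Presumably I need to perform the existence check eagerly, at pipeline build time rather than inside the lazy stream. Something like the sketch below is what I have in mind (errors_path_exists is just a helper I made up, and I am not sure whether calling dbutils.fs.ls like this is supported or recommended inside a DLT pipeline):

import dlt

def errors_path_exists(path: str) -> bool:
    # Probe the path eagerly at pipeline build time;
    # dbutils.fs.ls raises an exception if the path does not exist.
    try:
        dbutils.fs.ls(path)
        return True
    except Exception:
        return False

@dlt.table
def error_table():
    errors_path = "s3://<bucket-name>/errors/"
    if errors_path_exists(errors_path):
        return spark.readStream.options(**options).load(errors_path)
    # Path is missing: fall back to an empty DataFrame with the known schema.
    return spark.createDataFrame(data=[], schema=error_schema)

If there is a more idiomatic DLT way to declare such an optional source, I would of course prefer that.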
The other thing I tried was a glob filter:

@dlt.table
def error_table():
    return spark.readStream.options(**options).option("pathGlobFilter", "/errors*").load("s3://<bucket-name>")
This seems to work, but it appears to scan all the data files as well, which is far too inefficient to be acceptable: for me it took more than 30 minutes even when there were no errors at all. Is there a clean way to handle a source path that may not exist in a DLT pipeline?