05-13-2024 04:54 AM - edited 05-13-2024 04:55 AM
I am trying to use Auto Loader to load data from two different blob containers within the same storage account, so that Spark will discover the data asynchronously. However, when I try this it doesn't work and I get the error outlined below. Can anyone point out where I am going wrong, or suggest an alternative method to achieve this?
To use 'cloudFiles' as a streaming source, please provide the file format with the option 'cloudFiles.format', and use .load() to create your DataFrame.
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.backfillInterval", "1 day")
    .load("wasbs://customer1@container_name.*csv", "wasbs://customer2@container_name.*csv")
)
05-15-2024 06:12 AM
Hello Tim, as you will note from the Spark Structured Streaming docs, the load function only accepts a single string for the path arg. This means that all files need to be discoverable from the same base path if you wish to do this in a single stream; you can then use glob patterns to pick up the files beneath that base path. Also be aware that the wasbs:// (legacy Blob Storage) driver is deprecated; ADLS Gen2 with abfss:// is the recommended replacement.
You have 2 options here:
1. Restructure your storage so that all customers' files sit under a single container/base path, and use a glob pattern in one stream.
2. Create a separate Auto Loader stream per customer container.
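A minimal sketch of the single-base-path approach, assuming hypothetical account ("myaccount") and container ("data") names and one folder per customer under that container; the glob pattern in the single path does the multi-customer discovery:

```python
# Sketch only: "myaccount" and "data" are hypothetical names; assumes
# every customer's folder lives under the same container.

def customer_glob_path(account: str, container: str) -> str:
    """Single base path whose glob matches every customer folder's CSVs."""
    return f"wasbs://{container}@{account}.blob.core.windows.net/customer*/*.csv"

def start_single_stream(spark, account: str, container: str):
    """One Auto Loader stream that discovers all customers' files."""
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .load(customer_glob_path(account, container))
    )
```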
05-15-2024 06:17 AM
Thanks for your feedback Corbin. I am aware of the deprecation, but the type of blob storage is beyond my control in this case, I'm afraid. Further to this, the business logic dictates that there be a single blob container per customer, so this also cannot be changed.
Therefore it looks like option 2 is my only choice. Is there a way to create separate streams dynamically from a list of customers and still utilise the asynchronous nature of Spark?
05-15-2024 06:19 AM
What do you mean by "asynchronous nature of spark"? What behavior are you trying to maintain?
05-15-2024 06:37 AM
Can I create multiple streams that run at the same time? Or do I have to wait for one stream to finish before starting another? So if I have 10 different storage containers, can I create 10 streams that run at the same time?
05-15-2024 06:49 AM
Yes, each stream will run independently of one another and all can run together at the same time.
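To make the concurrency concrete, here's a hedged sketch (container, account, and path names are made up): each start() call returns a StreamingQuery immediately without blocking, so a loop can launch every stream and they all run side by side.

```python
# Sketch with hypothetical container/account names: each query starts
# without blocking, so all of them run concurrently on the cluster.

def start_streams(spark, containers):
    queries = []
    for c in containers:
        df = (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .load(f"wasbs://{c}@myaccount.blob.core.windows.net/*.csv")
        )
        query = (
            df.writeStream
            .format("delta")
            .option("checkpointLocation", f"/tmp/checkpoints/{c}")
            .start(f"/tmp/delta/{c}")  # returns immediately, stream runs in background
        )
        queries.append(query)
    return queries

# To block the driver until any of the streams stops:
# spark.streams.awaitAnyTermination()
```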
05-15-2024 07:01 AM
That's good news. So would this be the correct sort of set up, or should I be creating all the streams first before writing the streams to a table?
customer_list = ['customer1', 'customer2', 'customer3', ...]
table_name = "bronze.customer_data_table"

for customer in customer_list:
    file_path = f"wasbs://{customer}@container.blob.core.windows.net/*/*.csv"
    checkpoint_path = f"/tmp/checkpoints/{customer}/_checkpoints"

    cloudFile = {
        "cloudFiles.format": "csv",
        "cloudFiles.backfillInterval": "1 day",
        "cloudFiles.schemaLocation": checkpoint_path,
        "cloudFiles.schemaEvolutionMode": "rescue",
    }

    df = (
        spark.readStream
        .format("cloudFiles")
        .options(**cloudFile)
        .load(file_path)
    )

    streamQuery = (
        df.writeStream.format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .trigger(once=True)
        .toTable(table_name)
    )
05-15-2024 07:31 AM
LGTM, but use .trigger(availableNow=True) instead of .trigger(once=True), since the once trigger is now deprecated.
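For reference, a sketch of the write with the trigger swapped in (names match the snippet above; availableNow processes all currently available data, possibly across several micro-batches, then stops):

```python
# Sketch: same write as in the post above, with the non-deprecated trigger.
def write_available_now(df, checkpoint_path: str, table_name: str):
    """Append a streaming DataFrame to a Delta table, processing all
    currently available data and then stopping."""
    return (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)  # replaces the deprecated once=True
        .toTable(table_name)
    )
```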
05-15-2024 08:15 AM
If we were to upgrade to ADLS Gen2 but retain the same structure, would there be scope for the method above to be improved (besides moving to file notification mode)?