com.microsoft.azure.storage.StorageException: The specifed resource name contains invalid characters.

manas_884
New Contributor II

Hi guys, I'm relatively new to Databricks and struggling to implement Auto Loader (with trigger once = true) in file notification mode. I have CSV files in one container (the landing zone). I would like Auto Loader to pick up new and existing files from this container and load them in Delta format into another container. I can read the stream with the credentials; however, when I try to write the stream using df.writeStream, I get the above storage exception. I checked the file names and paths, and just to be sure, I read the files using spark.read and was able to write them successfully as a batch process. I really can't figure out why I cannot write the stream when I can write the same data as a batch job. I also have the correct roles assigned to the service principal.

I would really appreciate it if someone could help me out, as I have been stuck on this for hours.

manas_884
New Contributor II

Hi Kaniz, thank you for your reply. I initially made the mistake of using a capital letter in the queue name in my config files. I can now write without any error as a batch process. However, when I run the write stream, it says "Running Command" and just keeps running without any output. The checkpoint folder is being created, but I can't see my Delta files, and no _delta_log folder is being created. I'm not sure what the issue is. My code is as follows:

cloudFilesOption = {
    "cloudFiles.format": "csv",
    "cloudFiles.useNotifications": "true",  # Use file notifications for efficient discovery
    "cloudFiles.includeExistingFiles": "true",  # Process existing files in addition to new ones
    "cloudFiles.connectionString": dbutils.secrets.get(scope="stockseval-con-string", key="stockseval-con-string"),
    "cloudFiles.resourceGroup": "stocks",
    "cloudFiles.subscriptionId": dbutils.secrets.get(scope="subscription-id", key="subscription-id"),
    "cloudFiles.tenantId": dbutils.secrets.get(scope="tenant-id", key="tenant-id"),
    "cloudFiles.clientId": dbutils.secrets.get(scope="clientid", key="clientid"),
    "cloudFiles.clientSecret": dbutils.secrets.get(scope="adls-db", key="client-secret"),
    "cloudFiles.maxFilesPerTrigger": "100",  # Number of files to process per micro-batch
    "cloudFiles.schemaLocation": "abfss://financialdatabronze@stockseval.dfs.core.windows.net/",
    "cloudFiles.schemaEvolutionMode": "addNewColumns",
}

landing_loc = "/mnt/financialdatalandingzone/balancesheet/annualreports/"


# Read the landing zone as a stream with Auto Loader
df = (
    spark.readStream.format("cloudFiles")
    .options(**cloudFilesOption)
    .option("header", True)
    .load(landing_loc)
)

bronze_layer = "/mnt/financialdatabronze/balancesheet/annualreports/"

# Write the stream to the bronze layer as Delta and block until it finishes
(
    df.writeStream.format("delta")
    .outputMode("append")
    .trigger(once=True)
    .queryName("Autoloader")
    .option("checkpointLocation", "/mnt/financialdatabronze/balancesheet/annualreports_checkpoint/")
    .start(bronze_layer)
    .awaitTermination()
)
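
As a side note on the capital-letter mistake: Azure Storage queue names must be 3 to 63 characters long, use only lowercase letters, digits, and hyphens, start and end with a letter or digit, and contain no consecutive hyphens. A name that breaks these rules is one way to end up with the "invalid characters" StorageException. Below is a minimal sketch of a check that could be run before starting the stream. The helper name is_valid_queue_name and the sample queue names are made up for illustration; they are not part of Auto Loader or the Azure SDK.

```python
import re

# Azure Storage queue naming rules: lowercase letters, digits and hyphens only,
# starting and ending with a letter or digit, with no consecutive hyphens.
# The 3-63 character length limit is checked separately below.
_QUEUE_NAME = re.compile(r"^[a-z0-9](?:-?[a-z0-9])+$")


def is_valid_queue_name(name: str) -> bool:
    """Return True if `name` satisfies the Azure queue naming rules (hypothetical helper)."""
    return 3 <= len(name) <= 63 and _QUEUE_NAME.fullmatch(name) is not None


# Hypothetical queue names, for illustration only
for candidate in ["stockseval-queue", "Stockseval-queue", "stocks--queue"]:
    print(candidate, is_valid_queue_name(candidate))
```

Only the first sample name passes: the second contains a capital letter and the third has two hyphens in a row.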
 
 

manas_884
New Contributor II

Could you please elaborate?