Hi Kaniz, thank you for your reply. My initial mistake was using a capital letter in the queue name in the config. The batch write now runs with no errors. However, when I try to run the write stream, it says "Running Command" and just keeps running with no output. The checkpoint folder is being created, but I can't see my delta files and no _delta_log folder is being created. I'm not sure what the issue is; my code is as follows:
cloudFilesOption = {
"cloudFiles.format": "csv",
"cloudFiles.useNotifications": "true", # Use file notifications for efficient discovery
"cloudFiles.includeExistingFiles": "true", # Process existing files in addition to new ones
"cloudFiles.connectionString": dbutils.secrets.get(scope="stockseval-con-string", key="stockseval-con-string"),
"cloudFiles.resourceGroup": "stocks",
"cloudFiles.subscriptionId": dbutils.secrets.get(scope="subscription-id", key="subscription-id"),
"cloudFiles.tenantId": dbutils.secrets.get(scope="tenant-id", key="tenant-id"),
"cloudFiles.clientId": dbutils.secrets.get(scope="clientid", key="clientid"),
"cloudFiles.clientSecret": dbutils.secrets.get(scope = "adls-db", key = "client-secret"),
"cloudFiles.maxFilesPerTrigger": "100", # Number of files to process per micro-batch
"cloudFiles.schemaLocation" : "abfss://financialdatabronze@stockseval.dfs.core.windows.net/",
"cloudFiles.schemaEvolutionMode" : "addNewColumns"
}
landing_loc = "/mnt/financialdatalandingzone/balancesheet/annualreports/"
df = (
    spark.readStream
    .format("cloudFiles")
    .options(**cloudFilesOption)
    .option("header", True)  # First CSV row is the header
    .load(landing_loc)
)
bronze_layer = "/mnt/financialdatabronze/balancesheet/annualreports/"
query = (
    df.writeStream
    .format("delta")
    .outputMode("append")
    .trigger(once=True)
    .queryName("Autoloader")
    .option("checkpointLocation", "/mnt/financialdatabronze/balancesheet/annualreports_checkpoint/")
    .start(bronze_layer)
)
query.awaitTermination()
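
In case it helps with diagnosis, this is a minimal sketch of what I run to inspect the stream and the output path (it assumes the query is still active, e.g. when I start it without awaitTermination blocking the cell, and only reuses the bronze_layer path from above):

# Inspect any active streaming queries in this session
for q in spark.streams.active:
    print(q.name)
    print(q.status)        # current status of the query
    print(q.lastProgress)  # metrics from the most recent micro-batch

# Check whether any data files or the _delta_log folder exist in the output path
display(dbutils.fs.ls(bronze_layer))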