When the notebook is run by the jobs/workflow scheduler, the data is never imported, but the files do get removed.
When the notebook is run directly (executing the cell) or the job is triggered manually (clicking Run Now in the Jobs UI), the data is imported as expected.
There are no obvious errors in the job run logs. Previously, if an import file contained an invalid schema, the job would fail and an alert would be sent.
import re

dir = "/FileStore/shared_uploads/gnosis/import/"
files = dbutils.fs.ls(dir)

for f in files:
    # Derive the target table name: everything before the first "_<digits>" in the file name.
    tab = re.split(r"_\d+", f.name)[0]
    print(f.name + " -> " + tab)
    df = spark.read.format('csv').load(dir + f.name, inferSchema="true", header="true")
    # t_1 and t_2 accumulate data; every other table is fully replaced.
    if tab == "t_1" or tab == "t_2":
        df.write.option("mergeSchema", True).saveAsTable('extracts.{}'.format(tab), format='delta', mode='append')
    else:
        df.write.option("mergeSchema", True).saveAsTable('extracts.{}'.format(tab), format='delta', mode='overwrite')
    # Delete the source file once processed.
    dbutils.fs.rm(dir + f.name)
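
As a diagnostic, a minimal variant of the loop above (a sketch, not the actual notebook) could log each file and only delete it after the write succeeds, so a failing write in a scheduled run shows up in the run output and triggers the on_failure e-mail instead of silently removing the file:

import re

dir = "/FileStore/shared_uploads/gnosis/import/"

for f in dbutils.fs.ls(dir):
    tab = re.split(r"_\d+", f.name)[0]
    try:
        df = spark.read.format("csv").load(dir + f.name, inferSchema="true", header="true")
        mode = "append" if tab in ("t_1", "t_2") else "overwrite"
        df.write.option("mergeSchema", True).saveAsTable("extracts.{}".format(tab), format="delta", mode=mode)
        # Remove the source file only after the table write has succeeded.
        dbutils.fs.rm(dir + f.name)
        print("OK: {} ({} rows) -> extracts.{} [{}]".format(f.name, df.count(), tab, mode))
    except Exception as e:
        # Print and re-raise so the run is marked failed and the on_failure alert fires.
        print("FAILED: {} -> extracts.{}: {}".format(f.name, tab, e))
        raise

For reference, the job definition is: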
{
  "job_id": 1,
  "settings": {
    "name": "import data",
    "email_notifications": {
      "on_failure": [
        "gnosis"
      ],
      "no_alert_for_skipped_runs": true
    },
    "timeout_seconds": 0,
    "schedule": {
      "quartz_cron_expression": "54 5 0/2 * * ?",
      "timezone_id": "America/Detroit",
      "pause_status": "PAUSED"
    },
    "max_concurrent_runs": 1,
    "tasks": [
      {
        "task_key": "import_data",
        "notebook_task": {
          "notebook_path": "/Users/gnosis/import data"
        },
        "existing_cluster_id": "1",
        "timeout_seconds": 0,
        "email_notifications": {}
      }
    ],
    "format": "MULTI_TASK"
  },
  "created_time": 1648574818835,
  "creator_user_name": "gnosis",
  "run_as_user_name": "gnosis"
}
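
Since nothing obvious shows up in the job logs, it may also help to pull the result state and notebook output of the last few scheduled runs and compare them with a manual Run Now. Below is a rough sketch against the Jobs 2.1 REST API (runs/list and runs/get-output); the workspace URL and token are placeholders:

import requests

# Placeholders: fill in your workspace URL and a personal access token.
HOST = "https://<workspace>.cloud.databricks.com"
TOKEN = "<personal-access-token>"
HEADERS = {"Authorization": "Bearer " + TOKEN}

# List the most recent runs of job_id 1, including their task runs.
runs = requests.get(
    HOST + "/api/2.1/jobs/runs/list",
    headers=HEADERS,
    params={"job_id": 1, "limit": 5, "expand_tasks": "true"},
).json().get("runs", [])

for run in runs:
    state = run.get("state", {})
    print(run["run_id"], run.get("trigger"), state.get("life_cycle_state"), state.get("result_state"))
    # For a multi-task job, fetch the output of each task run.
    for task in run.get("tasks", []):
        out = requests.get(
            HOST + "/api/2.1/jobs/runs/get-output",
            headers=HEADERS,
            params={"run_id": task["run_id"]},
        ).json()
        # Shows the notebook's printed/exit output and any error for that task run.
        print("  ", task["task_key"], out.get("error"), out.get("notebook_output"))

Comparing the trigger field of a scheduled run (PERIODIC) with a manual one (ONE_TIME), and looking at what the notebook actually printed in each case, should narrow down whether the difference is in the scheduler or inside the notebook.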