I am using a notebook to copy my Postgres database into Databricks tables on a schedule (I had no success connecting through the Data Explorer UI). When I run the notebook interactively it works, but when I run it as a scheduled job I get this error:
org.apache.spark.SparkSQLException: Unsupported type ARRAY.
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
File <command-3916833176743968>:13
      1 tables = [
      2     "daily_aggregations",
      3     "journal_entries",
   (...)
      9     "users"
     10 ]
     12 for table in tables:
---> 13     remote_table = (spark.read
     14         .format("jdbc")
     15         .option("driver", driver)
     16         .option("url", url)
     17         .option("dbtable", table)
     18         .option("user", user)
     19         .option("password", password)
     20         .load()
     21     )
     22     remote_table.write.mode("overwrite").saveAsTable(table)
Any ideas why it only fails when run as a job?
This is my code:
driver = "org.postgresql.Driver"
user = dbutils.secrets.get("leaflet-database", "user")
password = dbutils.secrets.get("leaflet-database", "password")
host = dbutils.secrets.get("leaflet-database", "host")
port = dbutils.secrets.get("leaflet-database", "port")
name = dbutils.secrets.get("leaflet-database", "name")
url = f"jdbc:postgresql://{host}:{port}/{name}"
tables = [
    "daily_aggregations",
    "journal_entries",
    "plant_actions",
    "plant_classifications",
    "plants",
    "readings",
    "sensors",
    "users"
]

for table in tables:
    remote_table = (spark.read
        .format("jdbc")
        .option("driver", driver)
        .option("url", url)
        .option("dbtable", table)
        .option("user", user)
        .option("password", password)
        .load()
    )
    remote_table.write.mode("overwrite").saveAsTable(table)
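To narrow down which table actually trips the ARRAY error, this is the per-table error handling I can add so the failing table shows up in the job output (just a sketch using the same variables as above; I have not run it as a job yet):

failed = []
for table in tables:
    try:
        remote_table = (spark.read
            .format("jdbc")
            .option("driver", driver)
            .option("url", url)
            .option("dbtable", table)
            .option("user", user)
            .option("password", password)
            .load()
        )
        remote_table.write.mode("overwrite").saveAsTable(table)
    except Exception as e:
        # Keep going so every failing table gets reported, not just the first one.
        failed.append(table)
        print(f"{table}: {e}")

print("Failed tables:", failed)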
This is the job definition JSON:
{
  "run_as": {
    "user_name": "jesse@krado.co"
  },
  "name": "migrate_leaflet_database",
  "email_notifications": {
    "no_alert_for_skipped_runs": false
  },
  "webhook_notifications": {},
  "timeout_seconds": 0,
  "schedule": {
    "quartz_cron_expression": "11 0 5 * * ?",
    "timezone_id": "America/Boise",
    "pause_status": "UNPAUSED"
  },
  "max_concurrent_runs": 1,
  "tasks": [
    {
      "task_key": "migrate_leaflet_database",
      "run_if": "ALL_SUCCESS",
      "notebook_task": {
        "notebook_path": "/Users/jesse@krado.co/leaflet-database-migration",
        "source": "WORKSPACE"
      },
      "job_cluster_key": "Job_cluster",
      "timeout_seconds": 0,
      "email_notifications": {
        "on_success": [
          "jesse@krado.co"
        ],
        "on_failure": [
          "jesse@krado.co"
        ]
      },
      "notification_settings": {
        "no_alert_for_skipped_runs": true,
        "no_alert_for_canceled_runs": true,
        "alert_on_last_attempt": true
      }
    }
  ],
  "job_clusters": [
    {
      "job_cluster_key": "Job_cluster",
      "new_cluster": {
        "cluster_name": "",
        "spark_version": "12.2.x-scala2.12",
        "aws_attributes": {
          "first_on_demand": 1,
          "availability": "SPOT_WITH_FALLBACK",
          "zone_id": "us-east-1f",
          "spot_bid_price_percent": 100,
          "ebs_volume_count": 0
        },
        "node_type_id": "i3.xlarge",
        "spark_env_vars": {
          "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
        },
        "enable_elastic_disk": false,
        "data_security_mode": "SINGLE_USER",
        "runtime_engine": "STANDARD",
        "num_workers": 8
      }
    }
  ],
  "format": "MULTI_TASK"
}
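For comparison between the two environments, here is a quick check I can run on both my interactive cluster and this Job_cluster, since the job cluster pins spark_version to 12.2.x-scala2.12 and I have not confirmed what my interactive cluster runs (a sketch; as far as I know DATABRICKS_RUNTIME_VERSION is set on Databricks clusters):

import os

# Compare this environment against the Job_cluster defined above
# (which pins spark_version to 12.2.x-scala2.12).
print("Spark version:", spark.version)
print("Databricks runtime:", os.environ.get("DATABRICKS_RUNTIME_VERSION", "unknown"))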