05-12-2023 07:21 AM
import cats.effect.{ExitCode, IO, IOApp}
import com.typesafe.config.ConfigFactory.{defaultApplication, defaultOverrides, defaultReference}
import com.typesafe.scalalogging.LazyLogging

object OurMainObject extends LazyLogging with IOApp {
  def run(args: List[String]): IO[ExitCode] = {
    logger.info("Started the application")
    val conf = defaultOverrides.withFallback(defaultApplication).withFallback(defaultReference)
    val app = new OurRunner(conf, sparkFactory)
    app
      .program(args)
      .attempt
      .flatMap {
        case Left(ex) =>
          IO(logger.error("Failure", ex))
            .map(_ => ExitCode.Error)
        case Right(_) =>
          IO(logger.info("Success: Successfully finished running Glue application"))
            .map(_ => ExitCode.Success)
      }
  }
}
What is the reason the program's exit code 1 is ignored by Databricks?
Our job is triggered by Airflow using DatabricksSubmitRunOperator with a spark_jar_task.
The log from the Databricks driver is like this:
23/05/12 13:50:11 ERROR OurMain$: Failure
org.apache.spark.sql.AnalysisException: Path does not exist: abfss://blah/blah/excel/file/we/did/not/have
at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1005)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:849)
at scala.collection.immutable.List.flatMap(List.scala:366)
at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:832)
at org.apache.spark.sql.execution.datasources.v2.FileTable.fileIndex$lzycompute(FileTable.scala:57)
at org.apache.spark.sql.execution.datasources.v2.FileTable.fileIndex(FileTable.scala:45)
at org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:70)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:70)
at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:64)
at org.apache.spark.sql.execution.datasources.v2.FileTable.schema$lzycompute(FileTable.scala:82)
at org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:80)
at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2.inferSchema(FileDataSourceV2.scala:94)
at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2.inferSchema$(FileDataSourceV2.scala:92)
at com.crealytics.spark.excel.v2.ExcelDataSource.inferSchema(ExcelDataSource.scala:27)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:91)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.loadV2Source(DataSourceV2Utils.scala:130)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:322)
at scala.Option.flatMap(Option.scala:271)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:320)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:236)
at our.company.package.name.dao.ReadFromDisk.readExcel(ReadFromDisk.scala:48)
at our.company.package.name.dao.ReadFromDisk.readExcel$(ReadFromDisk.scala:28)
at our.company.package.name.dao.MappingDAO.readExcel(MappingDAO.scala:14)
at our.company.package.name.dao.MappingDAO.read(MappingDAO.scala:24)
at our.company.package.name.SparkPopulateCore.read(SparkPopulateCore.scala:81)
at our.company.package.name.SparkPopulateCore.$anonfun$run$3(SparkPopulateCore.scala:29)
at apply @ our.company.another.package.name.SparkPopulateCore.setAzureCredentialsInContext(SparkPopulateCore.scala:40)
at flatMap @ our.company.another.package.name.SparkPopulateCore.$anonfun$run$1(SparkPopulateCore.scala:26)
at apply @ com.ourCompany.appName.appNameMain$.$anonfun$sparkFactory$1(appNameMain.scala:60)
at flatMap @ our.company.another.package.name.SparkPopulateCore.run(SparkPopulateCore.scala:23)
at map @ our.company.runner.processToCore(OurRunner.scala:21)
at map @ our.company.runner.$anonfun$program$3(OurRunner.scala:14)
at apply @ our.company.runner.$anonfun$program$1(OurRunner.scala:13)
at flatMap @ our.company.runner.$anonfun$program$1(OurRunner.scala:13)
at map @ our.company.runner$.$anonfun$readConfiguration$1(OurRunner.scala:36)
at flatMap @ our.company.runner$.com$ourCompany$appName$OurRunner$$readConfiguration(OurRunner.scala:35)
at flatMap @ our.company.runner.program(OurRunner.scala:12)
at flatMap @ com.ourCompany.appName.appNameMain$.run(appNameMain.scala:23)
05-13-2023 09:01 AM
@min shi :
Based on the code snippet you provided, it looks like the Databricks job is configured to fail if the Spark application exits with code 1. The run method returns an ExitCode based on the result of the Spark application. If the application fails with an exception, the method returns ExitCode.Error, otherwise, it returns ExitCode.Success.
However, in the logs you provided, it seems that the Spark application failed with an AnalysisException, which is an exception and not an exit code. This might be the reason why the job is not failing as expected. To make sure that the job fails when the Spark application exits with code 1, you can modify the run method to handle the exit code returned by the Spark application. Here is an example:
def run(args: List[String]): IO[ExitCode] = {
  logger.info("Started the application")
  val conf = defaultOverrides.withFallback(defaultApplication).withFallback(defaultReference)
  val app = new OurRunner(conf, sparkFactory)
  val exitCodeIO = IO(app.program(args).waitFor())
  exitCodeIO.flatMap { exitCode =>
    if (exitCode == 0) {
      IO(logger.info("Success: Successfully finished running Spark application"))
        .map(_ => ExitCode.Success)
    } else {
      IO(logger.error(s"Failure: Spark application exited with code $exitCode"))
        .map(_ => ExitCode.Error)
    }
  }
}
With this modification, the run method returns ExitCode.Success if the Spark application exits with code 0, and ExitCode.Error if it exits with any other code, including 1.
05-13-2023 06:08 PM
Hi @min shi
Thank you for posting your question in our community! We are happy to assist you.
To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?
This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance!
05-15-2023 04:32 AM
Hi @Suteja Kanuri
What made you say "it looks like the Databricks job is configured to fail if the Spark application exits with code 1"?
My question is why the Databricks job is not failing when the exit code is 1.
Our code says: regardless of what exception occurs, we log it and exit the application with code 1. Is anything wrong with this approach?
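For what it's worth, here is a rough sketch of what cats-effect's IOApp does with the ExitCode returned from run (assuming cats-effect 3; the exact internals differ between versions, so treat it only as an approximation):

import cats.effect.{ExitCode, IO}
import cats.effect.unsafe.implicits.global

// Rough approximation of the main method IOApp generates around run:
// the returned ExitCode is translated into a process exit status, so
// ExitCode.Error already means the driver JVM exits with status 1.
object IOAppExitSketch {
  def main(args: Array[String]): Unit = {
    val run: IO[ExitCode] = IO.pure(ExitCode.Error) // stand-in for OurMainObject.run(args.toList)
    val code = run.unsafeRunSync()
    if (code != ExitCode.Success) System.exit(code.code)
  }
}

So returning ExitCode.Error already makes the JVM exit with status 1; the open question is why the spark_jar_task does not treat that exit status as a failure.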
05-16-2023 07:59 AM
My workaround for now is to change the code as below, so the Databricks job becomes a failure.
case Left(ex) =>
  IO(logger.error("Glue failure", ex))
    .flatMap(_ => IO.raiseError(ex))
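For reference, a minimal sketch of the full run method with that workaround folded in (reusing the names from the snippets above and assuming cats-effect's IOApp, so this is an approximation rather than the exact production code):

def run(args: List[String]): IO[ExitCode] = {
  logger.info("Started the application")
  val conf = defaultOverrides.withFallback(defaultApplication).withFallback(defaultReference)
  val app = new OurRunner(conf, sparkFactory)
  app
    .program(args)
    .attempt
    .flatMap {
      case Left(ex) =>
        // Log first, then re-raise: the exception escapes run, the driver's
        // main method terminates abnormally, and Databricks reports the run
        // as failed.
        IO(logger.error("Failure", ex)).flatMap(_ => IO.raiseError(ex))
      case Right(_) =>
        IO(logger.info("Success: Successfully finished running Glue application"))
          .map(_ => ExitCode.Success)
    }
}

Re-raising makes the failure visible to Databricks, whereas the earlier version only mapped the error to a non-zero ExitCode (and hence a System.exit from IOApp), which the spark_jar_task evidently did not pick up.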