How to make a Databricks job fail when the application has already returned exit code 1?

source2sea
Contributor
import cats.effect.{ExitCode, IO, IOApp}
import com.typesafe.scalalogging.LazyLogging

object OurMainObject extends LazyLogging with IOApp {
  def run(args: List[String]): IO[ExitCode] = {
    logger.info("Started the application")

    // defaultOverrides / defaultApplication / defaultReference and
    // OurRunner / sparkFactory come from our own code base.
    val conf = defaultOverrides.withFallback(defaultApplication).withFallback(defaultReference)
    val app  = new OurRunner(conf, sparkFactory)

    app
      .program(args)
      .attempt
      .flatMap {
        case Left(ex) =>
          IO(logger.error("Failure", ex))
            .map(_ => ExitCode.Error)
        case Right(_) =>
          IO(logger.info("Success: Successfully finished running Glue application"))
            .map(_ => ExitCode.Success)
      }
  }
}

What is the reason the program's exit code 1 is ignored by Databricks?

Our job is triggered by Airflow using DatabricksSubmitRunOperator with a spark_jar_task.

The log from the Databricks driver looks like this:

23/05/12 13:50:11 ERROR OurMain$: Failure
org.apache.spark.sql.AnalysisException: Path does not exist: abfss://blah/blah/excel/file/we/did/not/have
	at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1005)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:849)
	at scala.collection.immutable.List.flatMap(List.scala:366)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:832)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.fileIndex$lzycompute(FileTable.scala:57)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.fileIndex(FileTable.scala:45)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:70)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:70)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:64)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.schema$lzycompute(FileTable.scala:82)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:80)
	at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2.inferSchema(FileDataSourceV2.scala:94)
	at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2.inferSchema$(FileDataSourceV2.scala:92)
	at com.crealytics.spark.excel.v2.ExcelDataSource.inferSchema(ExcelDataSource.scala:27)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:91)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.loadV2Source(DataSourceV2Utils.scala:130)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:322)
	at scala.Option.flatMap(Option.scala:271)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:320)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:236)
	at our.company.package.name.dao.ReadFromDisk.readExcel(ReadFromDisk.scala:48)
	at our.company.package.name.dao.ReadFromDisk.readExcel$(ReadFromDisk.scala:28)
	at our.company.package.name.dao.MappingDAO.readExcel(MappingDAO.scala:14)
	at our.company.package.name.dao.MappingDAO.read(MappingDAO.scala:24)
	at our.company.package.name.SparkPopulateCore.read(SparkPopulateCore.scala:81)
	at our.company.package.name.SparkPopulateCore.$anonfun$run$3(SparkPopulateCore.scala:29)
	at apply @ our.company.another.package.name.SparkPopulateCore.setAzureCredentialsInContext(SparkPopulateCore.scala:40)
	at flatMap @ our.company.another.package.name.SparkPopulateCore.$anonfun$run$1(SparkPopulateCore.scala:26)
	at apply @ com.ourCompany.appName.appNameMain$.$anonfun$sparkFactory$1(appNameMain.scala:60)
	at flatMap @ our.company.another.package.name.SparkPopulateCore.run(SparkPopulateCore.scala:23)
	at map @ our.company.runner.processToCore(OurRunner.scala:21)
	at map @ our.company.runner.$anonfun$program$3(OurRunner.scala:14)
	at apply @ our.company.runner.$anonfun$program$1(OurRunner.scala:13)
	at flatMap @ our.company.runner.$anonfun$program$1(OurRunner.scala:13)
	at map @ our.company.runner$.$anonfun$readConfiguration$1(OurRunner.scala:36)
	at flatMap @ our.company.runner$.com$ourCompany$appName$OurRunner$$readConfiguration(OurRunner.scala:35)
	at flatMap @ our.company.runner.program(OurRunner.scala:12)
	at flatMap @ com.ourCompany.appName.appNameMain$.run(appNameMain.scala:23)
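
For reference (not part of the original post), here is a minimal sketch, assuming cats-effect 3 and a made-up object name, of the behaviour the code above relies on: IOApp is expected to turn the ExitCode returned by run into the driver JVM's exit status. The question in this thread is why that non-zero exit status does not make the Databricks run fail.

import cats.effect.{ExitCode, IO, IOApp}

// Minimal, hypothetical example: when run completes with ExitCode.Error,
// IOApp is expected to end the JVM with exit status 1; ExitCode.Success
// maps to a normal exit with status 0.
object ExitCodeDemo extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    IO(println("finishing with a non-zero status"))
      .map(_ => ExitCode.Error) // driver JVM should exit with status 1
}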

1 ACCEPTED SOLUTION

source2sea
Contributor

My workaround for now is to change the code as below, so that the Databricks job is marked as failed.

        case Left(ex) =>
          // Chain the log and the re-raise so the log effect actually runs
          // before the error is raised; raising the error is what makes the
          // Databricks run fail.
          IO(logger.error("Glue failure", ex)).flatMap(_ => IO.raiseError[ExitCode](ex))
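
For clarity, a slightly fuller sketch of this log-and-rethrow pattern, assuming cats-effect 3 and scala-logging; runProgram and the object name are stand-ins for the real OurRunner wiring, not the production code. Re-raising the error makes the application end with the exception instead of returning an ExitCode, which is what made the Databricks run show up as failed here.

import cats.effect.{ExitCode, IO, IOApp}
import com.typesafe.scalalogging.LazyLogging

object LogAndRethrowSketch extends IOApp with LazyLogging {

  // Stand-in for the real OurRunner.program call.
  private def runProgram(args: List[String]): IO[Unit] =
    IO.raiseError(new RuntimeException("simulated failure"))

  def run(args: List[String]): IO[ExitCode] =
    runProgram(args).attempt.flatMap {
      case Left(ex) =>
        // Log first, then re-raise so the application terminates with the
        // exception; in this thread, that is what marked the run as failed.
        IO(logger.error("Failure", ex)) *> IO.raiseError[ExitCode](ex)
      case Right(_) =>
        IO(logger.info("Success")).map(_ => ExitCode.Success)
    }
}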


4 REPLIES

Anonymous
Not applicable

@min shi:

Based on the code snippet you provided, it looks like the Databricks job is configured to fail if the Spark application exits with code 1. The run method returns an ExitCode based on the result of the Spark application. If the application fails with an exception, the method returns ExitCode.Error; otherwise, it returns ExitCode.Success.

However, in the logs you provided, it seems that the Spark application failed with an AnalysisException, which is an exception and not an exit code. This might be the reason why the job is not failing as expected. To make sure that the job fails when the Spark application exits with code 1, you can modify the run method to handle the exit code returned by the Spark application. Here is an example:

def run(args: List[String]): IO[ExitCode] = {
  logger.info("Started the application")
 
  val conf = defaultOverrides.withFallback(defaultApplication).withFallback(defaultReference)
  val app  = new OurRunner(conf, sparkFactory)
 
  // program returns an IO, so derive a numeric exit code from its outcome
  // (0 on success, 1 on failure) instead of calling a blocking waitFor(),
  // which IO does not provide.
  val exitCodeIO: IO[Int] = app.program(args).attempt.map {
    case Right(_) => 0
    case Left(_)  => 1
  }
  
  exitCodeIO.flatMap { exitCode =>
    if (exitCode == 0) {
      IO(logger.info("Success: Successfully finished running Spark application"))
        .map(_ => ExitCode.Success)
    } else {
      IO(logger.error(s"Failure: Spark application exited with code $exitCode"))
        .map(_ => ExitCode.Error)
    }
  }
}

With this modification, the run method returns ExitCode.Success if the Spark application exits with code 0, and ExitCode.Error if it exits with any other code, including 1.
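
As a side note (not from the original reply; the jar name below is a placeholder), one way to sanity-check what exit status the driver JVM actually reports is to run the packaged application locally and inspect the status with scala.sys.process:

import scala.sys.process._

// Hypothetical local check: run the assembled jar and print the JVM's exit
// status. On the failure path discussed above this should print 1, even
// though the Databricks spark_jar_task did not treat the run as failed.
object ExitCodeCheck extends App {
  val status: Int = Seq("java", "-jar", "our-app-assembly.jar").!
  println(s"driver JVM exited with status $status")
}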

Anonymous
Not applicable

Hi @min shi

Thank you for posting your question in our community! We are happy to assist you.

To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?

This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance! 

source2sea
Contributor

Hi @Suteja Kanuri

What made you say "it looks like the Databricks job is configured to fail if the Spark application exits with code 1"?

My question is why the Databricks job is not failing when the exit code is 1.

Our code says: regardless of the exception, we log it and exit the application with code 1. Is there anything wrong with this approach?


