Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
How to make a Databricks job fail when the application has already exited with code 1?

source2sea
Contributor
import cats.effect.{ExitCode, IO, IOApp}
import com.typesafe.scalalogging.LazyLogging

object OurMainObject extends LazyLogging with IOApp {
  def run(args: List[String]): IO[ExitCode] = {
    logger.info("Started the application")

    val conf = defaultOverrides.withFallback(defaultApplication).withFallback(defaultReference)
    val app  = new OurRunner(conf, sparkFactory)

    app
      .program(args)
      .attempt
      .flatMap {
        case Left(ex) =>
          IO(logger.error("Failure", ex))
            .map(_ => ExitCode.Error)
        case Right(_) =>
          IO(logger.info("Success: Successfully finished running Glue application"))
            .map(_ => ExitCode.Success)
      }
  }
}

What is the reason the program's exit code 1 is ignored by Databricks?

Our job is triggered by Airflow using DatabricksSubmitRunOperator with a spark_jar_task.

The log from the Databricks driver looks like this:

23/05/12 13:50:11 ERROR OurMain$: Failure
org.apache.spark.sql.AnalysisException: Path does not exist: abfss://blah/blah/excel/file/we/did/not/have
	at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1005)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:849)
	at scala.collection.immutable.List.flatMap(List.scala:366)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:832)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.fileIndex$lzycompute(FileTable.scala:57)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.fileIndex(FileTable.scala:45)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.$anonfun$dataSchema$4(FileTable.scala:70)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema$lzycompute(FileTable.scala:70)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.dataSchema(FileTable.scala:64)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.schema$lzycompute(FileTable.scala:82)
	at org.apache.spark.sql.execution.datasources.v2.FileTable.schema(FileTable.scala:80)
	at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2.inferSchema(FileDataSourceV2.scala:94)
	at org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2.inferSchema$(FileDataSourceV2.scala:92)
	at com.crealytics.spark.excel.v2.ExcelDataSource.inferSchema(ExcelDataSource.scala:27)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:91)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.loadV2Source(DataSourceV2Utils.scala:130)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:322)
	at scala.Option.flatMap(Option.scala:271)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:320)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:236)
	at our.company.package.name.dao.ReadFromDisk.readExcel(ReadFromDisk.scala:48)
	at our.company.package.name.dao.ReadFromDisk.readExcel$(ReadFromDisk.scala:28)
	at our.company.package.name.dao.MappingDAO.readExcel(MappingDAO.scala:14)
	at our.company.package.name.dao.MappingDAO.read(MappingDAO.scala:24)
	at our.company.package.name.SparkPopulateCore.read(SparkPopulateCore.scala:81)
	at our.company.package.name.SparkPopulateCore.$anonfun$run$3(SparkPopulateCore.scala:29)
	at apply @ our.company.another.package.name.SparkPopulateCore.setAzureCredentialsInContext(SparkPopulateCore.scala:40)
	at flatMap @ our.company.another.package.name.SparkPopulateCore.$anonfun$run$1(SparkPopulateCore.scala:26)
	at apply @ com.ourCompany.appName.appNameMain$.$anonfun$sparkFactory$1(appNameMain.scala:60)
	at flatMap @ our.company.another.package.name.SparkPopulateCore.run(SparkPopulateCore.scala:23)
	at map @ our.company.runner.processToCore(OurRunner.scala:21)
	at map @ our.company.runner.$anonfun$program$3(OurRunner.scala:14)
	at apply @ our.company.runner.$anonfun$program$1(OurRunner.scala:13)
	at flatMap @ our.company.runner.$anonfun$program$1(OurRunner.scala:13)
	at map @ our.company.runner$.$anonfun$readConfiguration$1(OurRunner.scala:36)
	at flatMap @ our.company.runner$.com$ourCompany$appName$OurRunner$$readConfiguration(OurRunner.scala:35)
	at flatMap @ our.company.runner.program(OurRunner.scala:12)
	at flatMap @ com.ourCompany.appName.appNameMain$.run(appNameMain.scala:23)
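One way to see why the driver keeps reporting success is a minimal plain-Scala sketch (no cats-effect; program() is a hypothetical stand-in for app.program(args)): .attempt converts the thrown exception into a Left value, so the exception never escapes main. The failure only survives as an ordinary return value, which the jar task runner does not appear to inspect.

```scala
import scala.util.Try

object AttemptDemo {
  // Hypothetical stand-in for app.program(args); fails the way the job did.
  def program(): Unit =
    throw new RuntimeException("Path does not exist")

  // Mirrors the .attempt + pattern match in the snippet above, using plain
  // Try/Either: the exception is captured as a value, nothing is rethrown,
  // and only an Int survives.
  def run(): Int = Try(program()).toEither match {
    case Left(_)  => 1 // failure is swallowed into a return value
    case Right(_) => 0
  }
}
```

Calling AttemptDemo.run() returns 1, but the call terminates normally: from the caller's point of view nothing went wrong.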

1 ACCEPTED SOLUTION


source2sea
Contributor

My workaround is to change the code as below, so that the Databricks job is marked as failed.

        case Left(ex) =>
          // log the failure, then re-raise it so the run terminates abnormally
          IO(logger.error("Glue failure", ex))
            .flatMap(_ => IO.raiseError(ex))


4 REPLIES

Anonymous
Not applicable

@min shi:

Based on the code snippet you provided, it looks like the Databricks job is configured to fail if the Spark application exits with code 1. The run method returns an ExitCode based on the result of the Spark application. If the application fails with an exception, the method returns ExitCode.Error, otherwise, it returns ExitCode.Success.

However, in the logs you provided, it seems that the Spark application failed with an AnalysisException, which is an exception and not an exit code. This might be the reason why the job is not failing as expected. To make sure that the job fails when the Spark application exits with code 1, you can modify the run method to handle the exit code returned by the Spark application. Here is an example:

def run(args: List[String]): IO[ExitCode] = {
  logger.info("Started the application")

  val conf = defaultOverrides.withFallback(defaultApplication).withFallback(defaultReference)
  val app  = new OurRunner(conf, sparkFactory)

  // Derive a numeric exit code from the program's outcome
  val exitCodeIO: IO[Int] = app.program(args).attempt.map {
    case Right(_) => 0
    case Left(_)  => 1
  }

  exitCodeIO.flatMap { exitCode =>
    if (exitCode == 0) {
      IO(logger.info("Success: Successfully finished running Spark application"))
        .map(_ => ExitCode.Success)
    } else {
      IO(logger.error(s"Failure: Spark application exited with code $exitCode"))
        .map(_ => ExitCode.Error)
    }
  }
}

With this modification, the run method returns ExitCode.Success if the Spark application exits with code 0, and ExitCode.Error if it exits with any other code, including 1.
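The two behaviors at issue in this thread can be contrasted in plain Scala (no cats-effect; FailureModes and program() are hypothetical stand-ins): a runner that only watches for an uncaught exception escaping main sees Variant A as success and Variant B as failure, regardless of any numeric exit code the application reports.

```scala
object FailureModes {
  // Hypothetical failing workload.
  def program(): Unit = throw new RuntimeException("boom")

  // Variant A: swallow the failure into a return value.
  // The call terminates normally, so a runner watching for exceptions
  // sees success even though the code is 1.
  def runSwallow(): Int =
    try { program(); 0 } catch { case _: Throwable => 1 }

  // Variant B: log, then rethrow.
  // The call terminates abnormally, which a runner can observe as a failure.
  def runRethrow(): Unit =
    try program()
    catch {
      case e: Throwable =>
        System.err.println(s"Failure: ${e.getMessage}")
        throw e
    }
}
```

This is consistent with the accepted workaround in this thread: re-raising the exception (rather than mapping it to ExitCode.Error) is what makes the job fail.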

Anonymous
Not applicable

Hi @min shi

Thank you for posting your question in our community! We are happy to assist you.

To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?

This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance! 

source2sea
Contributor

Hi @Suteja Kanuri​ 

What made you say "it looks like the Databricks job is configured to fail if the Spark application exits with code 1"?

My question is why the Databricks job is not failing when the exit code is 1.

Our code says: regardless of the exception, we log it and exit the application with code 1. Is anything wrong with this approach?

[image attachment]

source2sea
Contributor

My workaround is to change the code as below, so that the Databricks job is marked as failed.

        case Left(ex) =>
          // log the failure, then re-raise it so the run terminates abnormally
          IO(logger.error("Glue failure", ex))
            .flatMap(_ => IO.raiseError(ex))
