AutoLoader - handle Spark write transactional (_SUCCESS file) on ADLS

Marcin_U
New Contributor II

Spark's write method for Parquet files (df.write.parquet) is transactional: after the write completes successfully, a _SUCCESS file is created in the path where the Parquet files were written.


Is it possible to configure AutoLoader to load the Parquet files only when the write has completed successfully (i.e., the _SUCCESS file has appeared)?
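For illustration, here is a minimal sketch of the kind of write I mean (the output path is just a placeholder for an ADLS location):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # A toy DataFrame; the ADLS output path is a placeholder
    df = spark.range(10)
    df.write.mode("overwrite").parquet("abfss://container@account.dfs.core.windows.net/output/")

    # After the write commits, the output directory contains the part-*.parquet
    # files plus the _SUCCESS marker file.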

2 REPLIES

Kaniz
Community Manager

Hi @Marcin_U, when writing Parquet files with Spark’s df.write.parquet method, the process is transactional: after a successful write, a _SUCCESS file is created in the same directory as the Parquet files. However, if you want to configure AutoLoader to load Parquet files only when the write operation was successful (i.e., when the _SUCCESS file appears), you can follow these steps:

  1. Check for _SUCCESS File:

    • Before loading the Parquet files, verify the presence of the _SUCCESS file in the target directory.
    • If the _SUCCESS file exists, proceed with loading the Parquet data.
  2. Conditional Loading:

    • Implement a conditional check in your AutoLoader logic.
    • If the _SUCCESS file is present, load the Parquet files.
    • If the _SUCCESS file is not found (indicating a failed write), skip the loading step.
  3. Example (Python):

    from pyspark.sql import SparkSession
    
    # Create a Spark session
    spark = SparkSession.builder.appName("ParquetLoader").getOrCreate()
    
    # Specify the path to the Parquet files
    parquet_path = "/path/to/parquet/files"
    
    # Check if the _SUCCESS file exists, using the Hadoop FileSystem API via the JVM gateway
    hadoop_fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    success_path = spark._jvm.org.apache.hadoop.fs.Path(parquet_path + "/_SUCCESS")
    success_file_exists = hadoop_fs.exists(success_path)
    
    if success_file_exists:
        # Load the Parquet data
        df = spark.read.parquet(parquet_path)
        # Process the data as needed
        # ...
    
        # Additional logic for AutoLoader (e.g., streaming, batch, etc.)
        # ...
    
    else:
        print("Write operation was not successful. Skipping Parquet loading.")
    
    # Stop the Spark session
    spark.stop()
    

Remember to adjust the paths and additional logic according to your specific use case. By checking for the _SUCCESS file, you can ensure that the Parquet files are loaded only when the write was successful.

Please note that this example assumes a Python environment. If you’re using Scala or Java, the approach will be similar, but you’ll need to adapt the syntax accordingly.

 

Marcin_U
New Contributor II

I think my question wasn't understood correctly. I meant AutoLoader, the data loading tool provided by Databricks (https://docs.databricks.com/en/ingestion/auto-loader/index.html).

AutoLoader has a set of different options to configure (https://docs.databricks.com/en/ingestion/auto-loader/options.html), but I can't find any option that would help me achieve the result I described in this topic. Any ideas how to resolve my problem?
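For reference, this is a minimal sketch of the kind of AutoLoader stream I have in mind (the paths, schema/checkpoint locations, and target table name are placeholders); as far as I can see, none of the documented cloudFiles options gate file discovery on a _SUCCESS marker:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # All paths and the target table name are placeholders
    source_path = "abfss://container@account.dfs.core.windows.net/input/"
    schema_path = "abfss://container@account.dfs.core.windows.net/_autoloader/schema/"
    checkpoint_path = "abfss://container@account.dfs.core.windows.net/_autoloader/checkpoint/"

    # Incremental ingestion of Parquet files with AutoLoader (cloudFiles)
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", schema_path)
        .load(source_path)
    )

    # AutoLoader picks up files as they land in source_path; there is no option
    # here that waits for a _SUCCESS marker before processing a directory.
    (
        df.writeStream
        .option("checkpointLocation", checkpoint_path)
        .trigger(availableNow=True)
        .toTable("bronze.my_table")
    )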
