AutoLoader - problem with adding new source location

Marcin_U
New Contributor II

Hello,

I have some trouble with AutoLoader. Currently we use many different source locations on ADLS to read parquet files and write them to a delta table using AutoLoader. The files in all locations have the same schema.

Everything works fine until we have to add a new source location for an existing table. In that case the following error is thrown:

 

There are [2] sources in the checkpoint offsets and now there are [3] sources requested by the query

 

 

Our implementation of AutoLoader:

 

CloudFile config:

 

        cloudFile = {
                 "cloudFiles.useNotifications": False
                , "cloudFiles.format": self.source_format
                , "cloudFiles.schemaLocation": self.al_output_path_schema
                , "cloudFiles.inferColumnTypes": True
                , "cloudFiles.validateOptions": True
            }

 

AutoLoader run:

 

        df = self._al_readStream_from_paths()

        df.writeStream\
            .foreachBatch(lambda df, epoch_id: self._al_NAV_augment_base_stream(epoch_id, df))\
            .option( "checkpointLocation", self.al_output_path_checkpoint )\
            .option("mergeSchema", "true")\
            .queryName(f'_process_{self.source_mnt}_{self.target_name}')\
            .trigger(availableNow=True)\
            .start().awaitTermination()

 

_al_readStream_from_paths definition:

 

    def _al_readStream_from_paths(self) -> DataFrame:

        list_of_paths = self._get_list_of_paths()
        if list_of_paths == []:
            df_single_path = self._al_readStream( source_path=self.source_path )
            return df_single_path
        else:
            df_unioned = None
            for path in list_of_paths:

                df_single_path = self._al_readStream( source_path=path )
                if df_unioned is None:
                    df_unioned = df_single_path
                else:
                    df_unioned = df_unioned.union(df_single_path)
            return df_unioned

 

_al_readStream definition:

 

    def _al_readStream(self, source_path) -> DataFrame:

        cloudFile = self._al_get_cloudFile_options()

        df = spark.readStream\
          .format("cloudFiles")\
          .options(**cloudFile)\
          .load(source_path)
        return df

 

The _al_NAV_augment_base_stream function used in writeStream includes augmentation of the df, such as adding lineage columns, etc.
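For context, a purely illustrative sketch of what a foreachBatch handler like _al_NAV_augment_base_stream could look like, assuming it adds lineage columns and appends each micro-batch to the target table (the column names and the write mode are assumptions, not the author's actual implementation):

    # illustrative only: add simple lineage columns and append the micro-batch to the target table
    from pyspark.sql import functions as F

    def _al_NAV_augment_base_stream(self, epoch_id, df):
        df_augmented = (
            df.withColumn("_ingest_ts", F.current_timestamp())   # assumed lineage column
              .withColumn("_batch_id", F.lit(epoch_id))          # assumed lineage column
        )
        (
            df_augmented.write
            .format("delta")
            .mode("append")
            .option("mergeSchema", "true")
            .saveAsTable(self.target_name)
        )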

 

My question is: how do I add a new source location in the proper way, so that it does not cause "There are [2] sources in the checkpoint offsets and now there are [3] sources requested by the query"?

2 REPLIES

Kaniz
Community Manager

Hi @Marcin_U, when adding a new source location to your AutoLoader setup in Spark Structured Streaming, you can follow these steps to avoid the error related to checkpoint offsets:

  1. Checkpoint Location:

    • First, delete the current checkpoint files associated with your job. This ensures a fresh start.
    • When you add a new source location, specify a new checkpoint location for the updated job. This will prevent conflicts with existing checkpoint data.
  2. Configure Auto Loader:

  3. Consider Using Wildcards:

    • If your new source location follows a similar pattern as existing ones, you can use wildcards in your path.
    • For example, if your paths are like /data/source1/2024-02-21/, /data/source2/2024-02-21/, etc., you can use a wildcard like /data/*/2024-02-21/.
    • This way, you won’t need to manually add each new source location; AutoLoader will pick up any matching directories (a sketch of this approach follows at the end of this reply).
  4. Query from cloud_files_state:

Remember that AutoLoader is designed to handle incremental data efficiently, but managing checkpoint locations and ensuring consistency across sources is crucial. By following these guidelines, you can seamlessly add new source locations without encountering the mentioned error. 🚀
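A minimal sketch of the wildcard approach from point 3, assuming the existing and new source directories share a common parent (the /data/*/2024-02-21/ pattern comes from the example above; the schema path, checkpoint path and target table name are placeholders, not Marcin_U's actual values):

    # hypothetical sketch: one readStream over a wildcard path instead of one stream per directory,
    # so the checkpoint keeps a single source entry even when new folders matching the pattern appear
    cloudFile = {
        "cloudFiles.useNotifications": False,
        "cloudFiles.format": "parquet",
        "cloudFiles.schemaLocation": "/mnt/autoloader/_schema",      # placeholder path
        "cloudFiles.inferColumnTypes": True,
        "cloudFiles.validateOptions": True,
    }

    df = (
        spark.readStream
        .format("cloudFiles")
        .options(**cloudFile)
        .load("/data/*/2024-02-21/")    # wildcard from the example above; covers source1, source2, ...
    )

    (
        df.writeStream
        .option("checkpointLocation", "/mnt/autoloader/_checkpoint")  # placeholder path
        .trigger(availableNow=True)
        .toTable("target_delta_table")                                # placeholder target table
    )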

Marcin_U
New Contributor II

Thanks for the reply, @Kaniz. I have some questions related to your answer.

  1. Checkpoint Location:
    • Does deleting the checkpoint folder (or only the files?) mean that the next run of AutoLoader will load all files from the provided source locations? If so, it will duplicate data that was already loaded to the target delta table.
  2. Configure Auto Loader:
    • Do I understand correctly that InMemoryFileIndex is used for listing files and directories more efficiently, but there is no way to use it with AutoLoader / cloudFiles?
    • What about implementing a process that moves (for backup purposes) or deletes files already processed by AutoLoader? It could resolve the problem of long file listings. Is there any feature like this in AutoLoader? In fact, I have found an "archive_timestamp" column in "cloud_files_state", but it holds only nulls (see the query sketch after this list).
  3. Consider Using Wildcards:
    • It looks like wildcards could resolve my problem. Please confirm that using wildcards creates only one source in the "checkpoint/sources" directory?
      [attached screenshot: Marcin_U_0-1708616776547.png]
    • I wonder why my implementation creates a new source in the "checkpoint/sources" folder after adding new source locations to AutoLoader. Is it because readStream is run as many times as there are source locations in the _al_readStream_from_paths method?
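
Regarding the cloud_files_state question in point 2 above, a minimal sketch of how the file-discovery state of a stream can be inspected, assuming the checkpoint path is the one passed as checkpointLocation (the path below is a placeholder):

    # hypothetical sketch: list the files Auto Loader has discovered for this stream's checkpoint
    checkpoint_path = "/mnt/autoloader/_checkpoint"   # placeholder, use the real checkpointLocation

    state_df = spark.sql(f"SELECT * FROM cloud_files_state('{checkpoint_path}')")
    state_df.show(truncate=False)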