
AutoLoader - problem with adding new source location

Marcin_U
New Contributor II

Hello,

I have some trouble with AutoLoader. Currently we use many different source locations on ADLS to read parquet files and write them to a delta table using AutoLoader. The files in all locations have the same schema.

Everything works fine until we have to add a new source location for an existing table. In that case the following error is thrown:

 

There are [2] sources in the checkpoint offsets and now there are [3] sources requested by the query

 

 

Our implementation of AutoLoader:

 

CloudFile config:

 

        cloudFile = {
                 "cloudFiles.useNotifications": False
                , "cloudFiles.format": self.source_format
                , "cloudFiles.schemaLocation": self.al_output_path_schema
                , "cloudFiles.inferColumnTypes": True
                , "cloudFiles.validateOptions": True
            }

 

AutoLoader run:

 

        df = self._al_readStream_from_paths()

        df.writeStream\
            .foreachBatch(lambda df, epoch_id: self._al_NAV_augment_base_stream(epoch_id, df))\
            .option( "checkpointLocation", self.al_output_path_checkpoint )\
            .option("mergeSchema", "true")\
            .queryName(f'_process_{self.source_mnt}_{self.target_name}')\
            .trigger(availableNow=True)\
            .start().awaitTermination()

 

_al_readStream_from_paths definition:

 

def _al_readStream_from_paths(self) -> DataFrame:

        list_of_paths = self._get_list_of_paths()
        if not list_of_paths:
            df_single_path = self._al_readStream( source_path=self.source_path )
            return df_single_path
        else:
            # Each path gets its own readStream, so the union below
            # registers one streaming source per path in the checkpoint.
            df_unioned = None
            for path in list_of_paths:

                df_single_path = self._al_readStream( source_path=path )
                if df_unioned is None:
                    df_unioned = df_single_path
                else:
                    df_unioned = df_unioned.union(df_single_path)
            return df_unioned

 

_al_readStream definition:

 

    def _al_readStream(self, source_path) -> DataFrame:

        cloudFile = self._al_get_cloudFile_options()

        df = spark.readStream\
          .format("cloudFiles")\
          .options(**cloudFile)\
          .load(source_path)
        return df

 

_al_NAV_augment_base_stream, used in writeStream, augments the df, e.g. by adding lineage columns.
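
The actual _al_NAV_augment_base_stream is not shown here, but a minimal sketch of what such a foreachBatch step could look like is below; the lineage column names and self.target_path are assumptions for illustration, not the real implementation:

    from pyspark.sql import DataFrame
    from pyspark.sql import functions as F

    def _al_NAV_augment_base_stream(self, epoch_id: int, df: DataFrame) -> None:
        # Hypothetical sketch only: add lineage columns to the micro-batch,
        # then append it to the target delta table. Column names and
        # self.target_path are assumed for illustration.
        augmented = (
            df.withColumn("_ingest_ts", F.current_timestamp())
              .withColumn("_batch_id", F.lit(epoch_id))
        )
        augmented.write.format("delta").mode("append").save(self.target_path)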

 

My question is: what is the proper way to add a new source location so that it does not cause "There are [2] sources in the checkpoint offsets and now there are [3] sources requested by the query"?
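
To illustrate what seems to happen: every readStream call registers its own numbered source under the checkpoint, so a union of two streams pins the checkpoint to exactly two sources. A minimal sketch with placeholder paths:

    # Two independent AutoLoader streams unioned into one query.
    # Each readStream becomes its own entry in the checkpoint
    # (checkpoint/sources/0 and checkpoint/sources/1), so adding a
    # third path later raises the "[2] sources ... [3] sources" error.
    df_a = spark.readStream.format("cloudFiles").options(**cloudFile).load("/mnt/source_a")
    df_b = spark.readStream.format("cloudFiles").options(**cloudFile).load("/mnt/source_b")
    df = df_a.union(df_b)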

1 REPLY

Marcin_U
New Contributor II

Thanks for the reply @Retired_mod. I have some questions related to your answer.

  1. Checkpoint Location:
    • Does deleting the checkpoint folder (or only the files in it?) mean that the next run of AutoLoader will load all files from the provided source locations? That would duplicate data which was already loaded to the target delta table.
  2. Configure Auto Loader:
    • Do I understand correctly that InMemoryFileIndex is used to list files and directories more efficiently, but that there is no way to use it with AutoLoader and cloudFiles?
    • What about implementing a process which moves (for backup purposes) or deletes files already processed by AutoLoader? That could resolve the problem with long file listings. Is there any feature like this in AutoLoader? I did find an "archive_timestamp" column in "cloud_files_state", but it only contains nulls.
  3. Consider Using Wildcards:
    • It looks like wildcards could resolve my problem. Please confirm that using wildcards creates only one source in the "checkpoint/sources" directory? (See the sketch after this list.)
      [screenshot: Marcin_U_0-1708616776547.png]
    • I wonder why my implementation creates a new source in the "checkpoint/sources" folder after adding new source locations. Is it because readStream is started once per source location in the _al_readStream_from_paths method?
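
For reference, this is roughly the wildcard variant I have in mind: a single stream over a glob that covers all locations, so the checkpoint keeps one source. The glob below is a placeholder:

    # One AutoLoader stream whose glob matches every source directory.
    # The checkpoint then holds a single source (checkpoint/sources/0),
    # and a new directory matching the pattern does not change the
    # source count recorded in the checkpoint offsets.
    df = spark.readStream\
        .format("cloudFiles")\
        .options(**cloudFile)\
        .load("/mnt/landing/*/parquet")   # placeholder glob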
