Hello,
I have some trouble with AutoLoader. Currently we use many different source locations on ADLS to read Parquet files and write them to a Delta table using AutoLoader. The files in all locations have the same schema.
Everything works fine until we have to add a new source location for an existing table. In that case the following error is thrown:
There are [2] sources in the checkpoint offsets and now there are [3] sources requested by the query
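If I understand it correctly, each spark.readStream that ends up in the union is recorded as a separate source in the checkpoint offsets, so a query checkpointed with two unioned streams cannot restart as a three-way union. A minimal sketch of the pattern that seems to trigger it (the /mnt/source_* paths are hypothetical):

df_a = spark.readStream.format("cloudFiles").options(**cloudFile).load("/mnt/source_a")
df_b = spark.readStream.format("cloudFiles").options(**cloudFile).load("/mnt/source_b")
df = df_a.union(df_b)  # the checkpoint now expects exactly 2 sources
# restarting the same checkpoint with a third unioned stream raises the error above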
Our implementation of AutoLoader:
CloudFile config:
cloudFile = {
    "cloudFiles.useNotifications": False,
    "cloudFiles.format": self.source_format,
    "cloudFiles.schemaLocation": self.al_output_path_schema,
    "cloudFiles.inferColumnTypes": True,
    "cloudFiles.validateOptions": True,
}
AutoLoader run:
df = self._al_readStream_from_paths()
df.writeStream \
    .foreachBatch(lambda batch_df, epoch_id: self._al_NAV_augment_base_stream(epoch_id, batch_df)) \
    .option("checkpointLocation", self.al_output_path_checkpoint) \
    .option("mergeSchema", "true") \
    .queryName(f"_process_{self.source_mnt}_{self.target_name}") \
    .trigger(availableNow=True) \
    .start() \
    .awaitTermination()
_al_readStream_from_paths definition:
from pyspark.sql import DataFrame

def _al_readStream_from_paths(self) -> DataFrame:
    list_of_paths = self._get_list_of_paths()
    if not list_of_paths:
        # no extra paths configured: stream from the single default source path
        return self._al_readStream(source_path=self.source_path)
    df_unioned = None
    for path in list_of_paths:
        df_single_path = self._al_readStream(source_path=path)
        if df_unioned is None:
            df_unioned = df_single_path
        else:
            df_unioned = df_unioned.union(df_single_path)
    return df_unioned
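As an aside, the loop above is equivalent to a functools.reduce over the per-path streams; the behavior is the same, and each path still becomes its own source in the checkpoint:

from functools import reduce
dfs = [self._al_readStream(source_path=p) for p in list_of_paths]
df_unioned = reduce(lambda a, b: a.union(b), dfs)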
_al_readStream definition:
def _al_readStream(self, source_path) -> DataFrame:
    cloudFile = self._al_get_cloudFile_options()
    df = spark.readStream \
        .format("cloudFiles") \
        .options(**cloudFile) \
        .load(source_path)
    return df
The _al_NAV_augment_base_stream function used in writeStream augments the DataFrame, e.g. by adding lineage columns.
My question: what is the proper way to add a new source location without causing the "There are [2] sources in the checkpoint offsets and now there are [3] sources requested by the query" error?
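One idea I had, though I am not sure it is the proper approach: give every source location its own stream and its own checkpoint instead of unioning them, so that adding a path never changes the source count of any existing checkpoint. A rough sketch, where the per-path checkpoint suffix is my own invention:

for path in self._get_list_of_paths():
    suffix = path.strip("/").replace("/", "_")  # hypothetical: stable checkpoint name per path
    df = self._al_readStream(source_path=path)
    df.writeStream \
        .foreachBatch(lambda batch_df, epoch_id: self._al_NAV_augment_base_stream(epoch_id, batch_df)) \
        .option("checkpointLocation", f"{self.al_output_path_checkpoint}/{suffix}") \
        .trigger(availableNow=True) \
        .start() \
        .awaitTermination()

Is this safe, or is there a recommended way to evolve the number of sources behind a single checkpoint?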