PaoloF
New Contributor II

Thank you very much, @Louis_Frolio!!
Now it works as expected.

Just a few comments:
- The option .option("cloudFiles.includeFileName", "true") does not seem to work. Does this option exist?
- The function input_file_name() does not work in Unity Catalog.
So I have kept my first solution, which reads the file path from _metadata.file_path:
df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.includeExistingFiles", "true")
    .option("mergeSchema", "true")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .option("cloudFiles.schemaLocation", checkpoint_location)
    .load(source_directory)
    # Expose the source file path as a regular column
    .selectExpr("*", "_metadata.file_path as _source_file_path")
)

I probably made a mistake in my function (I'm not a Python expert)...
I think the dropna() is the key in:

files_df = (
    microBatchOutputDF
    .select("_source_file_path")  # Or use "_metadata.file_path" if that's your column
    .dropna()
    .distinct()
)
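
For anyone reading later, here is a rough sketch of how that snippet could sit inside a foreachBatch handler. The names collect_source_files and process_batch are hypothetical (they are not from the original function), and the handler only prints the paths; treat it as an illustration under those assumptions, not the exact code from this thread.

```python
def collect_source_files(micro_batch_df, path_col="_source_file_path"):
    """Return the sorted, distinct, non-null source file paths of one micro-batch.

    Hypothetical helper: micro_batch_df is expected to be a Spark DataFrame
    that already contains the path column (see the selectExpr in the reader).
    """
    files_df = (
        micro_batch_df
        .select(path_col)
        .dropna()    # skip rows where the path is null
        .distinct()  # keep one row per source file
    )
    return sorted(row[path_col] for row in files_df.collect())


def process_batch(micro_batch_df, batch_id):
    """Hypothetical foreachBatch handler that only reports the files in a batch."""
    files = collect_source_files(micro_batch_df)
    print(f"batch {batch_id}: {len(files)} source file(s)")
    for path in files:
        print(f"  {path}")
    return files


# Possible wiring (assumption: stream_df is the streaming DataFrame returned
# by the readStream chain, and checkpoint_location is defined as above):
#
# stream_df.writeStream \
#     .foreachBatch(process_batch) \
#     .option("checkpointLocation", checkpoint_location) \
#     .start()
```

The dropna() runs before distinct() so that a null path never shows up as one of the "files" in the batch.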

Thanks!!