My scenario is this:
- autoloader loads data from ADLS once a day.
- data are merged into the bronze zone.
- bronze table has CDF turned on.
- CDF is read from bronze and merged into the silver table.
Everything works fine, but when there is no new file for autoloader to process, the CDF read still picks up the latest version of the bronze table and merges the same data into the silver table again.
This is an extra step I want to avoid. I tried to find a way to use df.count() to check the number of rows inserted by autoloader, but that was not possible.
The only solution I have for now is to check the latest version number of the bronze table before autoloader runs and compare it with the version number after autoloader finishes, to see whether the new version is higher than the previous one.
Are there any other ideas for preventing the unnecessary merge from bronze to silver when autoloader did not process a new file?
Check the latest version of the bronze table before running autoloader:
from delta.tables import DeltaTable

table_path = bronze_table
deltaTable = DeltaTable.forPath(spark, table_path)
# history(1) returns only the most recent commit of the table
last_version = str(deltaTable.history(1).head()["version"])
print(f"last table version is {last_version}")
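Run autoloader:
from pyspark.sql.functions import current_timestamp

# source_path is a placeholder for the ADLS input path (not shown in my original snippet)
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(source_path)
    .withColumn("inserted_datetime", current_timestamp()))

query = df.writeStream \
    .format("delta") \
    .option("mergeSchema", "true") \
    .option("checkpointLocation", checkpoint_path) \
    .queryName(f"query: {bronze_table}") \
    .foreachBatch(merge_bronze(args)) \
    .trigger(availableNow=True) \
    .start()

# wait for the availableNow run to finish before checking the table version
query.awaitTermination()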
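
One thing that might replace the df.count() attempt: once the stream has finished, the streaming query handle exposes recentProgress, a list of progress events that each carry a numInputRows field. Below is a minimal sketch of how that could be turned into a yes/no decision. The helper names rows_ingested and should_merge_to_silver are made up for illustration, and the sample lists are stand-ins for real progress events, so this is an idea to test rather than a confirmed solution.

```python
from typing import Any, Iterable, Mapping


def rows_ingested(progress_events: Iterable[Mapping[str, Any]]) -> int:
    """Sum numInputRows over progress events; a missing or null value counts as 0."""
    return sum(int(event.get("numInputRows", 0) or 0) for event in progress_events)


def should_merge_to_silver(progress_events: Iterable[Mapping[str, Any]]) -> bool:
    """True only if at least one row was read during the run."""
    return rows_ingested(progress_events) > 0


# Stand-in data shaped like progress events (hypothetical values)
run_with_new_file = [{"batchId": 0, "numInputRows": 1200}]
run_without_new_file = [{"batchId": 1, "numInputRows": 0}]

print(should_merge_to_silver(run_with_new_file))
print(should_merge_to_silver(run_without_new_file))
print(should_merge_to_silver([]))  # no progress events at all
```

In the notebook this would be called as should_merge_to_silver(query.recentProgress) after the stream has terminated. Note that recentProgress only keeps a limited number of recent events, and rows arriving in bronze does not by itself prove that the bronze merge changed anything, so the version check may still be the safer signal.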
Then, before the merge to silver, check the version again:
from delta.tables import DeltaTable

table_path = bronze_table
deltaTable = DeltaTable.forPath(spark, table_path)
new_version = int(deltaTable.history(1).head()["version"])
print(f"new table version is: {new_version}")
# compare as integers; exit when the bronze version did not advance
if new_version <= int(last_version):
    print('there is no new version of the table')
    dbutils.notebook.exit('exit notebook')
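
A note on the comparison itself: version numbers held as strings compare character by character, so "10" is not greater than "9". A small sketch of a helper that avoids this by casting both sides to int is shown below; has_new_version is a hypothetical name, not part of Delta Lake.

```python
def has_new_version(version_before, version_after) -> bool:
    """Return True if the table version advanced; accepts int or numeric str."""
    return int(version_after) > int(version_before)


# String comparison is lexicographic, which breaks once versions reach two digits
print("10" > "9")                   # False
print(has_new_version("9", "10"))   # True
print(has_new_version(12, 12))      # False
```

With this helper the guard before the silver merge becomes: exit the notebook when has_new_version(last_version, new_version) is False.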