How to check if autoloader processed a new file

alesventus
New Contributor III

Hi,

My scenario is this:

  • Autoloader loads data from ADLS once a day.
  • The data are merged into the bronze zone.
  • The bronze table has CDF (Change Data Feed) turned on.
  • CDF from bronze is read and merged into the silver table.

All works fine, but when there is no new file for autoloader to process, CDF takes the latest version of the bronze table and merges the data into the silver table again.
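
For reference, the bronze-to-silver step looks roughly like this. It is only a simplified sketch: silver_table, last_silver_version, and the "id" merge key are placeholders, not my exact code.

from delta.tables import DeltaTable

# Read the change feed from bronze, starting after the version that was
# already merged into silver (last_silver_version is a placeholder)
changes = (spark.read
  .format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", last_silver_version)
  .load(bronze_table)
  .filter("_change_type IN ('insert', 'update_postimage')")
)

# Merge the changed rows into the silver table
silver = DeltaTable.forPath(spark, silver_table)
(silver.alias("t")
  .merge(changes.alias("s"), "t.id = s.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute())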

This is an extra step I want to avoid. I tried to find a way to use df.count() to check the number of rows inserted by autoloader, but that was not possible.
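
As far as I understand, df.count() fails because df is a streaming DataFrame; a row count is only available per micro-batch, for example inside the foreachBatch function. A sketch only (merge_bronze_batch is a hypothetical name, not my actual handler):

# Inside foreachBatch the micro-batch is a static DataFrame,
# so count() works there
def merge_bronze_batch(batch_df, batch_id):
  row_count = batch_df.count()
  if row_count == 0:
    print(f"batch {batch_id}: autoloader found no new rows")
    return
  # ... merge batch_df into the bronze table here

That still does not surface a "nothing was processed" signal back to the notebook, though.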

The only solution for now is to check the latest version number of the bronze table before the autoloader runs and compare it with the version number after the autoloader finishes, to see whether a new version was created.

Any other idea how to prevent running an unnecessary merge from bronze to silver when autoloader did not process a new file?

Check the last version of the bronze table before running autoloader:

from delta.tables import DeltaTable

# Capture the latest bronze table version before the autoloader run
table_path = bronze_table
deltaTable = DeltaTable.forPath(spark, table_path)
last_version = int(deltaTable.history(1).head()["version"])

print(f"last table version is {last_version}")

Run autoloader:

from pyspark.sql.functions import current_timestamp

# Incrementally pick up new parquet files from ADLS with Auto Loader
df = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(autoloader_path)
  .withColumn("inserted_datetime", current_timestamp())
)

# foreachBatch merges each micro-batch into the bronze table
query = df.writeStream \
  .format("delta") \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", checkpoint_path) \
  .queryName(f"query: {bronze_table}") \
  .foreachBatch(merge_bronze(args)) \
  .trigger(availableNow=True) \
  .start()

query.awaitTermination()
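
merge_bronze(args) is not shown above; it just returns the (batch_df, batch_id) function that foreachBatch calls to merge each micro-batch into bronze. A simplified sketch only (the "id" merge key is a placeholder, not my real logic):

from delta.tables import DeltaTable

def merge_bronze(args):
  # Returns the callable that foreachBatch invokes for every micro-batch
  def _merge(batch_df, batch_id):
    bronze = DeltaTable.forPath(spark, bronze_table)
    (bronze.alias("t")
      .merge(batch_df.alias("s"), "t.id = s.id")
      .whenMatchedUpdateAll()
      .whenNotMatchedInsertAll()
      .execute())
  return _merge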

And then, before merging into silver, check the version again:

from delta.tables import DeltaTable

# Capture the bronze table version after the autoloader run
table_path = bronze_table
deltaTable = DeltaTable.forPath(spark, table_path)
new_version = int(deltaTable.history(1).head()["version"])

print(f"new table version is: {new_version}")

if new_version > last_version:
  ...  # read CDF from bronze and merge into silver here
else:
  print('there is no new version of the table')
  dbutils.notebook.exit('exit notebook')

 Thanks.
