cancel
Showing results for 
Search instead for 
Did you mean: 
Get Started Discussions
Start your journey with Databricks by joining discussions on getting started guides, tutorials, and introductory topics. Connect with beginners and experts alike to kickstart your Databricks experience.
cancel
Showing results for 
Search instead for 
Did you mean: 

How to check if autoloader processed new file

alesventus
Contributor

Hi,

My scenario is this: 

  • autoloader loads data from ADLS once a day.
  • data are merged into the bronze zone.
  • bronze table has CDF turned on.
  • read CDF and merge into the silver.

all works fine, but when there is not new file for autoloader to process, CDF takes latest version of the bronze table and merge data again to silver table.

This is extra step I want to avoid. I tried find the way how to use df.count() to check number of rows inserted by autoloader but that was not possible.

The only solution for now is to check latest version number of silver table at the beginning of the autoloader and compare it with the version number after autoloader process to check if new version is higher than previous.

Any other idea how to prevent running unnecessary merge from bronze to silver if autoloader did not process new file?

check last version of CDF

 

 

from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp

table_path = bronze_table
deltaTable = DeltaTable.forPath(spark, table_path)

last_version = str(DeltaTable.forPath(spark, table_path).history(1).head()["version"])

print(f"last table version is {last_version}")

 

 

run autoloader

 

 

from pyspark.sql.functions import current_timestamp

df = (spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(autoloader_path)
  .withColumn("inserted_datetime", current_timestamp())
)

query = df.writeStream \
  .format("delta") \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", checkpoint_path) \
  .queryName(f"query: {bronze_table}") \
  .foreachBatch(merge_bronze(args)) \
  .trigger(availableNow=True) \
  .start()
 
query.awaitTermination()

 

 

and then before merge to silver check version again 

 

 

from delta.tables import DeltaTable
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp

table_path = bronze_table
deltaTable = DeltaTable.forPath(spark, table_path)

new_version = str(DeltaTable.forPath(spark, table_path).history(1).head()["version"])

print(f"new table version is: {new_version}")

if new_version > last_version:
…else:
  print('there is not new version of the table')
  dbutils.notebook.exit('exit notebook')

 

 

 Thanks.

0 REPLIES 0

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group