use DeltaLog class in databricks cluster

pokus
New Contributor III

I need to use the DeltaLog class in my code to get the AddFile dataset. The code has to live in a repo and run on a Databricks cluster.

Some docs say to use the org.apache.spark.sql.delta.DeltaLog class, but Databricks seems to remove it at runtime, and I get NoClassDefFoundError: org/apache/spark/sql/delta/DeltaLog$ when running on a cluster with

val files = org.apache.spark.sql.delta.DeltaLog.forTable(spark, path(db, table))
        .unsafeVolatileSnapshot
        .allFiles

This is because I mark the "io.delta" %% "delta-core" dependency as Provided. When I try running without Provided, I get IllegalArgumentException: requirement failed: Config entry spark.databricks.delta.timeTravel.resolveOnIdentifier.enabled already registered!
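For context, the build setup in question looks roughly like this. A sketch of an sbt configuration, assuming Spark/Delta versions that match the cluster's Databricks Runtime (the version numbers here are placeholders, not a recommendation):

```scala
// build.sbt (sketch): mark spark-sql and delta-core as Provided so the
// application jar does not bundle a second copy of Delta alongside the
// one already on the Databricks cluster -- bundling it is what triggers
// the "Config entry ... already registered!" error.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"  % "3.3.0" % Provided,
  "io.delta"         %% "delta-core" % "2.3.0" % Provided
)
```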

A Databricks KB article (https://kb.databricks.com/en_US/sql/find-size-of-table) says to use com.databricks.sql.transaction.tahoe.DeltaLog, but this class lives outside the io.delta package, which causes the compilation issue. I can't even point my build at the jar that contains com.databricks.sql.transaction.tahoe.DeltaLog to import it explicitly. This code works on the cluster:

val deltaTable = DeltaTable.forPath(spark, path)
deltaTable.getClass.getMethod("deltaLog").invoke(deltaTable)
      .asInstanceOf[com.databricks.sql.transaction.tahoe.DeltaLog]
      .snapshot
      .allFiles

but, as I said, I can't keep it in my code because of the compilation issue.

How can I use DeltaLog in my code and still be able to run that code on a cluster?

1 ACCEPTED SOLUTION

Accepted Solutions

pokus
New Contributor III

I was able to resolve the issue using reflection only:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame

val deltaTable = DeltaTable.forPath(spark, path(db, table))
val deltaLog = deltaTable.getClass.getMethod("deltaLog").invoke(deltaTable)
val snapshot = deltaLog.getClass.getMethod("unsafeVolatileSnapshot").invoke(deltaLog)
val allFiles = snapshot.getClass.getMethod("allFiles").invoke(snapshot).asInstanceOf[DataFrame]
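The invoke-by-name pattern above can be factored into a small helper so each step isn't repeated. A minimal, self-contained sketch of the same technique; the DeltaTableStub/DeltaLogStub/Snapshot classes below are toy stand-ins for illustration, not Databricks APIs:

```scala
object ReflectUtil {
  // Invoke a zero-argument method by name on any object and cast the result.
  // This sidesteps compile-time visibility of the target class, which is the
  // whole point when the real class only exists on the cluster at runtime.
  def call[T](target: AnyRef, method: String): T =
    target.getClass.getMethod(method).invoke(target).asInstanceOf[T]
}

// Toy stand-ins mimicking the deltaLog -> unsafeVolatileSnapshot -> allFiles chain.
class Snapshot { def allFiles(): Seq[String] = Seq("part-0000.parquet") }
class DeltaLogStub { def unsafeVolatileSnapshot(): Snapshot = new Snapshot }
class DeltaTableStub { def deltaLog(): DeltaLogStub = new DeltaLogStub }

object Demo {
  def main(args: Array[String]): Unit = {
    val table = new DeltaTableStub
    val log   = ReflectUtil.call[DeltaLogStub](table, "deltaLog")
    val snap  = ReflectUtil.call[Snapshot](log, "unsafeVolatileSnapshot")
    val files = ReflectUtil.call[Seq[String]](snap, "allFiles")
    println(files.mkString(",")) // prints "part-0000.parquet"
  }
}
```

The trade-off is that the cast is unchecked and method renames in a new runtime version only surface as runtime errors, so this is a workaround rather than a stable API.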

but it would be good to resolve the dependency issue and be able to get the DeltaLog through the Delta API.


3 REPLIES


Kaniz
Community Manager

Hi @Artur Kiiko, Your willingness to share your solution with fellow members demonstrates your expertise and highlights the spirit of collaboration that makes our community thrive.

Your solution has undoubtedly saved time and effort for many of our members who faced similar challenges and has inspired others to share their knowledge. Your generosity in providing insights and guidance has had a lasting impact on our community.

Again, thank you for your valuable contribution and for being a pivotal part of our community's success. We look forward to your continued engagement and sharing your expertise in the future.

dbal
New Contributor III

Thanks for providing a solution, @pokus.

What I don't understand is why Databricks cannot provide the DeltaLog at runtime. How can this be the official solution? We need a better solution than depending on reflection.