Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Identify the partitionValues written yesterday from delta

soundari
New Contributor

We have streaming data written into a Delta table, and we do not write to all partitions every day. I am therefore thinking of running a compaction Spark job only on the partitions that were modified yesterday. Is it possible to query the partitionValues written yesterday from the Delta log?

1 ACCEPTED SOLUTION

Deepak_Bhutada
Contributor III

Hi @Gnanasoundari Soundarajan

Based on the details you provided, you are not overwriting all the partitions every day, which means you are likely using append mode when writing the data on day 1. On day 2, you want to access those partition values and run an operation on them. You can retrieve the data written on day 1 with the following steps:

  • In Delta, every operation generates transaction logs, which are stored in the _delta_log folder. You can run DESCRIBE HISTORY on the Delta table and create a temp view on top of it to query the table history with plain SQL:
// Create a temp view on the table history
// (append commits appear as "WRITE" for batch writes and "STREAMING UPDATE" for streaming writes)
import spark.implicits._

spark.sql("DESCRIBE HISTORY <table_name>")
  .filter($"operation".isin("WRITE", "STREAMING UPDATE"))
  .orderBy($"timestamp".desc)
  .createOrReplaceTempView("<tempViewName>")
 
%sql
-- Create a table on top of the view
create database if not exists test;

create table if not exists table_name_1
using delta
as select * from <tempViewName>;
 
%sql
-- Select the relevant columns from the table created above
select job.runId as runId,
       operationParameters.batchId as batchId,
       operationMetrics.numRemovedFiles as numRemovedFiles,
       operationMetrics.numRemovedBytes as numRemovedBytes,
       operationMetrics.numAddedFiles as numAddedFiles,
       operationMetrics.numAddedBytes as numAddedBytes,
       *
from table_name_1

From this, you can see which version was added to the log and by which operation (OPTIMIZE, a batch or streaming append, etc.).
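
To narrow this down to the versions committed yesterday (which is what the question asks for), a small filter over the same history table is enough. Here is a minimal sketch, assuming table_name_1 was created as above and therefore exposes the DESCRIBE HISTORY columns version, timestamp, and operation:
%python
# Minimal sketch: list the table versions committed yesterday.
# Assumes table_name_1 holds the DESCRIBE HISTORY output, as created above.
from pyspark.sql import functions as F

yesterdays_versions = (spark.table("table_name_1")
    .where(F.to_date(F.col("timestamp")) == F.date_sub(F.current_date(), 1))
    .select("version", "timestamp", "operation")
    .orderBy("version"))

display(yesterdays_versions)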

  • Once you know the version you want to use, you can time travel to it:
%python
# Query the Delta table by version using versionAsOf
df = (spark.read
      .format("delta")
      .option("versionAsOf", "5238")
      .load("/path/to/my/table"))

# Query the Delta table by version using the @ syntax in the path
df = (spark.read
      .format("delta")
      .load("/path/to/my/table@v5238"))
 
%sql
-- Query a metastore-defined Delta table by version
SELECT COUNT(*) FROM my_table VERSION AS OF 5238;
SELECT COUNT(*) FROM my_table@v5238;

-- Query a Delta table by file path and version
SELECT COUNT(*) FROM delta.`/path/to/my/table@v5238`;

Now you have the data corresponding to that version. You can create a view on top of it to run further operations, or use it as shown in the code snippets above.
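
Since the question asks specifically for the partitionValues written yesterday, note that each add action in the _delta_log JSON commits records the partitionValues of the file it adds, so you can also read the log directly. This is a minimal sketch, not part of the original answer, assuming the table lives at the hypothetical path /path/to/my/table and yesterday's JSON commit files have not yet been cleaned up:
%python
# Minimal sketch: read the Delta transaction log JSON and list the distinct
# partition values whose files were added yesterday.
# Assumption: the table path /path/to/my/table is hypothetical.
from pyspark.sql import functions as F

log_df = spark.read.json("/path/to/my/table/_delta_log/*.json")

yesterday_partitions = (log_df
    .where(F.col("add").isNotNull())
    # add.modificationTime is epoch milliseconds
    .withColumn("written_at",
                F.to_date(F.from_unixtime(F.col("add.modificationTime") / 1000)))
    .where(F.col("written_at") == F.date_sub(F.current_date(), 1))
    .select("add.partitionValues")
    .distinct())

display(yesterday_partitions)

Each resulting row is a partition that received data yesterday; those values can then drive a targeted compaction run, for example an OPTIMIZE statement with a WHERE clause restricted to those partition columns.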

Hope this is helpful


