10-06-2021 02:29 AM
We have streaming data written into Delta. We do not write to all partitions every day, so I am thinking of running a compaction Spark job only on the partitions that were modified yesterday. Is it possible to query the partitionValues written yesterday from the Delta log?
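Yes, this is possible in principle: every commit in the table's `_delta_log` directory is a JSON file whose `add` actions carry a `partitionValues` map and a `modificationTime`. As a rough, hedged sketch (plain Python, no Delta APIs; the field names follow the Delta transaction-log protocol, and direct read access to the log files is assumed), yesterday's touched partitions could be collected like this:

```python
import json

def partitions_added(commit_lines, since_ms):
    """Collect distinct partitionValues from `add` actions newer than since_ms.

    commit_lines: iterable of JSON strings, one Delta log action per line.
    since_ms: epoch milliseconds; only files added at/after this time count.
    """
    partitions = set()
    for line in commit_lines:
        action = json.loads(line)
        add = action.get("add")
        if add and add.get("modificationTime", 0) >= since_ms:
            # partitionValues maps partition column -> string value
            partitions.add(tuple(sorted(add["partitionValues"].items())))
    return partitions

# Synthetic log entries for illustration (field names per the Delta log protocol)
lines = [
    '{"add": {"path": "date=2021-10-05/part-0.parquet", '
    '"partitionValues": {"date": "2021-10-05"}, '
    '"size": 1234, "modificationTime": 1633392000000, "dataChange": true}}',
    '{"commitInfo": {"operation": "WRITE"}}',
]
print(partitions_added(lines, since_ms=0))  # {(('date', '2021-10-05'),)}
```

In practice you would list yesterday's commit files under `_delta_log/` (or use the table history, as the answer below does) rather than hard-code the lines.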
10-11-2021 09:51 AM
Hi @Gnanasoundari Soundarajan,
Based on the details you provided, you are not overwriting all partitions every day, which suggests you are using append mode when writing the data on day 1. On day 2, you want to access those partition values and run an operation on them. You can retrieve the data written on day 1 with the following steps:
## Create a temp view on table history
spark.sql("describe history <table_name>").filter($"operation" === "APPEND").orderBy($"timestamp".desc).createOrReplaceTempView("<tempViewName>")
## create a table on top of the view
%sql
create database if not exists test;
create table if not exists table_name_1
using delta
as select * from <tempViewName>;
## select relevant columns from the table created above
%python
display(spark.sql("""select job.runId runId,
       operationParameters.batchId batchId,
       operationMetrics.numRemovedFiles numRemovedFiles,
       operationMetrics.numRemovedBytes numRemovedBytes,
       operationMetrics.numAddedFiles numAddedFiles,
       operationMetrics.numAddedBytes numAddedBytes, *
from table_name_1"""))
From this, you can see which version was added to the log and by which operation (OPTIMIZE, APPEND, etc.).
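To make the version lookup concrete, here is a minimal sketch in plain Python (no Spark needed; the rows are hypothetical stand-ins for DESCRIBE HISTORY output) of picking the most recent APPEND version:

```python
def latest_append_version(history_rows):
    """Return the highest version among rows whose operation is APPEND.

    history_rows: list of dicts with at least 'version' and 'operation',
    mirroring columns of DESCRIBE HISTORY.
    """
    appends = [r["version"] for r in history_rows if r["operation"] == "APPEND"]
    return max(appends) if appends else None

# Illustrative history rows (versions are made up)
history = [
    {"version": 5236, "operation": "OPTIMIZE"},
    {"version": 5237, "operation": "APPEND"},
    {"version": 5238, "operation": "APPEND"},
]
print(latest_append_version(history))  # 5238
```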
%python
# Query Delta table by version using versionAsOf
df = (spark.read
    .format("delta")
    .option("versionAsOf", "5238")
    .load("/path/to/my/table"))

# Query Delta table by version using the @ syntax in the path
df = (spark.read
    .format("delta")
    .load("/path/to/my/table@v5238"))
%sql
-- Query a metastore-defined Delta table by version
SELECT COUNT(*) FROM my_table VERSION AS OF 5238;
SELECT COUNT(*) FROM my_table@v5238;
-- Query a Delta table by file path and version
SELECT COUNT(*) FROM delta.`/path/to/my/table@v5238`;
Now you have the data corresponding to that version. You can create a view on top of it to perform your operation, or use it directly as in the code snippet above.
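Once the changed partitions are known, the compaction job can be restricted to just those partitions. The helper below is a hypothetical sketch that turns a list of partition-value maps into a SQL predicate, which could then feed, for example, an `OPTIMIZE ... WHERE ...` statement or a `replaceWhere` write option (both accept partition-column predicates; values are assumed to be simple strings):

```python
def partition_predicate(partitions):
    """Build a SQL predicate matching any of the given partitions.

    partitions: list of dicts mapping partition column -> string value,
    e.g. [{"date": "2021-10-05"}, {"date": "2021-10-06"}].
    """
    clauses = []
    for values in partitions:
        conjuncts = " AND ".join(
            f"{col} = '{val}'" for col, val in sorted(values.items())
        )
        clauses.append(f"({conjuncts})")
    return " OR ".join(clauses)

pred = partition_predicate([{"date": "2021-10-05"}, {"date": "2021-10-06"}])
print(pred)  # (date = '2021-10-05') OR (date = '2021-10-06')
# Hypothetical usage in a compaction job:
# spark.sql(f"OPTIMIZE my_table WHERE {pred}")
```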
Hope this is helpful
10-06-2021 04:07 AM
Hi @soundari! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers in the community have an answer to your question first; otherwise I will get back to you soon. Thanks.
05-18-2022 02:08 PM
Hi @Gnanasoundari Soundarajan, just a friendly follow-up. Do you still need help, or did @Deepak Bhutada's response help you find the solution? Please let us know.