Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Table write command stuck "Filtering files for query."

boskicl
New Contributor III

Hello all,

Background:

I am having an issue today with Databricks, using PySpark SQL to write a Delta table. The DataFrame is the result of an inner join between two tables, and it is this DataFrame that I am trying to write to a Delta table. Sometimes it won't even complete a row count (count()), but other times it does; the result is usually around 1.9 billion rows, and in those cases display() works too.

Issue:

When I write this DataFrame to a Delta table, it seems to get stuck on one stage. The command I am using to write the table is as follows:

(
  df
  .write
  .format("delta")
  .mode(write_mode)
  .option("replaceWhere", "eventDate >= {} AND eventDate < {}".format(start_date_str, end_date_str))
  .option("overwriteSchema", "true")
  .partitionBy("eventDate")
  .saveAsTable("default.df")
)
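One thing that may be worth checking in the command above: if start_date_str and end_date_str are bare strings such as 2022-01-01 (without surrounding quotes), the formatted predicate contains unquoted dates, which Spark SQL would read as integer subtraction rather than as date literals. The thread does not show how those strings are built, so this is only an assumption. A minimal sketch of a helper (the name build_replace_where is hypothetical) that always quotes the literals:

```python
from datetime import date


def build_replace_where(start: date, end: date, column: str = "eventDate") -> str:
    """Return a replaceWhere predicate with quoted ISO date literals.

    The range is half-open: start is included, end is excluded.
    """
    if start >= end:
        raise ValueError("start must be earlier than end")
    return (
        f"{column} >= '{start.isoformat()}' "
        f"AND {column} < '{end.isoformat()}'"
    )


# Example dates are placeholders, not values from the original post.
predicate = build_replace_where(date(2022, 1, 1), date(2022, 2, 1))
print(predicate)
```

The resulting string could then be passed as .option("replaceWhere", predicate) in the write command.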

The issue arises once it reaches one stage, which says "Filtering files for query":

[screenshot: filtering job info]

When I look into that particular stage (1493), I see the following:

[screenshot: spill memory]

Can anyone give me any insight into what is going on? I have run this code for over 10 hours and nothing happens; it is always stuck on that last stage with the same message, "Filtering files for query." From my understanding, Databricks optimizes Delta tables really well, and the table I am trying to write is even partitioned. I am not a memory/cache expert, so I am not sure whether data can't be loaded into cache quickly enough for the write. Is there any insight anyone can provide?

Info on cluster:

Databricks runtime version: 10.2 (includes Apache Spark 3.2.0, Scala 2.12)

Worker type: i3.2xlarge 61GB memory, 8 cores

Driver type: i3.4xlarge 122GB memory, 16 cores

Thank you!

1 ACCEPTED SOLUTION

Accepted Solutions

Anonymous
Not applicable

@Ljuboslav Boskic

There can be multiple reasons why the query is taking so long. During this phase, a metadata lookup takes place. Can you please check the following:

  • Ensure the tables are Z-ordered properly, and that the merge key (ON clause) includes the Z-order key and the partition key.
  • You can run VACUUM and OPTIMIZE on the source Delta tables. Please see the documents below:

https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-vacuum.html

https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-optimize.html

Please let us know if you still face the same issue
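As a rough illustration of the maintenance commands suggested above, the sketch below builds OPTIMIZE ... ZORDER BY and VACUUM statements for a table. The function name, the table name default.table1, and the Z-order column user are assumptions for illustration only; Z-order columns should be non-partition columns, and 168 hours is Delta's default retention threshold.

```python
from typing import List


def maintenance_statements(
    table: str, zorder_cols: List[str], retain_hours: int = 168
) -> List[str]:
    """Build OPTIMIZE and VACUUM statements for one Delta table.

    retain_hours defaults to 168 (7 days), Delta's default retention threshold.
    """
    if not zorder_cols:
        raise ValueError("need at least one Z-order column")
    cols = ", ".join(zorder_cols)
    return [
        f"OPTIMIZE {table} ZORDER BY ({cols})",
        f"VACUUM {table} RETAIN {retain_hours} HOURS",
    ]


# Hypothetical table and column names; substitute your own.
statements = maintenance_statements("default.table1", ["user"])
for stmt in statements:
    print(stmt)
    # On a Databricks cluster each statement could be run with spark.sql(stmt).
```

Running OPTIMIZE before VACUUM means the small files replaced by compaction become eligible for cleanup once the retention period has passed.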


5 REPLIES

Hubert-Dudek
Esteemed Contributor III

Do you perhaps have a unique ID there? If so, you could run a simple MERGE (upsert):

MERGE INTO your_table USING your_new_view
ON your_table.uniqueId = your_new_view.uniqueId
  AND your_table.eventDate >= {} AND your_table.eventDate < {}
WHEN NOT MATCHED
  THEN INSERT *
WHEN MATCHED
  THEN UPDATE SET *

boskicl
New Contributor III

@Hubert Dudek, let me backtrack for a second. When I create the inner join, I also apply some conditions:

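from pyspark.sql.functions import col, datediff

data_to_save = (
    table1
    .join(
        table2,
        (table1.user == table2.user)                                       # same user
        & (col("eventTime") >= col("tableTime"))                           # event at or after table time
        & (datediff(table1.eventDate, table2.tableEventDate) <= window),   # within `window` days
        "inner",
    )
)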

This is the DataFrame I want to write to a Delta table.

Does the code snippet you posted go into the write statement I had, or earlier, into the join statement I pasted above?

As for your question:

Each row may not have a single unique ID. For example, consider 10 rows: 3 of them may share the same value in a column called "user Id" but have different values in another column called "eventId". It is important for me to keep these rows separate even though they have the same user ID. I hope this answers your question.
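If the combination of user ID and event ID happens to be unique per row (the thread does not confirm this, so treat it as an assumption), a composite key could play the role of the single unique ID in a MERGE. A minimal sketch that builds such an ON clause; the helper name merge_on_clause and the column names userId and eventId are hypothetical:

```python
from typing import List


def merge_on_clause(target: str, source: str, key_cols: List[str]) -> str:
    """Join target and source on every column of a composite key."""
    if not key_cols:
        raise ValueError("need at least one key column")
    return " AND ".join(f"{target}.{c} = {source}.{c}" for c in key_cols)


# Hypothetical key columns; the real column names are not shown in the thread.
on_clause = merge_on_clause("your_table", "your_new_view", ["userId", "eventId"])
merge_sql = (
    "MERGE INTO your_table USING your_new_view\n"
    f"ON {on_clause}\n"
    "WHEN MATCHED THEN UPDATE SET *\n"
    "WHEN NOT MATCHED THEN INSERT *"
)
print(merge_sql)
```

If several source rows can share the same key values, MERGE may fail with a multiple-match error, so the source would need to be deduplicated on the key first.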

sauri
New Contributor III

Hello,

I did 3 steps:

1.- Restart the cluster.

2.- Remove all Parquet files that are no longer used with the VACUUM command.

Example:

VACUUM my_schema.my_table RETAIN 0 HOURS;

3.- Run the query.


Hi @Ljuboslav Boskic,

Just a friendly follow-up. Do you still need help, or did any of the responses help to resolve your issue? Please let us know.
