Table write command stuck "Filtering files for query."

boskicl
New Contributor III

Hello all,

Background:

I am having an issue today with Databricks, using PySpark SQL to write a Delta table. The DataFrame is produced by an inner join between two tables, and it is this joined DataFrame that I am trying to write to a Delta table. Sometimes the DataFrame won't even complete a row count (count()), but other times it does, and the result is usually around 1.9 billion rows; in those cases display() works as well.

Issue:

When I write this DataFrame to a Delta table, it seems to get stuck on one stage. The command I am using to write the table is as follows:

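(
  df
  .write
  .format("delta")
  # replaceWhere only takes effect when write_mode is "overwrite"
  .mode(write_mode)
  # The bounds are quoted so Spark compares them as date literals
  # (assumes start_date_str / end_date_str look like 2022-01-01)
  .option("replaceWhere", "eventDate >= '{}' AND eventDate < '{}'".format(start_date_str, end_date_str))
  .option("overwriteSchema", "true")
  .partitionBy("eventDate")
  .saveAsTable("default.df")
)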
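In the replaceWhere predicate the date bounds need to be quoted: if start_date_str is a bare string such as 2022-01-01, an unquoted predicate like eventDate >= 2022-01-01 is parsed as the integer expression 2022 - 01 - 01. A minimal sketch, assuming the bounds are plain ISO dates, that builds the predicate from datetime.date values so the quotes are always present (replace_where_predicate is a hypothetical helper, not a Databricks API):

```python
from datetime import date


def replace_where_predicate(column: str, start: date, end: date) -> str:
    """Build a half-open [start, end) date-range predicate for replaceWhere.

    Bounds are rendered as quoted ISO-8601 literals ('YYYY-MM-DD') so Spark SQL
    treats them as dates rather than as integer arithmetic.
    """
    if start >= end:
        raise ValueError(f"start ({start}) must be earlier than end ({end})")
    return f"{column} >= '{start.isoformat()}' AND {column} < '{end.isoformat()}'"


predicate = replace_where_predicate("eventDate", date(2022, 1, 1), date(2022, 2, 1))
print(predicate)
# eventDate >= '2022-01-01' AND eventDate < '2022-02-01'
```

The resulting string would be passed as .option("replaceWhere", predicate) on a write that uses mode "overwrite".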

The issue arises once it reaches one stage that shows "Filtering files for query":

[Screenshots: filtering, job_info]

When I look into that particular stage (1493), I see the following:

[Screenshot: spill_memory]

Can anyone give me any insight into what is going on? I have let this code run for over 10 hours and nothing happens; it always stays stuck on that last stage with the same message, "Filtering files for query." My understanding is that Databricks optimizes Delta tables well, and the table I am writing to is even partitioned. I am not a memory/cache expert, so I am not sure whether data can't be loaded into cache quickly enough for the write. Any insight would be appreciated.
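For context on what a file-filtering step does: Delta Lake records partition values and per-file column statistics (such as min/max) in its transaction log and uses them to skip files that cannot match a predicate. The sketch below is a toy model of that idea in plain Python, not Delta's implementation; FileStats and the file names are made up for illustration. Every file's metadata has to be checked, which is one reason a table with very many small files can make this kind of step slow.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class FileStats:
    """Hypothetical per-file metadata: partition value plus min/max of one column."""
    path: str
    event_date: date  # partition value (eventDate)
    min_user: str     # minimum value of a non-partition column in this file
    max_user: str     # maximum value of that column in this file


def files_to_read(files, start, end, user=None):
    """Keep files whose partition lies in [start, end) and, if a user is given,
    whose [min_user, max_user] range could contain that user."""
    selected = []
    for f in files:
        if not (start <= f.event_date < end):
            continue  # partition pruning
        if user is not None and not (f.min_user <= user <= f.max_user):
            continue  # min/max data skipping
        selected.append(f.path)
    return selected


files = [
    FileStats("part-0", date(2022, 1, 1), "a", "m"),
    FileStats("part-1", date(2022, 1, 1), "n", "z"),
    FileStats("part-2", date(2022, 2, 1), "a", "z"),
]
print(files_to_read(files, date(2022, 1, 1), date(2022, 2, 1)))            # ['part-0', 'part-1']
print(files_to_read(files, date(2022, 1, 1), date(2022, 2, 1), user="b"))  # ['part-0']
```

Z-ordering (suggested in the accepted answer) clusters related values into the same files, which narrows each file's min/max range and lets this kind of skipping discard more files.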

Info on cluster:

Databricks runtime version: 10.2 (includes Apache Spark 3.2.0, Scala 2.12)

Worker type: i3.2xlarge 61GB memory, 8 cores

Driver type: i3.4xlarge 122GB memory, 16 cores

Thank you!

1 ACCEPTED SOLUTION


Anonymous
Not applicable

@Ljuboslav Boskic

There can be multiple reasons why the query is taking longer. During this phase, metadata lookup takes place. Can you please check the following:

  • Ensure the tables are Z-ordered properly, and that the merge key (ON clause) includes the Z-order key and the partition key.
  • Run VACUUM and OPTIMIZE on the source Delta tables; see the documents below:

https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-vacuum.html

https://docs.databricks.com/spark/latest/spark-sql/language-manual/delta-optimize.html

Please let us know if you still face the same issue.
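A minimal sketch of the OPTIMIZE/Z-order suggestion, assuming user (the join key in this thread) is the column worth Z-ordering on and eventDate stays the partition column; both choices are assumptions, not part of the answer. optimize_statement is a hypothetical helper that only builds the SQL string; on Databricks the string would be run with spark.sql(). It rejects Z-ordering on a partition column because Delta does not support that.

```python
def optimize_statement(table, zorder_cols, partition_cols=(), where=None):
    """Return an OPTIMIZE ... [WHERE ...] ZORDER BY (...) statement.

    `where` may only reference partition columns (a Delta restriction), which
    lets OPTIMIZE run on a subset of partitions, e.g. recent eventDate values.
    """
    bad = [c for c in zorder_cols if c in partition_cols]
    if bad:
        raise ValueError(f"cannot Z-order by partition column(s): {bad}")
    stmt = f"OPTIMIZE {table}"
    if where:
        stmt += f" WHERE {where}"
    if zorder_cols:
        stmt += f" ZORDER BY ({', '.join(zorder_cols)})"
    return stmt


sql = optimize_statement(
    "default.df",
    zorder_cols=["user"],          # hypothetical choice of Z-order column
    partition_cols=["eventDate"],
    where="eventDate >= '2022-01-01'",
)
print(sql)
# OPTIMIZE default.df WHERE eventDate >= '2022-01-01' ZORDER BY (user)
```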


5 REPLIES

Hubert-Dudek
Esteemed Contributor III

Do you maybe have a unique ID there? If so, you could run a simple MERGE (upsert):

MERGE INTO your_table USING your_new_view
ON your_table.uniqueId = your_new_view.uniqueId
  AND your_table.eventDate >= '{}' AND your_table.eventDate < '{}'
WHEN MATCHED
  THEN UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *
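To make the upsert semantics of that MERGE concrete, here is a toy in-memory model in plain Python (merge_upsert is hypothetical and ignores the eventDate bounds): matched target rows are overwritten, unmatched source rows are inserted, and, as in Delta, a target row matched by more than one source row is an error. That last rule is why the ON clause needs a key that is unique in the source.

```python
from collections import defaultdict


def merge_upsert(target, source, key):
    """Toy model of MERGE ... WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT * on lists of dict rows matched on `key`."""
    # Group source rows by key so duplicate matches can be detected.
    src = defaultdict(list)
    for row in source:
        src[row[key]].append(row)

    target_keys = {row[key] for row in target}
    result = []
    for row in target:
        matches = src.get(row[key], [])
        if len(matches) > 1:
            # Delta's MERGE also fails when several source rows match one target row.
            raise ValueError(f"{len(matches)} source rows match key {row[key]!r}")
        # WHEN MATCHED: replace the whole row; otherwise keep the target row.
        result.append(dict(matches[0]) if matches else row)

    # WHEN NOT MATCHED: insert source rows whose key is absent from the target.
    for k, rows in src.items():
        if k not in target_keys:
            result.extend(dict(r) for r in rows)
    return result


target = [{"uniqueId": 1, "v": "old"}, {"uniqueId": 2, "v": "old"}]
source = [{"uniqueId": 2, "v": "new"}, {"uniqueId": 3, "v": "new"}]
print(merge_upsert(target, source, "uniqueId"))
# [{'uniqueId': 1, 'v': 'old'}, {'uniqueId': 2, 'v': 'new'}, {'uniqueId': 3, 'v': 'new'}]
```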

boskicl
New Contributor III

@Hubert Dudek, let me backtrack for a second: when I create the inner join, I also apply some conditions:

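from pyspark.sql.functions import col, datediff

# window: maximum allowed gap in days between the two event dates (defined elsewhere)
data_to_save = (
    table1
    .join(
        table2,
        (table1.user == table2.user)
        & (col("eventTime") >= col("tableTime"))
        & (datediff(table1.eventDate, table2.tableEventDate) <= window),
        "inner",
    )
)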
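Because the only equality in that join condition is on user, every pair of rows for the same user that satisfies the two range conditions ends up in the output, so a user with n rows in table1 and m rows in table2 can contribute up to n × m rows. The toy below (plain Python, hypothetical data; inequality_join is not a Spark API) mirrors the condition with a nested loop to show that fan-out, which may help explain a joined result of around 1.9 billion rows.

```python
from datetime import datetime


def inequality_join(left, right, window):
    """Nested-loop version of the join condition: same user,
    eventTime >= tableTime, and datediff(eventDate, tableEventDate) <= window."""
    pairs = []
    for lrow in left:
        for rrow in right:
            if (
                lrow["user"] == rrow["user"]
                and lrow["eventTime"] >= rrow["tableTime"]
                # datediff(end, start) counts the days from start to end
                and (lrow["eventDate"] - rrow["tableEventDate"]).days <= window
            ):
                pairs.append((lrow, rrow))
    return pairs


def jan(day):
    return datetime(2022, 1, day)


# Hypothetical data: user u1 has three rows on each side, u2 only on the right.
left = [{"user": "u1", "eventTime": jan(d), "eventDate": jan(d).date()} for d in (10, 11, 12)]
right = [{"user": "u1", "tableTime": jan(d), "tableEventDate": jan(d).date()} for d in (1, 2, 3)]
right.append({"user": "u2", "tableTime": jan(1), "tableEventDate": jan(1).date()})

print(len(inequality_join(left, right, window=30)))  # 9 = 3 x 3 pairs for u1
print(len(inequality_join(left, right, window=8)))   # 3
```

Tightening window (or adding more equality keys) is what shrinks the per-user fan-out.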

This is the table I want to write to a delta table.

The code snippet you posted: is this something I need to put in the write statement I had, or before it, in the join statement I pasted above?

As for your question:

Rows do not necessarily have a single unique ID. For example, consider 10 rows: 3 of them may share the same value in the "user Id" column but have different values in the "eventId" column. It is important for me to keep these rows separate, even though they share the same user ID. I hope this answers your question.
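If the combination of user ID and eventId does identify a row (an assumption based on the description above, not something confirmed in the thread), the suggested MERGE could match on both columns. A minimal sketch that builds such a statement as a string; merge_sql, the column names userId and eventId, and your_new_view are hypothetical placeholders, and on Databricks the string would be run with spark.sql().

```python
def merge_sql(target, source, keys, date_col=None, start=None, end=None):
    """Build a Delta MERGE (upsert) statement matching on several key columns
    and, optionally, restricting the target to a [start, end) date range."""
    if not keys:
        raise ValueError("at least one key column is required")
    conds = [f"t.{k} = s.{k}" for k in keys]
    if date_col is not None:
        if start is None or end is None:
            raise ValueError("start and end are required with date_col")
        # Quoted so Spark SQL reads them as date literals.
        conds.append(f"t.{date_col} >= '{start}'")
        conds.append(f"t.{date_col} < '{end}'")
    return (
        f"MERGE INTO {target} AS t USING {source} AS s\n"
        f"ON {' AND '.join(conds)}\n"
        "WHEN MATCHED THEN UPDATE SET *\n"
        "WHEN NOT MATCHED THEN INSERT *"
    )


# Hypothetical names: userId/eventId as the composite key, your_new_view as the source.
stmt = merge_sql(
    "default.df", "your_new_view", ["userId", "eventId"],
    date_col="eventDate", start="2022-01-01", end="2022-02-01",
)
print(stmt)
```

Matching on both columns keeps rows that share a user ID but differ in eventId from being treated as the same row.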

sauri
New Contributor III

Hello,

I took these three steps:

1.- Restart the cluster.

2.- Remove the Parquet files the table no longer references with the VACUUM command.

Example:

VACUUM my_schema.my_table RETAIN 0 HOURS;

3.- Run the query again.
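A retention of 0 hours is below Delta's default deletedFileRetentionDuration of 7 days (168 hours), and Delta rejects such a VACUUM unless the safety check spark.databricks.delta.retentionDurationCheck.enabled is turned off first. A minimal sketch, assuming the table keeps that default, that emits the SET only when needed (vacuum_statements is a hypothetical helper; each string would be run with spark.sql()). The Delta documentation warns that very short retention can break concurrent readers or writers and removes the ability to time-travel to older versions.

```python
DEFAULT_RETENTION_HOURS = 168  # Delta's default deletedFileRetentionDuration (7 days)


def vacuum_statements(table, retain_hours):
    """Return the SQL statements for VACUUM <table> RETAIN <n> HOURS,
    disabling the retention safety check first when n is below the default."""
    if retain_hours < 0:
        raise ValueError("retain_hours must be non-negative")
    stmts = []
    if retain_hours < DEFAULT_RETENTION_HOURS:
        stmts.append("SET spark.databricks.delta.retentionDurationCheck.enabled=false")
    stmts.append(f"VACUUM {table} RETAIN {retain_hours} HOURS")
    return stmts


for stmt in vacuum_statements("my_schema.my_table", 0):
    print(stmt)
# SET spark.databricks.delta.retentionDurationCheck.enabled=false
# VACUUM my_schema.my_table RETAIN 0 HOURS
```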


Hi @Ljuboslav Boskic,

Just a friendly follow-up. Do you still need help, or did any of the responses help resolve your issue? Please let us know.
