12-10-2024 02:53 AM
Hi,
I'm encountering an issue in PySpark code where I calculate certain information monthly in a loop. The flow is roughly as follows:
The job fails at some random month about 75% of the time and succeeds the other 25%. I cannot figure out what causes the failures. When it succeeds, the run takes around 40 minutes and processes around 180 months.
Here's the error log:
Py4JJavaError: An error occurred while calling o71002.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 7856.0 failed 4 times, most recent failure: Lost task 1.3 in stage 7856.0 (TID 77757) (10.2.222.135 executor 12): org.apache.spark.SparkFileNotFoundException: Operation failed: "The specified path does not exist.", 404, GET, https://xxx.dfs.core.windows.net/teamdata/Dev/Zones/Product/IM_AWL_Products/part-00001-tid-588925637..., PathNotFound, "The specified path does not exist. RequestId:ac6649bc-201f-000b-47f0-4a9296000000 Time:2024-12-10T10:47:46.7936012Z". [DEFAULT_FILE_NOT_FOUND] It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved. If disk cache is stale or the underlying files have been removed, you can invalidate disk cache manually by restarting the cluster. SQLSTATE: 42K03
Thanks.
12-10-2024 04:19 AM
@bcsalay This could be a cache issue.
Are you calling dataframe.cache() anywhere in your code? If so, please follow it with unpersist().
Also, can you try the config below at the cluster level?
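For reference, the cache-then-unpersist pattern could look like the minimal sketch below. The helper name `with_cached` and the `work` callback are hypothetical and only for illustration; `DataFrame.cache()` and `DataFrame.unpersist()` are the actual PySpark calls.

```python
def with_cached(df, work):
    """Cache `df`, run `work` on it, and always release the cache afterwards.

    `with_cached` and `work` are hypothetical names for illustration;
    `df` is assumed to be a PySpark DataFrame.
    """
    cached = df.cache()        # marks the DataFrame for caching (lazy)
    try:
        return work(cached)    # e.g. lambda d: d.count()
    finally:
        cached.unpersist()     # release cached blocks even if `work` raises
```

Wrapping the work in `try`/`finally` means the cache is released even when a month fails partway through the loop.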
spark.databricks.io.cache.enabled false
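If changing the cluster configuration is not an option, the same key can also be set from code with `spark.conf.set`. The sketch below is only an illustration: the helper name `disk_cache_disabled` is hypothetical, and whether the setting stays scoped to your own session on a shared cluster is an assumption you should verify on your runtime.

```python
from contextlib import contextmanager

DISK_CACHE_KEY = "spark.databricks.io.cache.enabled"

@contextmanager
def disk_cache_disabled(spark):
    """Turn the disk cache off inside the `with` block, then restore it.

    `disk_cache_disabled` is a hypothetical helper; `spark` is assumed to be
    an active SparkSession on Databricks.
    """
    previous = spark.conf.get(DISK_CACHE_KEY, None)  # None if never set
    spark.conf.set(DISK_CACHE_KEY, "false")
    try:
        yield
    finally:
        if previous is None:
            spark.conf.unset(DISK_CACHE_KEY)         # back to the default
        else:
            spark.conf.set(DISK_CACHE_KEY, previous)
```

Usage would be `with disk_cache_disabled(spark): ...` around the monthly loop.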
12-17-2024 07:42 AM
Hi @MuthuLakshmi, thank you for your response. No, I don't use df.cache() anywhere in the code. I still tried uncaching the intermediate table, which is read and updated within the loop, but it didn't help:
spark.catalog.uncacheTable("IM_AWL_Products")
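If the data is read by path rather than through a catalog table, a path-based refresh may be the closer equivalent of the 'REFRESH TABLE' hint in the error message. A minimal sketch, assuming a hypothetical helper name `read_fresh_parquet` and a `path` of your own; `spark.catalog.refreshByPath` and `spark.read.parquet` are existing PySpark calls:

```python
def read_fresh_parquet(spark, path):
    """Invalidate cached data and metadata for `path`, then re-read it.

    `read_fresh_parquet` is a hypothetical helper; `path` is whatever
    location the loop reads its intermediate data from.
    """
    spark.catalog.refreshByPath(path)   # drop stale cached file listings
    return spark.read.parquet(path)     # recreate the DataFrame afterwards
```

This follows the error message's advice to recreate the DataFrame; it does not help if the files are deleted while a job is still reading them.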
I don't want to disable caching at the cluster level because our entire team runs code on the same cluster, so I would prefer to solve this within the code.
12-10-2024 11:27 AM
Can you show some code to give the gist of what it does? Are the parquet files accessed as a catalog table? Could it be that some other job makes changes to the input tables?
12-17-2024 08:01 AM - edited 12-17-2024 08:02 AM
Hi @JacekLaskowski, thank you for your response. No, it is not a catalog table, and it is not accessed or used by another job. I tried to explain above what the code does operationally; to give some more context: it is development code that processes historical data to implement certain business logic, and its outputs are used to define a flag during statistical modelling. That's pretty much it. It is not deployed anywhere yet and is not production code; it is only triggered manually by me or my team to create outputs.
I needed to write it as a loop because that will be more convenient once the code runs in production: the business logic looks back indefinitely in history, so a flag created in 2015 can, for example, affect next month's decision. The code therefore aggregates all historical information into one row per product, and reads and updates it each month.
Hope this gives more clarity.
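One pattern that can produce this kind of intermittent PathNotFound error is a lazily evaluated DataFrame that still points at files which a later overwrite has already deleted. The thread does not show the loop, so the sketch below rests on an assumption: that the per-product state is read from and written back to a single parquet location. It writes each month's state to its own directory instead, so a read never targets files that are being replaced. `state_path`, `run_month`, `update_fn` and the `as_of=` directory layout are hypothetical names for illustration.

```python
def state_path(base, month):
    """Directory holding the aggregated state as of `month` (e.g. '2015-01')."""
    return f"{base.rstrip('/')}/as_of={month}"

def run_month(spark, base, prev_month, month, update_fn):
    """Read last month's state, apply `update_fn`, write to a new directory.

    `update_fn(previous_df, month)` stands in for the business logic and is
    assumed to return the updated DataFrame.
    """
    previous = spark.read.parquet(state_path(base, prev_month))
    updated = update_fn(previous, month)
    # The output path differs from the input path, so this overwrite cannot
    # delete files that the lazy read above still depends on.
    target = state_path(base, month)
    updated.write.mode("overwrite").parquet(target)
    return target
```

Older `as_of=` directories can be deleted once the following month has been written successfully, which keeps storage bounded while still avoiding an in-place overwrite.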