Hi all, we have an issue while trying to write a fairly large data frame, close to 35 million records. We have tried writing it as Parquet and also as a table, and neither works, but writing a small chunk (10k records) does work. Basically we have some text to which we apply a spaCy model, and afterwards we create a new data frame.
from pyspark.sql import Row

def process_partition(iterator):
    # Imports live inside the function so they run on each executor
    import spacy
    from scispacy.umls_linking import UmlsEntityLinker
    core_sci = spacy.load("en_core_sci_sm", disable=['tok2vec', 'tagger', 'parser', 'senter', 'attribute_ruler', 'lemmatizer'])
    # component_name and process_canonical_names are defined elsewhere in our code
    if component_name not in core_sci.pipe_names:
        print("add pipe")
        core_sci.add_pipe(component_name, config={"resolve_abbreviations": True, "linker_name": "umls"})
    for row in iterator:
        result = process_canonical_names(row.abstract, core_sci)
        yield Row(pmid=row.pmid, canonical_names=result["cn"], concept_ids=result["concept_id"])
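For context, we apply this with mapPartitions and turn the result back into a data frame, roughly like this (a sketch; source_df stands for our input data frame with pmid and abstract columns):

df = source_df.rdd.mapPartitions(process_partition).toDF()  # new data frame with pmid, canonical_names, concept_ids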
But when we run on all the data we get the following errors:
df.write.mode("overwrite").parquet(mount_dir)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1.0 (TID 77) (10.80.246.21 executor 7): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to dbfs:/mnt/...
and as a Delta table: df.coalesce(20).write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(table_name)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 2.0 failed 4 times, most recent failure: Lost task 11.3 in stage 2.0 (TID 49) (10.80.233.136 executor 8): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to dbfs:/user/hive/warehouse/...
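For comparison, writing a small chunk does succeed; the test was roughly this (a sketch, the limit() sample is just illustrative of how we took the 10k records):

df.limit(10000).write.mode("overwrite").parquet(mount_dir)  # a ~10k-record sample writes without errors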
We are using Databricks 13.3 LTS (includes Apache Spark 3.4.1, Scala 2.12) with node type r5d.xlarge and up to 10 workers; we also have the metastore enabled and Unity Catalog.
We are wondering whether what we are doing is wrong, whether we are using the wrong resources, or whether it is something else. Any suggestion would help. Thank you!