Get Started Discussions
Start your journey with Databricks by joining discussions on getting started guides, tutorials, and introductory topics. Connect with beginners and experts alike to kickstart your Databricks experience.

Failing to write a large dataframe

Cosmin
New Contributor II

Hi all, we have an issue while trying to write a fairly large DataFrame, close to 35 million records. We have tried writing it both as Parquet and as a table, and neither works, but writing a small chunk (10k records) does work. Essentially, we apply a spaCy model to some text and then build a new DataFrame from the results.

 

def process_partition(iterator):
    import spacy
    from pyspark.sql import Row
    from scispacy.umls_linking import UmlsEntityLinker  # registers the scispacy linker factory

    # Load the model once per partition, disabling the pipes we don't need
    core_sci = spacy.load(
        "en_core_sci_sm",
        disable=['tok2vec', 'tagger', 'parser', 'senter', 'attribute_ruler', 'lemmatizer'],
    )
    # component_name is defined in the enclosing scope
    if component_name not in core_sci.pipe_names:
        print("add pipe")
        core_sci.add_pipe(component_name, config={"resolve_abbreviations": True, "linker_name": "umls"})
    for row in iterator:
        result = process_canonical_names(row.abstract, core_sci)
        yield Row(pmid=row.pmid, canonical_names=result["cn"], concept_ids=result["concept_id"])
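For readers unfamiliar with the pattern: process_partition is written for df.rdd.mapPartitions, which hands the function an iterator over one partition's rows and consumes whatever it yields. The mechanics can be shown without Spark; the stub model and rows below are illustrative placeholders, not the actual pipeline:

```python
from collections import namedtuple

Row = namedtuple("Row", ["pmid", "abstract"])
Out = namedtuple("Out", ["pmid", "n_tokens"])

def demo_partition(iterator):
    # In the real job this is where spacy.load() runs, once per partition,
    # so the model is not re-loaded for every row.
    model = str.split  # stand-in for the spaCy pipeline
    for row in iterator:
        yield Out(pmid=row.pmid, n_tokens=len(model(row.abstract)))

# Spark would pass one such iterator per partition
partition = iter([Row(1, "aspirin inhibits cox"), Row(2, "tnf alpha signaling")])
results = list(demo_partition(partition))  # one output row per input row
```

Because the heavy model load happens once per partition rather than once per row, the per-record cost inside the loop is only the inference call.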

 

But when we run it on the full dataset, we get the following errors:

df.write.mode("overwrite").parquet(mount_dir)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1.0 (TID 77) (10.80.246.21 executor 7): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to dbfs:/mnt/...

and when saving as a table with df.coalesce(20).write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(table_name):

org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 2.0 failed 4 times, most recent failure: Lost task 11.3 in stage 2.0 (TID 49) (10.80.233.136 executor 8): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to dbfs:/user/hive/warehouse/...

We are using Databricks 13.3 LTS (includes Apache Spark 3.4.1, Scala 2.12) with node type r5d.xlarge and up to 10 workers; we also have the metastore enabled and Unity Catalog.


We are wondering whether our approach is wrong, whether we are using the wrong resources, or whether it is something else. Any suggestion would help. Thank you!

3 REPLIES

-werners-
Esteemed Contributor III

The error is most likely caused by the row-by-row processing (due to the use of spaCy).
That way you bypass Spark's parallel processing capabilities.
A solution would be to avoid the loop, but in your case, using spaCy, that does not seem possible (I looked online, but everybody seems to use a UDF and an iterator, so that won't solve your issue).
Is it an option to use another NLP library that can run on PySpark, like Spark NLP?

Cosmin
New Contributor II

Unfortunately we have to use the spaCy models. Another approach we are considering is to deploy the models (the methods we use) as an API and make HTTP requests from Spark. Can this approach work?

-werners-
Esteemed Contributor III

That could work, but you will have to create a UDF.
Check this SO topic for more info.
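As a sketch of what that UDF-backed approach might look like: the helper below POSTs a batch of abstracts to a model-serving endpoint and parses the JSON reply. The endpoint URL and the {"texts": ...} / {"results": ...} payload shape are assumptions for illustration, not a known API; inside Spark it would be called from mapPartitions (or a pandas UDF) so each task sends one request per batch rather than one per row.

```python
import json
import urllib.request

def score_batch(texts, endpoint_url, timeout=30):
    """POST a batch of texts to a (hypothetical) model-serving endpoint
    and return one result dict per input text."""
    payload = json.dumps({"texts": texts}).encode("utf-8")
    req = urllib.request.Request(
        endpoint_url,
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read())["results"]
```

At 35 million rows, batching is the key design choice: a partition function would slice its iterator into fixed-size batches and call score_batch once per batch, keeping request counts and per-request latency overhead manageable.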
