<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Can’t save results to target table – out-of-memory error in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/can-t-save-results-to-target-table-out-of-memory-error/m-p/139060#M51083</link>
    <description>&lt;DIV&gt;&lt;DIV&gt;Hi team,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;I’m processing ~5,000 EMR notes with a Databricks notebook. The job reads from `crc_lakehouse.bronze.emr_notes`, runs SciSpaCy UMLS entity extraction plus a fine-tuned BERT sentiment model per partition, and builds a DataFrame (`df_entities`) with JSON fields for keywords, sentiment, and note sections.&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Cluster details:&lt;/DIV&gt;&lt;DIV&gt;- Driver: 64 GB RAM, 8 cores&lt;/DIV&gt;&lt;DIV&gt;- Workers: 64 GB RAM, 8-core CPU each, autoscaling 1–4 nodes&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Symptoms:&lt;/DIV&gt;&lt;DIV&gt;1. `df_entities.show()` works, so the transformation succeeds.&lt;/DIV&gt;&lt;DIV&gt;2. As soon as I try to `write`/save the DataFrame to another table (for example, `write.format("delta").mode("append").saveAsTable(...)`), the job fails with an OOM / executor memory error. No rows are persisted.&lt;/DIV&gt;&lt;DIV&gt;3. Only ~5k rows are processed, so I expected this to fit easily on this cluster profile.&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;What I’ve checked:&lt;/DIV&gt;&lt;DIV&gt;- Repartitioned input to 16 partitions.&lt;/DIV&gt;&lt;DIV&gt;- Verified no skew in the source table.&lt;/DIV&gt;&lt;DIV&gt;- Tried caching, disabling broadcast, lowering `show()` counts—none change the failure when writing.&lt;/DIV&gt;&lt;DIV&gt;- No custom memory configs; using defaults for this cluster size.&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Could you help identify why the write stage is exhausting memory despite the modest dataset? 
Are there best practices for running SciSpaCy + transformer sentiment inside `mapPartitions` on this configuration so the output can be saved?&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Full notebook code (`pipelines/pipelines/script_01_2025-11-14 09_22_15.py`):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;```&lt;/DIV&gt;&lt;DIV&gt;# Databricks notebook source&lt;/DIV&gt;&lt;DIV&gt;import json&lt;/DIV&gt;&lt;DIV&gt;from pyspark.sql.functions import col, trim, length, to_date, current_date, date_sub&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;SOURCE_TABLE = "crc_lakehouse.bronze.emr_notes"&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Fetch only yesterday's non-empty notes&lt;/DIV&gt;&lt;DIV&gt;df_yesterday = (&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; spark.read.table(SOURCE_TABLE)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; .select("notes_id", "contentResolved", "ingestion_time")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; .filter(to_date(col("ingestion_time")) == date_sub(current_date(), 1))&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; .filter(col("contentResolved").isNotNull() &amp;amp; (length(trim(col("contentResolved"))) &amp;gt; 0))&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; .select("notes_id", "contentResolved")&lt;/DIV&gt;&lt;DIV&gt;)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Repartition into 16 partitions to spread work across worker cores&lt;/DIV&gt;&lt;DIV&gt;df_yesterday = df_yesterday.repartition(16)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# COMMAND ----------&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;df_yesterday.count()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# COMMAND ----------&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# MAGIC %run ../helpers/umls_utils&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# COMMAND ----------&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Global variables to hold models per 
executor&lt;/DIV&gt;&lt;DIV&gt;global_nlp = None&lt;/DIV&gt;&lt;DIV&gt;global_linker = None&lt;/DIV&gt;&lt;DIV&gt;global_tokenizer = None&lt;/DIV&gt;&lt;DIV&gt;global_sentiment_model = None&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;def get_pipeline():&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; """Load NLP and sentiment models once per executor"""&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; global global_nlp, global_linker, global_tokenizer, global_sentiment_model&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; if global_nlp is None or global_linker is None:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; import spacy&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; from scispacy.abbreviation import AbbreviationDetector&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; import time&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; from scispacy.linking import EntityLinker&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; import torch&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; from transformers import AutoTokenizer, AutoModelForSequenceClassification&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # Load SciSpacy NLP&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; t0 = time.time()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; nlp = spacy.load("en_core_sci_lg")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if "abbreviation_detector" not in nlp.pipe_names:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; try:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; nlp.add_pipe("abbreviation_detector")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
except Exception:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; nlp.add_pipe(AbbreviationDetector(nlp))&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if "scispacy_linker" not in nlp.pipe_names:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; nlp.add_pipe(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "scispacy_linker",&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; config={"resolve_abbreviations": True, "linker_name": "umls"},&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; )&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; linker = nlp.get_pipe("scispacy_linker")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; print(f"✅ NLP loaded in executor in {time.time()-t0:.2f}s")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # Load sentiment model&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; MODEL_PATH = "../training/fine_tuned_bert_sentiment_v2"&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; sentiment_model = AutoModelForSequenceClassification.from_pretrained(MODEL_PATH)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; sentiment_model.eval()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; print("✅ Sentiment model loaded in 
executor")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # Save globally for this executor&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; global_nlp = nlp&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; global_linker = linker&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; global_tokenizer = tokenizer&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; global_sentiment_model = sentiment_model&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; return global_nlp, global_linker, global_tokenizer, global_sentiment_model&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# COMMAND ----------&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;def process_partition_rows(iterator):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; nlp, linker, tokenizer, sentiment_model = get_pipeline()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; import torch&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; from rapidfuzz import fuzz, process&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; import time&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; MIN_SCORE = 0.80&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; ALLOWED_GROUPS = {"Disorders", "Drugs", "Anatomy", "Procedures", "Physiology"}&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; SENTIMENT_LABELS = ["negative", "neutral", "positive"]&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; def analyze_sentiment(text):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; """Sentiment for a single text"""&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; inputs = tokenizer(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; text, return_tensors="pt", truncation=True, max_length=128, padding=True&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; )&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; with torch.no_grad():&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; outputs = sentiment_model(**inputs)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; probs = torch.softmax(outputs.logits, dim=1)[0]&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; pred_class = torch.argmax(probs).item()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return {&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "sentiment": SENTIMENT_LABELS[pred_class],&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "confidence": float(probs[pred_class]),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "scores": {&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "negative": float(probs[0]),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "neutral": float(probs[1]),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "positive": float(probs[2])&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; def get_match_type_with_aliases(entity_text, concept):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; t = (entity_text or "").strip().lower()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; canon = (getattr(concept, "canonical_name", "") or "").strip().lower()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; aliases = [a.strip().lower() for a in (getattr(concept, "aliases", []) or 
[])]&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if t == canon:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return "Exact"&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if t in aliases:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return "Synonym"&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; score_canon = fuzz.token_set_ratio(t, canon) if canon else 0&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; best_alias_score = 0&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if aliases:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; match = process.extractOne(t, aliases, scorer=fuzz.token_set_ratio)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; best_alias_score = match[1] if match else 0&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; score = max(score_canon, best_alias_score)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return "Fuzzy" if score &amp;gt;= 85 else "Synonym"&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; def draftjs_to_text(draftjs_json):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; """Convert Draft.js JSON content to plain text"""&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if not draftjs_json:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return ""&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if isinstance(draftjs_json, str):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; try:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
data = json.loads(draftjs_json)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; except Exception:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return draftjs_json&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; else:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; data = draftjs_json&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; blocks = data.get("blocks", [])&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; text = "\n".join(block.get("text", "") for block in blocks)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return text&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; for row in iterator:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; notes_id = row.notes_id&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; original_draftjs = row.contentResolved&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; plain_text = draftjs_to_text(original_draftjs)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # Prepare note_section dictionary&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; note_section = {&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "original_note": original_draftjs,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "plain_text": plain_text&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; entities_list = []&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # UMLS entity extraction&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if 
plain_text:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; doc = nlp(plain_text)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; all_entities = []&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; for ent in doc.ents:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if not ent._.kb_ents:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; continue&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; umls_id, score = ent._.kb_ents[0]&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if score &amp;lt; MIN_SCORE:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; continue&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; concept = linker.kb.cui_to_entity.get(umls_id)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if concept is None:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; continue&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; tui_codes = list(getattr(concept, "types", []) or [])&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; semantic_groups = format_semantic_types(tui_codes, format_type="group")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
semantic_type_names = format_semantic_types(tui_codes, format_type="full")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; group_set = {g.strip() for g in semantic_groups.split(",") if g.strip()}&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if ALLOWED_GROUPS and group_set.isdisjoint(ALLOWED_GROUPS):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; continue&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; match_type = get_match_type_with_aliases(ent.text, concept)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; status = "allow" if match_type in ["Exact", "Synonym"] else "not-allow"&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; definition = getattr(concept, "definition", "N/A") or "N/A"&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; aliases_count = len(getattr(concept, "aliases", []))&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; all_entities.append({&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "extracted_text": ent.text,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "entity_label": ent.label_,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "start_char": ent.start_char,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "end_char": ent.end_char,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "cui": umls_id,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "canonical_name": concept.canonical_name,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "match_type": match_type,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "status": status,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "confidence_score": round(score, 4),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "category": semantic_groups,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "detailed_types": semantic_type_names,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "tui_codes": ", ".join(tui_codes),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "definition": definition[:200] if len(definition) &amp;gt; 200 else definition,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "aliases_count": aliases_count,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; })&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
# Deduplicate and count mentions&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; text_counts = {}&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; for e in all_entities:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; t = e["extracted_text"].lower().strip()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; text_counts[t] = text_counts.get(t, 0) + 1&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; seen = set()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; for e in all_entities:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; t = e["extracted_text"].lower().strip()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if t not in seen:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; seen.add(t)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; e["mention_count"] = text_counts[t]&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; entities_list.append(e)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # Sentiment analysis&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; sentiment_result = analyze_sentiment(plain_text) if plain_text else None&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # Yield all four fields&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; yield 
(notes_id, entities_list, sentiment_result, note_section)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# COMMAND ----------&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;import json&lt;/DIV&gt;&lt;DIV&gt;from pyspark.sql import Row&lt;/DIV&gt;&lt;DIV&gt;from pyspark.sql.functions import current_timestamp&lt;/DIV&gt;&lt;DIV&gt;from pyspark.sql.types import StructType, StructField, StringType, TimestampType&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Call the partition processor to create the RDD&lt;/DIV&gt;&lt;DIV&gt;processed_rdd = df_yesterday.rdd.mapPartitions(process_partition_rows)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Convert RDD to DataFrame with JSON fields&lt;/DIV&gt;&lt;DIV&gt;df_entities = processed_rdd.map(lambda x: Row(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; note_id=x[0] if x[0] is not None else "",&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; keywords_extracted=json.dumps(x[1]) if x[1] else "[]",&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; sentiment_analysis=json.dumps(x[2]) if x[2] else "{}",&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; note_section=json.dumps(x[3]) if x[3] else "{}"&lt;/DIV&gt;&lt;DIV&gt;)).toDF()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Define schema explicitly&lt;/DIV&gt;&lt;DIV&gt;schema = StructType([&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; StructField("note_id", StringType(), True),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; StructField("keywords_extracted", StringType(), True),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; StructField("sentiment_analysis", StringType(), True),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; StructField("note_section", StringType(), True)&lt;/DIV&gt;&lt;DIV&gt;])&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;df_entities = spark.createDataFrame(df_entities.rdd, schema)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Add processing timestamp&lt;/DIV&gt;&lt;DIV&gt;df_entities = 
df_entities.withColumn("processed_timestamp", current_timestamp())&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Show the DataFrame instead of saving&lt;/DIV&gt;&lt;DIV&gt;# df_entities.show(truncate=False) # Set truncate=False to see full content&lt;/DIV&gt;&lt;DIV&gt;df_entities.write.mode("overwrite").saveAsTable(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; "crc_lakehouse.silver.notes_nlp_processed"&lt;/DIV&gt;&lt;DIV&gt;)&lt;/DIV&gt;&lt;DIV&gt;```&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Full error trace:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;```&lt;/DIV&gt;&lt;DIV&gt;Py4JJavaError: An error occurred while calling o625.saveAsTable.&lt;/DIV&gt;&lt;DIV&gt;: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 20.0 failed 4 times, most recent failure: Lost task 3.3 in stage 20.0 (TID 123) (10.139.64.8 executor 5): org.apache.spark.api.python.PythonException: Traceback (most recent call last):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/spark/python/pyspark/worker.py", line 1980, in main&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; process()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/spark/python/pyspark/worker.py", line 1972, in process&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; serializer.dump_stream(out_iter, outfile)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/spark/python/pyspark/serializers.py", line 356, in dump_stream&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; vs = list(itertools.islice(iterator, batch))&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/root/.ipykernel/2187/command-4748090034621861-3307002280", line 2, in process_partition_rows&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/root/.ipykernel/2187/command-4748090034621856-3519326438", line 30, in get_pipeline&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File 
"/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/spacy/language.py", line 821, in add_pipe&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; pipe_component = self.create_pipe(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/spacy/language.py", line 709, in create_pipe&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; resolved = registry.resolve(cfg, validate=validate)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/python/lib/python3.11/site-packages/confection/__init__.py", line 759, in resolve&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; resolved, _ = cls._make(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/python/lib/python3.11/site-packages/confection/__init__.py", line 808, in _make&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; filled, _, resolved = cls._fill(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/python/lib/python3.11/site-packages/confection/__init__.py", line 880, in _fill&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; getter_result = getter(*args, **kwargs)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File 
"/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/scispacy/linking.py", line 85, in __init__&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; self.candidate_generator = candidate_generator or CandidateGenerator(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/scispacy/candidate_generation.py", line 221, in __init__&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; self.ann_index = ann_index or load_approximate_nearest_neighbours_index(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/scispacy/candidate_generation.py", line 141, in load_approximate_nearest_neighbours_index&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; ann_index.loadIndex(cached_path(linker_paths.ann_index))&lt;/DIV&gt;&lt;DIV&gt;RuntimeError: basic_ios::clear: iostream error&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:604)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1063)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1048)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:558)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;... (full Spark stack trace continues)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Driver stacktrace:&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;at org.apache.spark.scheduler.DAGScheduler.$anonfun$failJobAndIndependentStages$1(DAGScheduler.scala:4043)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;... (truncated for brevity) ...&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/spark/python/pyspark/worker.py", line 1980, in main&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; process()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/spark/python/pyspark/worker.py", line 1972, in process&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; serializer.dump_stream(out_iter, outfile)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/spark/python/pyspark/serializers.py", line 356, in dump_stream&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; vs = list(itertools.islice(iterator, batch))&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/root/.ipykernel/2187/command-4748090034621861-3307002280", line 2, in process_partition_rows&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/root/.ipykernel/2187/command-4748090034621856-3519326438", line 30, in get_pipeline&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/spacy/language.py", line 821, in add_pipe&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; pipe_component 
= self.create_pipe(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/spacy/language.py", line 709, in create_pipe&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; resolved = registry.resolve(cfg, validate=validate)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/python/lib/python3.11/site-packages/confection/__init__.py", line 759, in resolve&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; resolved, _ = cls._make(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/python/lib/python3.11/site-packages/confection/__init__.py", line 808, in _make&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; filled, _, resolved = cls._fill(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/python/lib/python3.11/site-packages/confection/__init__.py", line 880, in _fill&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; getter_result = getter(*args, **kwargs)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/scispacy/linking.py", line 85, in __init__&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; self.candidate_generator = candidate_generator or CandidateGenerator(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/scispacy/candidate_generation.py", line 221, in __init__&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; self.ann_index = ann_index or load_approximate_nearest_neighbours_index(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/scispacy/candidate_generation.py", line 141, in load_approximate_nearest_neighbours_index&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; ann_index.loadIndex(cached_path(linker_paths.ann_index))&lt;/DIV&gt;&lt;DIV&gt;RuntimeError: basic_ios::clear: iostream error&lt;/DIV&gt;&lt;DIV&gt;```&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Any guidance would be appreciated.&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;/DIV&gt;</description>
    <pubDate>Fri, 14 Nov 2025 10:57:12 GMT</pubDate>
    <dc:creator>Techtic_kush</dc:creator>
    <dc:date>2025-11-14T10:57:12Z</dc:date>
    <item>
      <title>Can’t save results to target table – out-of-memory error</title>
      <link>https://community.databricks.com/t5/data-engineering/can-t-save-results-to-target-table-out-of-memory-error/m-p/139060#M51083</link>
      <description>&lt;DIV&gt;&lt;DIV&gt;Hi team,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;I’m processing ~5,000 EMR notes with a Databricks notebook. The job reads from `crc_lakehouse.bronze.emr_notes`, runs SciSpaCy UMLS entity extraction plus a fine-tuned BERT sentiment model per partition, and builds a DataFrame (`df_entities`) with JSON fields for keywords, sentiment, and note sections.&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Cluster details:&lt;/DIV&gt;&lt;DIV&gt;- Driver: 64 GB RAM, 8 cores&lt;/DIV&gt;&lt;DIV&gt;- Workers: 64 GB RAM, 8-core CPU each, autoscaling 1–4 nodes&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Symptoms:&lt;/DIV&gt;&lt;DIV&gt;1. `df_entities.show()` works, so the transformation succeeds.&lt;/DIV&gt;&lt;DIV&gt;2. As soon as I try to `write`/save the DataFrame to another table (for example, `write.format("delta").mode("append").saveAsTable(...)`), the job fails with an OOM / executor memory error. No rows are persisted.&lt;/DIV&gt;&lt;DIV&gt;3. Only ~5k rows are processed, so I expected this to fit easily on this cluster profile.&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;What I’ve checked:&lt;/DIV&gt;&lt;DIV&gt;- Repartitioned input to 16 partitions.&lt;/DIV&gt;&lt;DIV&gt;- Verified no skew in the source table.&lt;/DIV&gt;&lt;DIV&gt;- Tried caching, disabling broadcast, lowering `show()` counts—none change the failure when writing.&lt;/DIV&gt;&lt;DIV&gt;- No custom memory configs; using defaults for this cluster size.&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Could you help identify why the write stage is exhausting memory despite the modest dataset? 
Are there best practices for running SciSpaCy + transformer sentiment inside `mapPartitions` on this configuration so the output can be saved?&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Full notebook code (`pipelines/pipelines/script_01_2025-11-14 09_22_15.py`):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;```&lt;/DIV&gt;&lt;DIV&gt;# Databricks notebook source&lt;/DIV&gt;&lt;DIV&gt;import json&lt;/DIV&gt;&lt;DIV&gt;from pyspark.sql.functions import col, trim, length, to_date, current_date, date_sub&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;SOURCE_TABLE = "crc_lakehouse.bronze.emr_notes"&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Fetch only yesterday's non-empty notes&lt;/DIV&gt;&lt;DIV&gt;df_yesterday = (&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; spark.read.table(SOURCE_TABLE)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; .select("notes_id", "contentResolved", "ingestion_time")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; .filter(to_date(col("ingestion_time")) == date_sub(current_date(), 1))&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; .filter(col("contentResolved").isNotNull() &amp;amp; (length(trim(col("contentResolved"))) &amp;gt; 0))&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; .select("notes_id", "contentResolved")&lt;/DIV&gt;&lt;DIV&gt;)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Repartition into 16 partitions (workers have 8 cores each)&lt;/DIV&gt;&lt;DIV&gt;df_yesterday = df_yesterday.repartition(16)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# COMMAND ----------&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;df_yesterday.count()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# COMMAND ----------&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# MAGIC %run ../helpers/umls_utils&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# COMMAND ----------&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Global variables to hold models per 
executor&lt;/DIV&gt;&lt;DIV&gt;global_nlp = None&lt;/DIV&gt;&lt;DIV&gt;global_linker = None&lt;/DIV&gt;&lt;DIV&gt;global_tokenizer = None&lt;/DIV&gt;&lt;DIV&gt;global_sentiment_model = None&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;def get_pipeline():&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; """Load NLP and sentiment models once per executor"""&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; global global_nlp, global_linker, global_tokenizer, global_sentiment_model&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; if global_nlp is None or global_linker is None:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; import spacy&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; from scispacy.abbreviation import AbbreviationDetector&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; import time&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; from scispacy.linking import EntityLinker&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; import torch&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; from transformers import AutoTokenizer, AutoModelForSequenceClassification&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # Load SciSpacy NLP&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; t0 = time.time()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; nlp = spacy.load("en_core_sci_lg")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if "abbreviation_detector" not in nlp.pipe_names:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; try:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; nlp.add_pipe("abbreviation_detector")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
except Exception:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; nlp.add_pipe(AbbreviationDetector(nlp))&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if "scispacy_linker" not in nlp.pipe_names:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; nlp.add_pipe(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "scispacy_linker",&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; config={"resolve_abbreviations": True, "linker_name": "umls"},&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; )&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; linker = nlp.get_pipe("scispacy_linker")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; print(f"✅ NLP loaded in executor in {time.time()-t0:.2f}s")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # Load sentiment model&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; MODEL_PATH = "../training/fine_tuned_bert_sentiment_v2"&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; sentiment_model = AutoModelForSequenceClassification.from_pretrained(MODEL_PATH)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; sentiment_model.eval()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; print("✅ Sentiment model loaded in 
executor")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # Save globally for this executor&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; global_nlp = nlp&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; global_linker = linker&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; global_tokenizer = tokenizer&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; global_sentiment_model = sentiment_model&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; return global_nlp, global_linker, global_tokenizer, global_sentiment_model&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# COMMAND ----------&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;def process_partition_rows(iterator):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; nlp, linker, tokenizer, sentiment_model = get_pipeline()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; import torch&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; from rapidfuzz import fuzz, process&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; import time&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; MIN_SCORE = 0.80&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; ALLOWED_GROUPS = {"Disorders", "Drugs", "Anatomy", "Procedures", "Physiology"}&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; SENTIMENT_LABELS = ["negative", "neutral", "positive"]&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; def analyze_sentiment(text):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; """Sentiment for a single text"""&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; inputs = tokenizer(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; text, return_tensors="pt", truncation=True, max_length=128, padding=True&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; )&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; with torch.no_grad():&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; outputs = sentiment_model(**inputs)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; probs = torch.softmax(outputs.logits, dim=1)[0]&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; pred_class = torch.argmax(probs).item()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return {&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "sentiment": SENTIMENT_LABELS[pred_class],&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "confidence": float(probs[pred_class]),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "scores": {&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "negative": float(probs[0]),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "neutral": float(probs[1]),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "positive": float(probs[2])&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; def get_match_type_with_aliases(entity_text, concept):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; t = (entity_text or "").strip().lower()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; canon = (getattr(concept, "canonical_name", "") or "").strip().lower()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; aliases = [a.strip().lower() for a in (getattr(concept, "aliases", []) or 
[])]&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if t == canon:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return "Exact"&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if t in aliases:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return "Synonym"&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; score_canon = fuzz.token_set_ratio(t, canon) if canon else 0&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; best_alias_score = 0&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if aliases:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; match = process.extractOne(t, aliases, scorer=fuzz.token_set_ratio)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; best_alias_score = match[1] if match else 0&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; score = max(score_canon, best_alias_score)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return "Fuzzy" if score &amp;gt;= 85 else "Synonym"&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; def draftjs_to_text(draftjs_json):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; """Convert Draft.js JSON content to plain text"""&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if not draftjs_json:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return ""&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if isinstance(draftjs_json, str):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; try:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
data = json.loads(draftjs_json)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; except Exception:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return draftjs_json&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; else:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; data = draftjs_json&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; blocks = data.get("blocks", [])&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; text = "\n".join(block.get("text", "") for block in blocks)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return text&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; for row in iterator:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; notes_id = row.notes_id&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; original_draftjs = row.contentResolved&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; plain_text = draftjs_to_text(original_draftjs)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # Prepare note_section dictionary&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; note_section = {&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "original_note": original_draftjs,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "plain_text": plain_text&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; entities_list = []&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # UMLS entity extraction&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if 
plain_text:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; doc = nlp(plain_text)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; all_entities = []&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; for ent in doc.ents:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if not ent._.kb_ents:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; continue&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; umls_id, score = ent._.kb_ents[0]&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if score &amp;lt; MIN_SCORE:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; continue&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; concept = linker.kb.cui_to_entity.get(umls_id)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if concept is None:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; continue&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; tui_codes = list(getattr(concept, "types", []) or [])&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; semantic_groups = format_semantic_types(tui_codes, format_type="group")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
semantic_type_names = format_semantic_types(tui_codes, format_type="full")&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; group_set = {g.strip() for g in semantic_groups.split(",") if g.strip()}&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if ALLOWED_GROUPS and group_set.isdisjoint(ALLOWED_GROUPS):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; continue&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; match_type = get_match_type_with_aliases(ent.text, concept)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; status = "allow" if match_type in ["Exact", "Synonym"] else "not-allow"&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; definition = getattr(concept, "definition", "N/A") or "N/A"&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; aliases_count = len(getattr(concept, "aliases", []))&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; all_entities.append({&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "extracted_text": ent.text,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "entity_label": ent.label_,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "start_char": ent.start_char,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "end_char": ent.end_char,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "cui": umls_id,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "canonical_name": concept.canonical_name,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "match_type": match_type,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "status": status,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "confidence_score": round(score, 4),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "category": semantic_groups,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "detailed_types": semantic_type_names,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "tui_codes": ", ".join(tui_codes),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "definition": definition[:200] if len(definition) &amp;gt; 200 else definition,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "aliases_count": aliases_count,&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; })&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; 
# Deduplicate and count mentions&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; text_counts = {}&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; for e in all_entities:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; t = e["extracted_text"].lower().strip()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; text_counts[t] = text_counts.get(t, 0) + 1&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; seen = set()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; for e in all_entities:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; t = e["extracted_text"].lower().strip()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if t not in seen:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; seen.add(t)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; e["mention_count"] = text_counts[t]&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; entities_list.append(e)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # Sentiment analysis&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; sentiment_result = analyze_sentiment(plain_text) if plain_text else None&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # Yield all four fields&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; yield 
(notes_id, entities_list, sentiment_result, note_section)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# COMMAND ----------&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;import json&lt;/DIV&gt;&lt;DIV&gt;from pyspark.sql import Row&lt;/DIV&gt;&lt;DIV&gt;from pyspark.sql.functions import current_timestamp&lt;/DIV&gt;&lt;DIV&gt;from pyspark.sql.types import StructType, StructField, StringType, TimestampType&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Call the partition processor to create the RDD&lt;/DIV&gt;&lt;DIV&gt;processed_rdd = df_yesterday.rdd.mapPartitions(process_partition_rows)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Convert RDD to DataFrame with JSON fields&lt;/DIV&gt;&lt;DIV&gt;df_entities = processed_rdd.map(lambda x: Row(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; note_id=x[0] if x[0] is not None else "",&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; keywords_extracted=json.dumps(x[1]) if x[1] else "[]",&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; sentiment_analysis=json.dumps(x[2]) if x[2] else "{}",&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; note_section=json.dumps(x[3]) if x[3] else "{}"&lt;/DIV&gt;&lt;DIV&gt;)).toDF()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Define schema explicitly&lt;/DIV&gt;&lt;DIV&gt;schema = StructType([&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; StructField("note_id", StringType(), True),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; StructField("keywords_extracted", StringType(), True),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; StructField("sentiment_analysis", StringType(), True),&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; StructField("note_section", StringType(), True)&lt;/DIV&gt;&lt;DIV&gt;])&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;df_entities = spark.createDataFrame(df_entities.rdd, schema)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Add processing timestamp&lt;/DIV&gt;&lt;DIV&gt;df_entities = 
df_entities.withColumn("processed_timestamp", current_timestamp())&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;# Show the DataFrame instead of saving&lt;/DIV&gt;&lt;DIV&gt;# df_entities.show(truncate=False) # Set truncate=False to see full content&lt;/DIV&gt;&lt;DIV&gt;df_entities.write.mode("overwrite").saveAsTable(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; "crc_lakehouse.silver.notes_nlp_processed"&lt;/DIV&gt;&lt;DIV&gt;)&lt;/DIV&gt;&lt;DIV&gt;```&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Full error trace:&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;```&lt;/DIV&gt;&lt;DIV&gt;Py4JJavaError: An error occurred while calling o625.saveAsTable.&lt;/DIV&gt;&lt;DIV&gt;: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 20.0 failed 4 times, most recent failure: Lost task 3.3 in stage 20.0 (TID 123) (10.139.64.8 executor 5): org.apache.spark.api.python.PythonException: Traceback (most recent call last):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/spark/python/pyspark/worker.py", line 1980, in main&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; process()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/spark/python/pyspark/worker.py", line 1972, in process&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; serializer.dump_stream(out_iter, outfile)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/spark/python/pyspark/serializers.py", line 356, in dump_stream&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; vs = list(itertools.islice(iterator, batch))&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/root/.ipykernel/2187/command-4748090034621861-3307002280", line 2, in process_partition_rows&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/root/.ipykernel/2187/command-4748090034621856-3519326438", line 30, in get_pipeline&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File 
"/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/spacy/language.py", line 821, in add_pipe&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; pipe_component = self.create_pipe(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/spacy/language.py", line 709, in create_pipe&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; resolved = registry.resolve(cfg, validate=validate)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/python/lib/python3.11/site-packages/confection/__init__.py", line 759, in resolve&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; resolved, _ = cls._make(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/python/lib/python3.11/site-packages/confection/__init__.py", line 808, in _make&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; filled, _, resolved = cls._fill(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/python/lib/python3.11/site-packages/confection/__init__.py", line 880, in _fill&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; getter_result = getter(*args, **kwargs)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File 
"/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/scispacy/linking.py", line 85, in __init__&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; self.candidate_generator = candidate_generator or CandidateGenerator(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/scispacy/candidate_generation.py", line 221, in __init__&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; self.ann_index = ann_index or load_approximate_nearest_neighbours_index(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/scispacy/candidate_generation.py", line 141, in load_approximate_nearest_neighbours_index&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; ann_index.loadIndex(cached_path(linker_paths.ann_index))&lt;/DIV&gt;&lt;DIV&gt;RuntimeError: basic_ios::clear: iostream error&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:604)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1063)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:1048)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;at 
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:558)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;at scala.collection.Iterator$$anon$11 hasNext(Iterator.scala:491)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;... (full Spark stack trace continues)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Driver stacktrace:&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;at org.apache.spark.scheduler.DAGScheduler.$anonfun$failJobAndIndependentStages$1(DAGScheduler.scala:4043)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;... (truncated for brevity) ...&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/spark/python/pyspark/worker.py", line 1980, in main&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; process()&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/spark/python/pyspark/worker.py", line 1972, in process&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; serializer.dump_stream(out_iter, outfile)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/spark/python/pyspark/serializers.py", line 356, in dump_stream&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; vs = list(itertools.islice(iterator, batch))&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/root/.ipykernel/2187/command-4748090034621861-3307002280", line 2, in process_partition_rows&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/root/.ipykernel/2187/command-4748090034621856-3519326438", line 30, in get_pipeline&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/spacy/language.py", line 821, in add_pipe&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; pipe_component 
= self.create_pipe(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/spacy/language.py", line 709, in create_pipe&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; resolved = registry.resolve(cfg, validate=validate)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/python/lib/python3.11/site-packages/confection/__init__.py", line 759, in resolve&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; resolved, _ = cls._make(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/python/lib/python3.11/site-packages/confection/__init__.py", line 808, in _make&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; filled, _, resolved = cls._fill(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/databricks/python/lib/python3.11/site-packages/confection/__init__.py", line 880, in _fill&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; getter_result = getter(*args, **kwargs)&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/scispacy/linking.py", line 85, in __init__&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; self.candidate_generator = candidate_generator or CandidateGenerator(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; 
&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/scispacy/candidate_generation.py", line 221, in __init__&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; self.ann_index = ann_index or load_approximate_nearest_neighbours_index(&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/scispacy/candidate_generation.py", line 141, in load_approximate_nearest_neighbours_index&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp; &amp;nbsp; ann_index.loadIndex(cached_path(linker_paths.ann_index))&lt;/DIV&gt;&lt;DIV&gt;RuntimeError: basic_ios::clear: iostream error&lt;/DIV&gt;&lt;DIV&gt;```&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;Any guidance would be appreciated.&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Fri, 14 Nov 2025 10:57:12 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/can-t-save-results-to-target-table-out-of-memory-error/m-p/139060#M51083</guid>
      <dc:creator>Techtic_kush</dc:creator>
      <dc:date>2025-11-14T10:57:12Z</dc:date>
    </item>
    <item>
      <title>Re: Can’t save results to target table – out-of-memory error</title>
      <link>https://community.databricks.com/t5/data-engineering/can-t-save-results-to-target-table-out-of-memory-error/m-p/139404#M51192</link>
      <description>&lt;P&gt;You’re right that the behaviour is weird at first glance (“5k rows on a 64 GB cluster and I blow up on write”), but your stack trace is actually very revealing: this isn’t a classic Delta write / shuffle OOM – it’s SciSpaCy/UMLS falling over when loading its ANN index on the executors.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;This is happening inside get_pipeline() when you construct the EntityLinker. That’s where SciSpaCy loads the big UMLS ANN index from disk (via cached_path).&lt;/P&gt;&lt;P&gt;Some points to consider:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;df_entities.show() only triggers &lt;STRONG&gt;enough partitions to show the first N rows&lt;/STRONG&gt; (by default 20), so maybe 1–2 Python workers actually run get_pipeline() and load the index.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;df_entities.write... needs to process &lt;STRONG&gt;all partitions&lt;/STRONG&gt;, so &lt;STRONG&gt;more Python worker processes&lt;/STRONG&gt; spin up across your autoscaling cluster.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Each new Python worker sees global_nlp is None and &lt;STRONG&gt;tries to load SciSpaCy + UMLS index again&lt;/STRONG&gt;.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;The ANN index for UMLS is big (multi-GB). 
Multiple concurrent loads cause heavy disk and memory pressure and/or a partially read index file, which surfaces as basic_ios::clear: iostream error.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;So the “OOM on write” is a side effect of &lt;STRONG&gt;many workers loading huge models and a huge index at once&lt;/STRONG&gt;, not of the Delta write itself.&lt;/P&gt;&lt;P&gt;Can you please try the following?&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;&lt;P&gt;&lt;STRONG&gt;Stop autoscaling&lt;/STRONG&gt; and run a single worker (or even a single-node cluster):&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;Driver: 64 GB, 8 cores&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Workers: 0–1 (or use a single-node cluster with driver only)&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Force fewer Python workers by lowering the number of partitions (1, or 2 at most).&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Rerun.&lt;/P&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;P&gt;I have some other ideas, but let's see how this one goes first.&lt;/P&gt;</description>
      <pubDate>Mon, 17 Nov 2025 17:05:47 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/can-t-save-results-to-target-table-out-of-memory-error/m-p/139404#M51192</guid>
      <dc:creator>bianca_unifeye</dc:creator>
      <dc:date>2025-11-17T17:05:47Z</dc:date>
    </item>
    <item>
      <title>Re: Can’t save results to target table – out-of-memory error</title>
      <link>https://community.databricks.com/t5/data-engineering/can-t-save-results-to-target-table-out-of-memory-error/m-p/139500#M51216</link>
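      <description>&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;Thank you so much for your detailed diagnosis! Your solution worked perfectly.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;Results:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;-&lt;/SPAN&gt;&lt;SPAN&gt; Cluster config: 1 driver (64 GB/8-core) + 1 worker (64 GB/8-core), autoscaling OFF&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;-&lt;/SPAN&gt;&lt;SPAN&gt; Test data: 1,111 notes from yesterday&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;-&lt;/SPAN&gt;&lt;SPAN&gt; Processing time: 16 minutes&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;-&lt;/SPAN&gt;&lt;SPAN&gt; Status: Complete success - no crashes, all data processed and saved&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;Your diagnosis about concurrent UMLS index loading was spot-on. The repartition(1) combined with a single worker completely resolved the issue.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;---&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;Since I'm new to Databricks, I'd really appreciate it if you could review my implementation and suggest any optimizations. Here's my working code:&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;---&lt;/SPAN&gt;&lt;/DIV&gt;
&lt;P&gt;# Databricks notebook source&lt;BR /&gt;# MAGIC %run ../helpers/clone_nlp_service&lt;/P&gt;
&lt;P&gt;# COMMAND ----------&lt;/P&gt;
&lt;P&gt;# MAGIC %run ../helpers/sentiment_analysis_service&lt;/P&gt;
&lt;P&gt;# COMMAND ----------&lt;/P&gt;
&lt;P&gt;SOURCE_TABLE = "crc_lakehouse.bronze.emr_notes"&lt;BR /&gt;DESTINATION_TABLE = "crc_lakehouse.silver.notes_nlp_processed"&lt;BR /&gt;SPECIAL_KEYWORD_TABLE = "crc_lakehouse.silver.notes_keyword_monitoring"&lt;/P&gt;
&lt;P&gt;DAILY_LIMIT = 5000&lt;BR /&gt;BATCH_SIZE = 100&lt;/P&gt;
&lt;P&gt;CLIENT_SPECIAL_KEYWORDS = [&lt;BR /&gt;&amp;nbsp; &amp;nbsp; "Spondylopathy", "Malnutrition", "Morbid obesity", "Lying flat",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; "Orthopnea", "Wasting", "Atrophy", "Respiratory failure",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; "Dysphagia following CVA", "Hemiparesis", "Hemiplegia",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; "Difficulty swallowing", "Hydration", "Parkinson's", "COPD",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; "Asthma", "Sepsis", "Septicemia", "IVF", "Transplant",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; "Immune deficiency", "Endocarditis", "Interstitial lung",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; "Pulmonary fibrosis", "Cystic fibrosis", "Narcolepsy",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; "Osteomyelitis", "Chronic pancreatitis", "Diabetic retinopathy",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; "Myelodysplastic syndrome", "pocketing food", "drooling",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; "cough with liquid", "coughing with liquid", "painful swallow",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; "pain with swallow"&lt;BR /&gt;]&lt;/P&gt;
&lt;P&gt;# COMMAND ----------&lt;/P&gt;
&lt;P&gt;from pyspark.sql import functions as F&lt;BR /&gt;from pyspark.sql.types import (&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructType, StructField, StringType, BooleanType, FloatType, TimestampType&lt;BR /&gt;)&lt;BR /&gt;from datetime import datetime, timedelta&lt;BR /&gt;import json&lt;BR /&gt;import pandas as pd&lt;BR /&gt;import gc&lt;/P&gt;
&lt;P&gt;def extract_text_from_draftjs(content_json):&lt;BR /&gt;&amp;nbsp; &amp;nbsp; # Join the non-empty Draft.js block texts; return None for empty or unparseable content&lt;BR /&gt;&amp;nbsp; &amp;nbsp; try:&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if not content_json:&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return None&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; data = json.loads(content_json)&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; blocks = data.get("blocks", [])&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; text = "\n".join(&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; block.get("text", "").strip()&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; for block in blocks&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if block.get("text", "").strip()&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; )&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return text if text else None&lt;BR /&gt;&amp;nbsp; &amp;nbsp; except Exception:&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; return None&lt;/P&gt;
&lt;P&gt;def check_client_keywords(entities, plain_text, note_id):&lt;BR /&gt;&amp;nbsp; &amp;nbsp; # Record at most one match per keyword: the first entity whose name or text contains it&lt;BR /&gt;&amp;nbsp; &amp;nbsp; keyword_matches = []&lt;BR /&gt;&amp;nbsp; &amp;nbsp; for keyword in CLIENT_SPECIAL_KEYWORDS:&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; keyword_lower = keyword.lower()&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; for entity in entities:&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; canonical = (entity.get("canonical_name") or "").lower()&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; extracted = (entity.get("extracted_text") or "").lower()&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if keyword_lower in canonical or keyword_lower in extracted:&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; start = entity.get("start_char", 0)&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; end = entity.get("end_char", 0)&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; context_start = max(0, start - 50)&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; context_end = min(len(plain_text), end + 50)&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; keyword_matches.append({&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "note_id": note_id,&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "keyword_target": keyword,&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "match_found": True,&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "matched_term": entity.get("extracted_text"),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "match_type": entity.get("match_type"),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "confidence_score": float(entity.get("confidence_score")) if entity.get("confidence_score") else None,&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "cui": entity.get("cui"),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "canonical_name": entity.get("canonical_name"),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "context_snippet": plain_text[context_start:context_end],&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "processed_timestamp": datetime.utcnow()&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; })&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; break&lt;BR /&gt;&amp;nbsp; &amp;nbsp; return keyword_matches&lt;/P&gt;
&lt;P&gt;# COMMAND ----------&lt;/P&gt;
&lt;P&gt;nlp_model, linker_model = load_pipeline("en_ner_bc5cdr_md", resolve_abbreviations=True)&lt;/P&gt;
&lt;P&gt;# COMMAND ----------&lt;/P&gt;
&lt;P&gt;yesterday = (datetime.now() - timedelta(days=1)).date()&lt;/P&gt;
&lt;P&gt;df_to_process = spark.table(SOURCE_TABLE).select(&lt;BR /&gt;&amp;nbsp; &amp;nbsp; F.col("notes_id").alias("note_id"),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; "contentResolved",&lt;BR /&gt;&amp;nbsp; &amp;nbsp; "ingestion_time"&lt;BR /&gt;).filter(&lt;BR /&gt;&amp;nbsp; &amp;nbsp; F.to_date(F.col("ingestion_time")) == F.lit(yesterday)&lt;BR /&gt;).filter(&lt;BR /&gt;&amp;nbsp; &amp;nbsp; (F.col("contentResolved").isNotNull()) &amp;amp;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; (F.trim(F.col("contentResolved")) != "")&lt;BR /&gt;).dropDuplicates(["note_id"]).limit(DAILY_LIMIT)&lt;/P&gt;
&lt;P&gt;# &lt;span class="lia-unicode-emoji" title=":white_heavy_check_mark:"&gt;✅&lt;/span&gt; CRITICAL: Single partition for single-node&lt;BR /&gt;df_to_process = df_to_process.repartition(1)&lt;/P&gt;
&lt;P&gt;if df_to_process.count() == 0:&lt;BR /&gt;&amp;nbsp; &amp;nbsp; dbutils.notebook.exit("No new notes")&lt;/P&gt;
&lt;P&gt;# COMMAND ----------&lt;/P&gt;
&lt;P&gt;nlp_schema = StructType([&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructField("note_id", StringType(), False),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructField("keywords_extracted", StringType(), True),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructField("sentiment_analysis", StringType(), True),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructField("note_section", StringType(), False),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructField("processed_timestamp", TimestampType(), False),&lt;BR /&gt;])&lt;/P&gt;
&lt;P&gt;keyword_schema = StructType([&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructField("note_id", StringType(), False),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructField("keyword_target", StringType(), False),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructField("match_found", BooleanType(), False),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructField("matched_term", StringType(), True),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructField("match_type", StringType(), True),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructField("confidence_score", FloatType(), True),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructField("cui", StringType(), True),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructField("canonical_name", StringType(), True),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructField("context_snippet", StringType(), True),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; StructField("processed_timestamp", TimestampType(), False)&lt;BR /&gt;])&lt;/P&gt;
&lt;P&gt;# Collect the notes to the driver and run the NLP models there, one batch at a time&lt;BR /&gt;all_notes = df_to_process.select("note_id", "contentResolved").toPandas()&lt;BR /&gt;total_notes = len(all_notes)&lt;BR /&gt;total_batches = (total_notes + BATCH_SIZE - 1) // BATCH_SIZE&lt;/P&gt;
&lt;P&gt;for batch_num in range(total_batches):&lt;BR /&gt;&amp;nbsp; &amp;nbsp; batch_start = batch_num * BATCH_SIZE&lt;BR /&gt;&amp;nbsp; &amp;nbsp; batch_end = min(batch_start + BATCH_SIZE, total_notes)&lt;BR /&gt;&amp;nbsp; &amp;nbsp; batch_df = all_notes.iloc[batch_start:batch_end]&lt;BR /&gt;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; nlp_rows = []&lt;BR /&gt;&amp;nbsp; &amp;nbsp; keyword_rows = []&lt;BR /&gt;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; for idx, row in batch_df.iterrows():&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; note_id = row['note_id']&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; content = row['contentResolved']&lt;BR /&gt;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; plain_text = extract_text_from_draftjs(content)&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; if not plain_text:&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; continue&lt;BR /&gt;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; try:&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; entities = extract_entities(nlp_model, linker_model, plain_text, "en_ner_bc5cdr_md")&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; sentiment = analyze_sentiment(plain_text)&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; keywords = check_client_keywords(entities, plain_text, note_id)&lt;BR /&gt;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; nlp_rows.append({&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "note_id": note_id,&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "keywords_extracted": json.dumps(entities, ensure_ascii=False),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "sentiment_analysis": json.dumps({&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "sentiment": sentiment['sentiment'],&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "confidence": round(sentiment['confidence'] * 100, 2),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "scores": {&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "negative": round(sentiment['scores']['negative'] * 100, 2),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "neutral": round(sentiment['scores']['neutral'] * 100, 2),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "positive": round(sentiment['scores']['positive'] * 100, 2)&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; }, ensure_ascii=False),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "note_section": json.dumps({"original_note": content, "plain_text": plain_text}, ensure_ascii=False),&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; "processed_timestamp": datetime.utcnow()&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; })&lt;BR /&gt;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; keyword_rows.extend(keywords)&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; del entities, sentiment, keywords, plain_text&lt;BR /&gt;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; except Exception:&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; # Skip any note that fails NLP processing and keep going&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; continue&lt;BR /&gt;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; # Append this batch's results before moving on to the next batch&lt;BR /&gt;&amp;nbsp; &amp;nbsp; if nlp_rows:&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; spark.createDataFrame(nlp_rows, schema=nlp_schema).write.mode("append").saveAsTable(DESTINATION_TABLE)&lt;BR /&gt;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; if keyword_rows:&lt;BR /&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; spark.createDataFrame(keyword_rows, schema=keyword_schema).write.mode("append").saveAsTable(SPECIAL_KEYWORD_TABLE)&lt;BR /&gt;&lt;BR /&gt;&amp;nbsp; &amp;nbsp; del batch_df, nlp_rows, keyword_rows&lt;BR /&gt;&amp;nbsp; &amp;nbsp; gc.collect()&lt;/P&gt;
&lt;P&gt;del all_notes&lt;BR /&gt;gc.collect()&lt;/P&gt;&lt;DIV&gt;&lt;SPAN&gt;---&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;Thank you again for your help! Your insight saved me days of troubleshooting. Please let me know if you see any issues or have suggestions for optimization.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>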
      <pubDate>Tue, 18 Nov 2025 10:12:32 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/can-t-save-results-to-target-table-out-of-memory-error/m-p/139500#M51216</guid>
      <dc:creator>Techtic_kush</dc:creator>
      <dc:date>2025-11-18T10:12:32Z</dc:date>
    </item>
  </channel>
</rss>

