You’re right that the behaviour is weird at first glance (“5k rows on a 64 GB cluster and I blow up on write”), but your stack trace is actually very revealing: this isn’t a classic Delta write / shuffle OOM – it’s SciSpaCy/UMLS falling over when loading its ANN index on the executors.
This is happening inside get_pipeline() when you construct the EntityLinker. That’s where SciSpaCy loads the big UMLS ANN index from disk (via cached_path).
Some points to consider:
- df_entities.show() only triggers enough partitions to produce the first N rows (20 by default), so maybe only 1–2 Python workers actually run get_pipeline() and load the index.
- df_entities.write... has to process all partitions, so many more Python worker processes spin up across your autoscaling cluster.
- Each new Python worker sees global_nlp is None and tries to load SciSpaCy + the UMLS index again.
- The UMLS ANN index is big (multi-GB). Multiple concurrent loads → heavy disk and memory pressure and/or a partially read index file → the basic_ios::clear: iostream error you're seeing.
So the “OOM on write” is a side effect of many workers loading huge models and a huge index at once, not the Delta write itself.
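To make that concrete, here is a minimal sketch of the lazy-load pattern I'm assuming from your stack trace. get_pipeline and global_nlp are your names; the UDF body, pipe names and model name are illustrative, not your actual code:

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

global_nlp = None  # module-level cache: one copy per Python worker PROCESS, not per cluster

def get_pipeline():
    global global_nlp
    if global_nlp is None:
        import spacy
        from scispacy.abbreviation import AbbreviationDetector  # registers "abbreviation_detector"
        from scispacy.linking import EntityLinker               # registers "scispacy_linker"
        nlp = spacy.load("en_ner_bc5cdr_md")
        nlp.add_pipe("abbreviation_detector")
        # This is the expensive part: the UMLS ANN index is fetched/read via cached_path here.
        nlp.add_pipe("scispacy_linker",
                     config={"resolve_abbreviations": True, "linker_name": "umls"})
        global_nlp = nlp
    return global_nlp

@F.pandas_udf(StringType())
def extract_entities_udf(texts: pd.Series) -> pd.Series:
    nlp = get_pipeline()  # show() hits 1–2 workers; write() hits every worker autoscaling spins up
    return texts.apply(lambda t: "; ".join(ent.text for ent in nlp(t).ents))

Every new worker that receives a write task starts with global_nlp is None, so the write phase is exactly when the concurrent multi-GB index loads pile up.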
Can you please try the following?
- Stop autoscaling and run a single worker (or even a single-node cluster):
  - Driver: 64 GB, 8 cores
  - Workers: 0–1 (or use a single-node cluster with the driver only)
- Force fewer Python workers by lowering partitions (just 1, or at most 2) – see the sketch just below this list.
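For the partitioning point, something along these lines; df_entities and the destination table name are placeholders for whatever you actually write:

# Sketch only – df_entities and the table name below are placeholders.
# One partition means one Python worker, i.e. one copy of the SciSpaCy model + UMLS index.
df_entities_1p = df_entities.coalesce(1)          # or .repartition(1)

# Optional: keep any post-shuffle stages from fanning out again.
spark.conf.set("spark.sql.shuffle.partitions", "2")

df_entities_1p.write.mode("append").saveAsTable("your_catalog.your_schema.your_table")

coalesce(1) avoids a shuffle while repartition(1) forces one; either way the goal is that only a single Python worker ever has to load the index.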
I have some other ideas, but let's see how this one goes first.
# Databricks notebook source
# MAGIC %run ../helpers/clone_nlp_service
# COMMAND ----------
# MAGIC %run ../helpers/sentiment_analysis_service
# COMMAND ----------
SOURCE_TABLE = "crc_lakehouse.bronze.emr_notes"
DESTINATION_TABLE = "crc_lakehouse.silver.notes_nlp_processed"
SPECIAL_KEYWORD_TABLE = "crc_lakehouse.silver.notes_keyword_monitoring"
DAILY_LIMIT = 5000
BATCH_SIZE = 100
CLIENT_SPECIAL_KEYWORDS = [
"Spondylopathy", "Malnutrition", "Morbid obesity", "Lying flat",
"Orthopnea", "Wasting", "Atrophy", "Respiratory failure",
"Dysphagia following CVA", "Hemiparesis", "Hemiplegia",
"Difficulty swallowing", "Hydration", "Parkinson's", "COPD",
"Asthma", "Sepsis", "Septicemia", "IVF", "Transplant",
"Immune deficiency", "Endocarditis", "Interstitial lung",
"Pulmonary fibrosis", "Cystic fibrosis", "Narcolepsy",
"Osteomyelitis", "Chronic pancreatitis", "Diabetic retinopathy",
"Myelodysplastic syndrome", "pocketing food", "drooling",
"cough with liquid", "coughing with liquid", "painful swallow",
"pain with swallow"
]
# COMMAND ----------
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import datetime, timedelta
import json
import pandas as pd
import gc
def extract_text_from_draftjs(content_json):
    """Flatten a Draft.js JSON payload into plain text, one block per line."""
    try:
        if not content_json:
            return None
        data = json.loads(content_json)
        blocks = data.get("blocks", [])
        text = "\n".join(
            block.get("text", "").strip()
            for block in blocks
            if block.get("text", "").strip()
        )
        return text if text else None
    except (json.JSONDecodeError, TypeError, AttributeError):
        return None
def check_client_keywords(entities, plain_text, note_id):
    """Return one match row per client keyword found among the extracted entities."""
    keyword_matches = []
    for keyword in CLIENT_SPECIAL_KEYWORDS:
        keyword_lower = keyword.lower()
        for entity in entities:
            canonical = (entity.get("canonical_name") or "").lower()
            extracted = (entity.get("extracted_text") or "").lower()
            if keyword_lower in canonical or keyword_lower in extracted:
                start = entity.get("start_char", 0)
                end = entity.get("end_char", 0)
                context_start = max(0, start - 50)
                context_end = min(len(plain_text), end + 50)
                confidence = entity.get("confidence_score")
                keyword_matches.append({
                    "note_id": note_id,
                    "keyword_target": keyword,
                    "match_found": True,
                    "matched_term": entity.get("extracted_text"),
                    "match_type": entity.get("match_type"),
                    "confidence_score": float(confidence) if confidence is not None else None,
                    "cui": entity.get("cui"),
                    "canonical_name": entity.get("canonical_name"),
                    "context_snippet": plain_text[context_start:context_end],
                    "processed_timestamp": datetime.utcnow()
                })
                break  # record each keyword at most once per note
    return keyword_matches
# COMMAND ----------
nlp_model, linker_model = load_pipeline("en_ner_bc5cdr_md", resolve_abbreviations=True)
# COMMAND ----------
yesterday = (datetime.now() - timedelta(days=1)).date()
df_to_process = spark.table(SOURCE_TABLE).select(
    F.col("notes_id").alias("note_id"),
    "contentResolved",
    "ingestion_time"
).filter(
    F.to_date(F.col("ingestion_time")) == F.lit(yesterday)
).filter(
    (F.col("contentResolved").isNotNull()) &
    (F.trim(F.col("contentResolved")) != "")
).dropDuplicates(["note_id"]).limit(DAILY_LIMIT)
# ✅ CRITICAL: Single partition for single-node
df_to_process = df_to_process.repartition(1)
if df_to_process.count() == 0:
    dbutils.notebook.exit("No new notes")
# COMMAND ----------
nlp_schema = StructType([
    StructField("note_id", StringType(), False),
    StructField("keywords_extracted", StringType(), True),
    StructField("sentiment_analysis", StringType(), True),
    StructField("note_section", StringType(), False),
    StructField("processed_timestamp", TimestampType(), False),
])
keyword_schema = StructType([
    StructField("note_id", StringType(), False),
    StructField("keyword_target", StringType(), False),
    StructField("match_found", BooleanType(), False),
    StructField("matched_term", StringType(), True),
    StructField("match_type", StringType(), True),
    StructField("confidence_score", FloatType(), True),
    StructField("cui", StringType(), True),
    StructField("canonical_name", StringType(), True),
    StructField("context_snippet", StringType(), True),
    StructField("processed_timestamp", TimestampType(), False)
])
all_notes = df_to_process.select("note_id", "contentResolved").toPandas()
total_notes = len(all_notes)
total_batches = (total_notes + BATCH_SIZE - 1) // BATCH_SIZE
for batch_num in range(total_batches):
    batch_start = batch_num * BATCH_SIZE
    batch_end = min(batch_start + BATCH_SIZE, total_notes)
    batch_df = all_notes.iloc[batch_start:batch_end]
    nlp_rows = []
    keyword_rows = []
    for idx, row in batch_df.iterrows():
        note_id = row['note_id']
        content = row['contentResolved']
        plain_text = extract_text_from_draftjs(content)
        if not plain_text:
            continue
        try:
            entities = extract_entities(nlp_model, linker_model, plain_text, "en_ner_bc5cdr_md")
            sentiment = analyze_sentiment(plain_text)
            keywords = check_client_keywords(entities, plain_text, note_id)
            nlp_rows.append({
                "note_id": note_id,
                "keywords_extracted": json.dumps(entities, ensure_ascii=False),
                "sentiment_analysis": json.dumps({
                    "sentiment": sentiment['sentiment'],
                    "confidence": round(sentiment['confidence'] * 100, 2),
                    "scores": {
                        "negative": round(sentiment['scores']['negative'] * 100, 2),
                        "neutral": round(sentiment['scores']['neutral'] * 100, 2),
                        "positive": round(sentiment['scores']['positive'] * 100, 2)
                    }
                }, ensure_ascii=False),
                "note_section": json.dumps({"original_note": content, "plain_text": plain_text}, ensure_ascii=False),
                "processed_timestamp": datetime.utcnow()
            })
            keyword_rows.extend(keywords)
            del entities, sentiment, keywords, plain_text
        except Exception:
            # Skip notes that fail NLP or sentiment processing instead of aborting the batch
            continue
    if nlp_rows:
        spark.createDataFrame(nlp_rows, schema=nlp_schema).write.mode("append").saveAsTable(DESTINATION_TABLE)
    if keyword_rows:
        spark.createDataFrame(keyword_rows, schema=keyword_schema).write.mode("append").saveAsTable(SPECIAL_KEYWORD_TABLE)
    del batch_df, nlp_rows, keyword_rows
    gc.collect()
del all_notes
gc.collect()