Is the question whether you want to use docling inside the stream to do the conversion?
If so, you can `pip install docling` in your notebook and then use it inside `foreachBatch`, etc.
This code here is just from Google to show you the general structure:
def process_row(row):
    """Handle a single row of a micro-batch (placeholder logic).

    Replace the print below with whatever per-row processing you need.
    """
    print(row)
def foreach_batch_function(batch_df, epoch_id):
    """Process one micro-batch by streaming its rows through the driver.

    toLocalIterator pulls rows one at a time rather than collecting the
    whole batch into driver memory.
    """
    for record in batch_df.toLocalIterator():
        process_row(record)
# Wire up the streaming query: foreachBatch hands each micro-batch DataFrame
# to the handler above, then start() launches the stream.
stream_writer = streaming_df.writeStream
stream_writer.foreachBatch(foreach_batch_function).start()
Here's pseudo-code (untested) for how you might want to do it:
def parse_batch(batch_df, batch_id: int):
    """foreachBatch handler: convert each row's PDF bytes with docling and
    append the results (markdown + JSON) to `target_table`.

    Args:
        batch_df: micro-batch DataFrame; rows are expected to carry
            `document_id`, `path`, and binary `content` columns
            (assumption from the field accesses below — confirm schema).
        batch_id: micro-batch epoch id supplied by Structured Streaming.

    NOTE(review): relies on `spark` and `target_table` being defined in the
    enclosing scope — confirm before running.
    """
    import json
    import os
    import tempfile

    from docling.document_converter import DocumentConverter

    # Reuse one converter per micro-batch — constructing it is expensive,
    # and the defaults handle PDF parsing well.
    converter = DocumentConverter()

    rows = []
    # toLocalIterator streams rows to the driver one at a time instead of
    # materializing the entire batch with collect().
    for row in batch_df.toLocalIterator():
        # Write the binary content to a named temp file so docling can open
        # it by path. Use delete=False + an explicit unlink: on Windows a
        # NamedTemporaryFile opened with delete=True cannot be reopened by
        # another reader while it is still open.
        tmp = tempfile.NamedTemporaryFile(suffix=".pdf", delete=False)
        try:
            tmp.write(row.content)
            tmp.flush()
            tmp.close()
            result = converter.convert(tmp.name)  # returns a conversion result
            doc = result.document                 # DoclingDocument
            md = doc.export_to_markdown()         # Markdown export
            j = json.dumps(doc.export_to_dict())  # JSON export
            rows.append((row.document_id, row.path, md, j))
        finally:
            os.unlink(tmp.name)

    # Skip the table write entirely for empty micro-batches.
    if rows:
        out_df = spark.createDataFrame(
            rows, ["document_id", "path", "markdown", "json"]
        )
        out_df.write.mode("append").saveAsTable(target_table)
# Start the streaming query. checkpointLocation makes the stream restartable
# and gives foreachBatch its effectively-once delivery guarantees.
# NOTE(review): this uses `df` while the example above uses `streaming_df` —
# confirm which DataFrame variable is actually in scope here.
query = (
    df.writeStream
    .option("checkpointLocation", "/Volumes/default/checkpoints/unstructured_docs_docling")
    .foreachBatch(parse_batch)
    .start()
)