cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

best practice to parse binary file like pdf, png, docx

seefoods
Valued Contributor

Hello guys,

I have used Auto Loader to ingest PDF files as a binary source. I don't want to use Databricks `ai_parse`; instead I'd like to use docling. Does anyone know how to use it well in this setup?

My df looks like:


from pyspark.sql import functions as F

# Auto Loader stream over the landing volume. Each row carries the raw file
# bytes in `content` plus file metadata (path, length, modificationTime, ...).
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    # Only pick up PDFs from the volume.
    .option("pathGlobFilter", "*.pdf")
    .load("/Volumes/default/landing/unstructured_data/client")
    .selectExpr("*", "_metadata as _source_file_metadata")
    .withColumns(
        {
            "ingestion_timestamp": F.current_timestamp(),
            # Lower-cased extension: last dot-separated segment of the path.
            "file_extension": F.lower(F.element_at(F.split("path", "\\."), -1)),
            # FIX: `col` was used without being imported (only `functions as F`
            # is in scope) — qualify with F.col to avoid a NameError.
            "file_size_mb": F.col("length") / (1024 * 1024),
            # Stable id per (file, version): hash of file name + modification
            # time. Cast the timestamp explicitly so concat receives strings.
            "document_id": F.sha1(
                F.concat(
                    F.col("_source_file_metadata.file_name"),
                    F.col("modificationTime").cast("string"),
                )
            ),
            # file_extension is already lower-cased above, so only lowercase
            # values can ever match (the original upper-case variants were
            # dead entries).
            "supported_format": F.when(
                F.col("file_extension").isin("pdf", "png", "jpg", "jpeg"),
                F.lit(True),
            ).otherwise(F.lit(False)),
        }
    )
)



Cordially, 

1 ACCEPTED SOLUTION

Accepted Solutions

MoJaMa
Databricks Employee
Databricks Employee

Is the question that you want to use docling inside the stream to do the conversion?

If so you can pip install docling in your notebook and then use it inside forEachBatch etc.

This code here is just from Google to show you the general structure:

def process_row(row):
    # Per-row hook — swap this out for real per-document logic.
    print(row)

def foreach_batch_function(batch_df, epoch_id):
    # Pull rows to the driver one at a time rather than collecting the
    # whole micro-batch into memory at once.
    for record in batch_df.toLocalIterator():
        process_row(record)

streaming_df.writeStream.foreachBatch(foreach_batch_function).start()

 Here's pseudo-code (untested) for how you might want to do it:

def parse_batch(batch_df, batch_id: int):
    """Convert each PDF in a micro-batch with docling and append the results.

    For every row, the raw bytes (``row.content``) are spilled to a temp file,
    converted with docling, and the Markdown/JSON exports are appended to
    ``target_table`` (a module-level name, assumed defined in the notebook)
    together with ``document_id`` and ``path``.
    """
    # NOTE: removed the unused `import io` from the original pseudo-code.
    import json
    import tempfile

    from docling.document_converter import DocumentConverter

    # Reuse one converter per micro-batch — model setup is expensive.
    converter = DocumentConverter()

    rows = []
    # toLocalIterator streams rows to the driver without materializing the
    # entire batch in driver memory.
    for row in batch_df.toLocalIterator():
        # docling's convert() accepts a file path, so write the binary
        # content to a temp file (deleted when the context exits).
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tmp:
            tmp.write(row.content)
            tmp.flush()
            result = converter.convert(tmp.name)
            doc = result.document  # DoclingDocument
            md = doc.export_to_markdown()
            j = json.dumps(doc.export_to_dict())
            rows.append((row.document_id, row.path, md, j))

    if rows:
        out_df = spark.createDataFrame(
            rows, ["document_id", "path", "markdown", "json"]
        )
        out_df.write.mode("append").saveAsTable(target_table)

# Start the stream. The checkpoint location makes the foreachBatch sink
# restartable; Auto Loader tracks already-processed files through it.
query = (
    df.writeStream
    .option("checkpointLocation", "/Volumes/default/checkpoints/unstructured_docs_docling")
    .foreachBatch(parse_batch)
    .start()
)

 

View solution in original post

1 REPLY 1

MoJaMa
Databricks Employee
Databricks Employee

Is the question that you want to use docling inside the stream to do the conversion?

If so you can pip install docling in your notebook and then use it inside forEachBatch etc.

This code here is just from Google to show you the general structure:

def process_row(row):
    # Per-row hook — swap this out for real per-document logic.
    print(row)

def foreach_batch_function(batch_df, epoch_id):
    # Pull rows to the driver one at a time rather than collecting the
    # whole micro-batch into memory at once.
    for record in batch_df.toLocalIterator():
        process_row(record)

streaming_df.writeStream.foreachBatch(foreach_batch_function).start()

 Here's pseudo-code (untested) for how you might want to do it:

def parse_batch(batch_df, batch_id: int):
    """Convert each PDF in a micro-batch with docling and append the results.

    For every row, the raw bytes (``row.content``) are spilled to a temp file,
    converted with docling, and the Markdown/JSON exports are appended to
    ``target_table`` (a module-level name, assumed defined in the notebook)
    together with ``document_id`` and ``path``.
    """
    # NOTE: removed the unused `import io` from the original pseudo-code.
    import json
    import tempfile

    from docling.document_converter import DocumentConverter

    # Reuse one converter per micro-batch — model setup is expensive.
    converter = DocumentConverter()

    rows = []
    # toLocalIterator streams rows to the driver without materializing the
    # entire batch in driver memory.
    for row in batch_df.toLocalIterator():
        # docling's convert() accepts a file path, so write the binary
        # content to a temp file (deleted when the context exits).
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as tmp:
            tmp.write(row.content)
            tmp.flush()
            result = converter.convert(tmp.name)
            doc = result.document  # DoclingDocument
            md = doc.export_to_markdown()
            j = json.dumps(doc.export_to_dict())
            rows.append((row.document_id, row.path, md, j))

    if rows:
        out_df = spark.createDataFrame(
            rows, ["document_id", "path", "markdown", "json"]
        )
        out_df.write.mode("append").saveAsTable(target_table)

# Start the stream. The checkpoint location makes the foreachBatch sink
# restartable; Auto Loader tracks already-processed files through it.
query = (
    df.writeStream
    .option("checkpointLocation", "/Volumes/default/checkpoints/unstructured_docs_docling")
    .foreachBatch(parse_batch)
    .start()
)