<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Slow Delta write when creating embeddings with mapPartitions in Generative AI</title>
    <link>https://community.databricks.com/t5/generative-ai/slow-delta-write-when-creating-embeddings-with-mappartitions/m-p/141251#M1503</link>
    <description>&lt;P&gt;Hi&lt;/P&gt;&lt;P&gt;You’ve optimised the &lt;EM&gt;embedding&lt;/EM&gt; side really nicely already, batching in mapPartitions and creating one Azure client per partition is exactly what we recommend.&lt;/P&gt;&lt;P&gt;For 35k rows, if embedding is fast but the &lt;STRONG&gt;Delta write/commit is slow&lt;/STRONG&gt;, it’s almost always due to:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;too many small output files, and/or&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;extra passes over the DataFrame, and/or&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;a cluster that’s over-parallelised for the amount of data.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;I would suggest to look into these:&lt;/P&gt;&lt;H2&gt;Control the number of output files&lt;/H2&gt;&lt;P&gt;By default Spark uses something like spark.sql.shuffle.partitions = 200, which means your createDataFrame(...).write can easily produce ~200 tiny files for just 35k rows. The overhead of creating those files + committing metadata often dominates the runtime.&lt;/P&gt;&lt;P&gt;For a dataset of this size, you typically want &lt;STRONG&gt;a small number of files&lt;/STRONG&gt; (1–4, maybe 8 max).&lt;/P&gt;&lt;P&gt;Key points:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;Use &lt;STRONG&gt;coalesce()&lt;/STRONG&gt;, not repartition(), right before the write.&lt;BR /&gt;coalesce(n) avoids a shuffle and just reduces the number of output partitions.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;For 35k vectors, 1–4 files is absolutely fine and usually much faster to commit.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H2&gt;2. Avoid double computation / extra actions&lt;/H2&gt;&lt;P&gt;You mentioned using persist() to avoid recomputing when you call count(). That’s good. Two extra tips:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;Persist &lt;STRONG&gt;after&lt;/STRONG&gt; your embedding transform, not on the original DF.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Only trigger &lt;STRONG&gt;one action&lt;/STRONG&gt; before the final write (e.g. count() or maybe display() for debugging). Don’t call count(), show(), and then write without persistence, or Spark will recompute the whole pipeline multiple times.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H2&gt;3. Tuning cluster size &amp;amp; IO for this workload&lt;/H2&gt;&lt;P&gt;For 35k rows of 3072-dim embeddings:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;You &lt;STRONG&gt;don’t need a huge cluster&lt;/STRONG&gt;.&lt;BR /&gt;Too many workers mean too many tiny output tasks and more small files.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Often a small cluster (e.g. 1–2 workers with decent memory) is &lt;EM&gt;faster&lt;/EM&gt; end-to-end than a large autoscaling cluster for this kind of “wide but not huge” dataset.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Make sure you’re writing to a performant storage account (Premium / general purpose v2). In most managed Databricks setups, DBFS is already backed by appropriate storage, so usually this is fine.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;If you see a lot of tiny tasks in the Spark UI, that’s a sign to:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;lower spark.sql.shuffle.partitions for the job (e.g. 32 or even 8 for this size), and/or&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;coalesce before writing as shown above.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H2&gt;4. Data layout &amp;amp; Delta options (Stitch / OPTIMIZE)&lt;/H2&gt;&lt;P&gt;For a one-off creation of 35k embeddings, the Delta housekeeping features (Stitch / OPTIMIZE) usually aren’t needed for &lt;EM&gt;performance of the write itself&lt;/EM&gt;, but they matter if:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;you will repeatedly &lt;STRONG&gt;append&lt;/STRONG&gt; to this table,&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;you will &lt;STRONG&gt;query&lt;/STRONG&gt; it a lot (e.g. for vector search candidates), or&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;you accidentally created many small files in early runs.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H2&gt;5. Data type choice for embeddings&lt;/H2&gt;&lt;P&gt;You’re using ArrayType(FloatType()), which is fine. A few extra notes:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;If you’re on a Databricks runtime that supports the &lt;STRONG&gt;VECTOR&lt;/STRONG&gt; type (for native vector search), consider storing as VECTOR(3072) – it doesn’t massively change write &lt;EM&gt;speed&lt;/EM&gt;, but it’s the recommended long-term format for similarity search.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;If you stick to arrays, make sure the schema is stable between runs (same type, same dimension). Schema evolution (new columns or type changes) can add extra overhead due to Delta metadata handling.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;</description>
    <pubDate>Fri, 05 Dec 2025 13:06:03 GMT</pubDate>
    <dc:creator>bianca_unifeye</dc:creator>
    <dc:date>2025-12-05T13:06:03Z</dc:date>
    <item>
      <title>Slow Delta write when creating embeddings with mapPartitions</title>
      <link>https://community.databricks.com/t5/generative-ai/slow-delta-write-when-creating-embeddings-with-mappartitions/m-p/141196#M1502</link>
      <description>&lt;P&gt;I’m trying to generate 35k+ embeddings in Databricks. What I’ve tried so far:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Per-row UDF (very slow).&lt;/LI&gt;&lt;LI&gt;Replaced UDF with rdd.mapPartitions to batch API calls, create one Azure client per partition, and call client.embed_documents(texts) in batches. This avoids per-row Python UDF overhead and improves embedding throughput.&lt;/LI&gt;&lt;LI&gt;Measured embedding execution vs Delta write time; embedding materialization is fine but the Delta write (saveAsTable / commit) is now the dominant, slow step. I used persist() to avoid double computation when calling count() before write.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;Minimal embedding function I tested (simplified):&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from pyspark.sql import Row, DataFrame
from pyspark.sql.types import StructField, StructType, ArrayType, FloatType
import os, time
from langchain_openai import AzureOpenAIEmbeddings

def embed_with_map_partitions_simple(df: DataFrame, column_names: str | list[str], batch_size: int = 128, repartition: int | None = None) -&amp;gt; DataFrame:
    if isinstance(column_names, str):
        column_names = [column_names]
    if repartition:
        df = df.repartition(repartition)
    spark = df.sparkSession
    new_fields = list(df.schema.fields) + [StructField(f"{c}_embedding", ArrayType(FloatType()), True) for c in column_names]
    new_schema = StructType(new_fields)

    def partition_embed(rows_iter):
        rows = list(rows_iter)
        if not rows:
            return iter(())
        client = AzureOpenAIEmbeddings(
            azure_endpoint=os.getenv("openai_api_base"),
            azure_deployment=os.getenv("openai_deployment_name"),
            api_key=os.getenv("openai_api_key"),
            api_version=os.getenv("openai_api_version")
        )
        n = len(rows)
        embeddings_per_column = {c: [None]*n for c in column_names}
        for col in column_names:
            for i in range(0, n, batch_size):
                batch = rows[i:i+batch_size]
                texts = [getattr(r, col, "") or "" for r in batch]
                try:
                    batch_emb = client.embed_documents(texts)
                except Exception:
                    batch_emb = [[0.0]*3072 for _ in texts]
                for j, emb in enumerate(batch_emb):
                    embeddings_per_column[col][i+j] = emb
        for idx, r in enumerate(rows):
            d = r.asDict()
            for c in column_names:
                d[f"{c}_embedding"] = embeddings_per_column[c][idx] or [0.0]*3072
            yield Row(**d)

    return spark.createDataFrame(df.rdd.mapPartitions(partition_embed), schema=new_schema)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Question: can Databricks advise best practices to reduce Delta write/commit time for this workflow (recommended write options, file sizing/num files, transaction tuning, or cluster/io settings)? Also any guidance on safely persisting large transformed DF before writing and on Stitch/OPTIMIZE usage would be helpful.&lt;/P&gt;&lt;P&gt;Thanks.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 04 Dec 2025 20:50:30 GMT</pubDate>
      <guid>https://community.databricks.com/t5/generative-ai/slow-delta-write-when-creating-embeddings-with-mappartitions/m-p/141196#M1502</guid>
      <dc:creator>andcch552</dc:creator>
      <dc:date>2025-12-04T20:50:30Z</dc:date>
    </item>
    <item>
      <title>Re: Slow Delta write when creating embeddings with mapPartitions</title>
      <link>https://community.databricks.com/t5/generative-ai/slow-delta-write-when-creating-embeddings-with-mappartitions/m-p/141251#M1503</link>
      <description>&lt;P&gt;Hi&lt;/P&gt;&lt;P&gt;You’ve optimised the &lt;EM&gt;embedding&lt;/EM&gt; side really nicely already, batching in mapPartitions and creating one Azure client per partition is exactly what we recommend.&lt;/P&gt;&lt;P&gt;For 35k rows, if embedding is fast but the &lt;STRONG&gt;Delta write/commit is slow&lt;/STRONG&gt;, it’s almost always due to:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;too many small output files, and/or&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;extra passes over the DataFrame, and/or&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;a cluster that’s over-parallelised for the amount of data.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;I would suggest to look into these:&lt;/P&gt;&lt;H2&gt;Control the number of output files&lt;/H2&gt;&lt;P&gt;By default Spark uses something like spark.sql.shuffle.partitions = 200, which means your createDataFrame(...).write can easily produce ~200 tiny files for just 35k rows. The overhead of creating those files + committing metadata often dominates the runtime.&lt;/P&gt;&lt;P&gt;For a dataset of this size, you typically want &lt;STRONG&gt;a small number of files&lt;/STRONG&gt; (1–4, maybe 8 max).&lt;/P&gt;&lt;P&gt;Key points:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;Use &lt;STRONG&gt;coalesce()&lt;/STRONG&gt;, not repartition(), right before the write.&lt;BR /&gt;coalesce(n) avoids a shuffle and just reduces the number of output partitions.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;For 35k vectors, 1–4 files is absolutely fine and usually much faster to commit.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H2&gt;2. Avoid double computation / extra actions&lt;/H2&gt;&lt;P&gt;You mentioned using persist() to avoid recomputing when you call count(). That’s good. Two extra tips:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;Persist &lt;STRONG&gt;after&lt;/STRONG&gt; your embedding transform, not on the original DF.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Only trigger &lt;STRONG&gt;one action&lt;/STRONG&gt; before the final write (e.g. count() or maybe display() for debugging). Don’t call count(), show(), and then write without persistence, or Spark will recompute the whole pipeline multiple times.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H2&gt;3. Tuning cluster size &amp;amp; IO for this workload&lt;/H2&gt;&lt;P&gt;For 35k rows of 3072-dim embeddings:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;You &lt;STRONG&gt;don’t need a huge cluster&lt;/STRONG&gt;.&lt;BR /&gt;Too many workers mean too many tiny output tasks and more small files.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Often a small cluster (e.g. 1–2 workers with decent memory) is &lt;EM&gt;faster&lt;/EM&gt; end-to-end than a large autoscaling cluster for this kind of “wide but not huge” dataset.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;Make sure you’re writing to a performant storage account (Premium / general purpose v2). In most managed Databricks setups, DBFS is already backed by appropriate storage, so usually this is fine.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;If you see a lot of tiny tasks in the Spark UI, that’s a sign to:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;lower spark.sql.shuffle.partitions for the job (e.g. 32 or even 8 for this size), and/or&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;coalesce before writing as shown above.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H2&gt;4. Data layout &amp;amp; Delta options (Stitch / OPTIMIZE)&lt;/H2&gt;&lt;P&gt;For a one-off creation of 35k embeddings, the Delta housekeeping features (Stitch / OPTIMIZE) usually aren’t needed for &lt;EM&gt;performance of the write itself&lt;/EM&gt;, but they matter if:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;you will repeatedly &lt;STRONG&gt;append&lt;/STRONG&gt; to this table,&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;you will &lt;STRONG&gt;query&lt;/STRONG&gt; it a lot (e.g. for vector search candidates), or&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;you accidentally created many small files in early runs.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;&lt;H2&gt;5. Data type choice for embeddings&lt;/H2&gt;&lt;P&gt;You’re using ArrayType(FloatType()), which is fine. A few extra notes:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;P&gt;If you’re on a Databricks runtime that supports the &lt;STRONG&gt;VECTOR&lt;/STRONG&gt; type (for native vector search), consider storing as VECTOR(3072) – it doesn’t massively change write &lt;EM&gt;speed&lt;/EM&gt;, but it’s the recommended long-term format for similarity search.&lt;/P&gt;&lt;/LI&gt;&lt;LI&gt;&lt;P&gt;If you stick to arrays, make sure the schema is stable between runs (same type, same dimension). Schema evolution (new columns or type changes) can add extra overhead due to Delta metadata handling.&lt;/P&gt;&lt;/LI&gt;&lt;/UL&gt;</description>
      <pubDate>Fri, 05 Dec 2025 13:06:03 GMT</pubDate>
      <guid>https://community.databricks.com/t5/generative-ai/slow-delta-write-when-creating-embeddings-with-mappartitions/m-p/141251#M1503</guid>
      <dc:creator>bianca_unifeye</dc:creator>
      <dc:date>2025-12-05T13:06:03Z</dc:date>
    </item>
  </channel>
</rss>

