<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>Re: Problem with streaming jobs (foreachBatch) with USER_ISOLATION compute cluster in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/problem-with-streaming-jobs-foreachbatch-with-user-isolation/m-p/103045#M41309</link>
    <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/120079"&gt;@prakharcode&lt;/a&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Thank you for sharing the detailed information about your issue. Before diving into solutions, I want to confirm whether this is still an ongoing problem for you.&amp;nbsp;Regarding the performance difference between "NO_ISOLATION" and "USER_ISOLATION" mode, it would be worth dissecting the memory space with heap dumps, but generally speaking, "USER_ISOLATION" mode adds isolation layers to ensure that resources are securely allocated to individual users. That isolation consumes extra memory and compute, which can lead to OOM errors and performance degradation: the mode enforces stricter security and isolation policies and is designed to provide better security at the cost of some performance overhead. For your use case, we would need to benchmark and collect more data points to determine the actual memory utilization and overhead, where it comes from, and what the biggest contributors/dominators are.&lt;/P&gt;</description>
    <pubDate>Mon, 23 Dec 2024 16:44:01 GMT</pubDate>
    <dc:creator>VZLA</dc:creator>
    <dc:date>2024-12-23T16:44:01Z</dc:date>
    <item>
      <title>Problem with streaming jobs (foreachBatch) with USER_ISOLATION compute cluster</title>
      <link>https://community.databricks.com/t5/data-engineering/problem-with-streaming-jobs-foreachbatch-with-user-isolation/m-p/89293#M37755</link>
      <description>&lt;P&gt;We have been trying to run a streaming job on an all-purpose compute cluster (4 cores, 16 GB) in "USER_ISOLATION" mode, as recommended by Databricks for use with Unity Catalog. The job reads CDC files produced by a table that is refreshed every hour and produces around ~480k rows, which are then merged into a table of about ~980k rows.&lt;BR /&gt;The join is executed as a streaming `foreachBatch` job, where we read the files from S3 and write like this:&lt;/P&gt;&lt;PRE&gt;spark&lt;BR /&gt;.readStream&lt;BR /&gt;.format("cloudFiles")&lt;BR /&gt;.schema(df_schema)&lt;BR /&gt;.option("cloudFiles.format", "parquet")&lt;BR /&gt;.load(f"{s3_path_base}/*/*")&lt;BR /&gt;.writeStream.foreachBatch(upsert_to_delta)&lt;BR /&gt;.option("checkpointLocation", "&amp;lt;location_in_s3&amp;gt;")&lt;BR /&gt;.trigger(availableNow=True)&lt;BR /&gt;.start()&lt;/PRE&gt;&lt;P&gt;The upsert_to_delta function is included at the end, along with other relevant details.&lt;/P&gt;&lt;P&gt;The same job with the same tables runs perfectly on a cluster without data_security_mode set to "USER_ISOLATION". As soon as "USER_ISOLATION" mode is turned on, with the same cluster specification and configuration, the job starts hitting OOM errors. We are also seeing a general degradation in job performance: due to some internal overhead of Unity Catalog, jobs that used to run within a minute on a "NO_ISOLATION" cluster now sometimes take twice the time or more, with the same cluster configuration and a similar data size. No change has been made to the cluster settings whatsoever, and we are still seeing OOM errors and performance hits.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Important questions:&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;Is there something we can do to overcome the OOM errors and improve the performance of the job? Also, why does the same job run on a cluster with exactly the same configuration in "NO_ISOLATION" mode but fail in "USER_ISOLATION" mode?&lt;/P&gt;&lt;P&gt;Any help is appreciated! Thank you.&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;General information:&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;The data being processed at the source is Parquet. The target table is a Delta table.&lt;BR /&gt;&lt;STRONG&gt;DBR version: 14.3 LTS (Spark 3.5, Scala 2.12)&lt;BR /&gt;Driver and worker type: m6gd.xlarge (2 workers)&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Error returned:&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;INTERNAL: Job aborted due to stage failure: Task 2 in stage 546.0 failed 4 times, most recent failure: Lost task 2.3 in stage 546.0 (TID 3942) (10.48.255.186 executor 6): ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Command exited with code 52&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Relevant code:&lt;/STRONG&gt;&lt;/P&gt;&lt;LI-CODE 
lang="python"&gt;def upsert_to_delta(micro_df, batch_id):
    
    # spark DF of the columns and its type from source cdc files
    spark.createDataFrame(
      micro_batch_df.dtypes, schema=self.schema  # schema here is just &amp;lt;column_name, data_type&amp;gt;
    ).createOrReplaceGlobalTempView("SOURCE_CDC_FILES_VIEW_COLUMNS")

    # spark DF of the columns and its type from the delta target table
    spark.createDataFrame(
      spark.read.table(target_table).dtypes,
      schema=self.schema, # schema here is just &amp;lt;column_name, data_type&amp;gt;
    ).createOrReplaceGlobalTempView("TARGET_DBX_TABLE_COLUMNS")

    # (left) joining the columns from both source and target to get a list of
    # columns in the source files where we take the column type of target table for any
    # common columns. Giving priority to the column type of source table.
    df_col = spark.sql(
      f"""SELECT
          'CAST(sc.' || s.column_name || ' AS ' || COALESCE(t.data_type, s.data_type) || ') AS ' || s.column_name AS column_name
        FROM
         global_temp.SOURCE_CDC_FILES_VIEW_COLUMNS s
          LEFT JOIN global_temp.TARGET_DBX_TABLE_COLUMNS t
          ON (s.column_name = t.column_name)"""
    )
    columns = ", ".join(list(df_col.toPandas()["column_name"]))

    # Making a spark view from the streaming dataframe
    micro_batch_df.createOrReplaceGlobalTempView("SOURCE_CDC_FILES_VIEW")

    # Making the merge query to merge the streaming DF
    sql_query_for_micro_batch = f"""MERGE INTO {target_table} s
      USING (
        SELECT
         {columns}
        FROM global_temp.SOURCE_CDC_FILES_VIEW sc
          INNER JOIN (
            SELECT {self.unique_key},
                MAX(transact_seq) AS transact_seq
            FROM global_temp.SOURCE_CDC_FILES_VIEW
            GROUP BY 1) mc
          ON
            (sc.{self.unique_key} = mc.{self.unique_key}
            AND sc.transact_seq = mc.transact_seq)) b
      ON b.{self.unique_key} = s.{self.unique_key}
      WHEN MATCHED AND b.Op = 'U'
       THEN UPDATE SET *
      WHEN MATCHED AND b.Op = 'D'
       THEN DELETE
      WHEN NOT MATCHED AND (b.Op = 'I' OR b.Op = 'U')
       THEN INSERT *"""

    LOGGER.info("Executing the merge")
    LOGGER.info(f"Merge SQL: {sql_query_for_micro_batch}")
    spark.sql(sql_query_for_micro_batch)
    LOGGER.info("Merge is done")
    spark.catalog.dropGlobalTempView("SOURCE_CDC_FILES_VIEW_COLUMNS")
    spark.catalog.dropGlobalTempView("TARGET_DBX_TABLE_COLUMNS")
    spark.catalog.dropGlobalTempView("SOURCE_CDC_FILES_VIEW")&lt;/LI-CODE&gt;</description>
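As a side note on the function above: the two createDataFrame + toPandas round trips exist only to compute the per-column CAST expressions. Since df.dtypes is already a plain Python list of (column_name, data_type) tuples on the driver, the same mapping can be built with no extra Spark jobs and without the two column-lookup temp views. A minimal sketch under that assumption; build_cast_exprs is a hypothetical helper, not part of the original job:

```python
def build_cast_exprs(source_dtypes, target_dtypes):
    """Build 'CAST(sc.col AS type) AS col' expressions for every source
    column, preferring the target table's type when the column also exists
    in the target (the same effect as the LEFT JOIN + COALESCE query)."""
    target_types = dict(target_dtypes)  # column_name -> data_type
    return [
        f"CAST(sc.{name} AS {target_types.get(name, dtype)}) AS {name}"
        for name, dtype in source_dtypes
    ]

# Example with the kind of lists Spark's df.dtypes returns:
exprs = build_cast_exprs(
    [("id", "bigint"), ("Op", "string")],  # micro_batch_df.dtypes
    [("id", "int")],                       # spark.read.table(target_table).dtypes
)
# target type wins for shared columns; source type is kept otherwise
```

In the function, `columns = ", ".join(build_cast_exprs(micro_batch_df.dtypes, spark.read.table(target_table).dtypes))` would then replace the two views, the SQL join, and the toPandas call, which also means less per-micro-batch work for the driver.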
      <pubDate>Tue, 10 Sep 2024 13:06:05 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/problem-with-streaming-jobs-foreachbatch-with-user-isolation/m-p/89293#M37755</guid>
      <dc:creator>prakharcode</dc:creator>
      <dc:date>2024-09-10T13:06:05Z</dc:date>
    </item>
    <item>
      <title>Re: Problem with streaming jobs (foreachBatch) with USER_ISOLATION compute cluster</title>
      <link>https://community.databricks.com/t5/data-engineering/problem-with-streaming-jobs-foreachbatch-with-user-isolation/m-p/103045#M41309</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/120079"&gt;@prakharcode&lt;/a&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;Thank you for sharing the detailed information about your issue. Before diving into solutions, I want to confirm whether this is still an ongoing problem for you.&amp;nbsp;Regarding the performance difference between "NO_ISOLATION" and "USER_ISOLATION" mode, it would be worth dissecting the memory space with heap dumps, but generally speaking, "USER_ISOLATION" mode adds isolation layers to ensure that resources are securely allocated to individual users. That isolation consumes extra memory and compute, which can lead to OOM errors and performance degradation: the mode enforces stricter security and isolation policies and is designed to provide better security at the cost of some performance overhead. For your use case, we would need to benchmark and collect more data points to determine the actual memory utilization and overhead, where it comes from, and what the biggest contributors/dominators are.&lt;/P&gt;</description>
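To act on the heap-dump suggestion, one hedged starting point is to add the standard HotSpot flags to the cluster's Spark configuration so that an executor or driver OOM leaves a dump behind for analysis. The dump path below is illustrative, not prescriptive; point it at any location that is writable from the nodes:

```python
# Illustrative cluster Spark conf entries (set in the cluster's Spark config);
# the /dbfs/tmp/heapdumps path is an assumption -- use any writable location.
spark_conf = {
    "spark.executor.extraJavaOptions": (
        "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/dbfs/tmp/heapdumps"
    ),
    "spark.driver.extraJavaOptions": (
        "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/dbfs/tmp/heapdumps"
    ),
}
```

The resulting .hprof files can then be opened in a heap analyzer to see the biggest dominators, which is the data point needed to attribute the extra memory use to the isolation layers versus the job itself.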
      <pubDate>Mon, 23 Dec 2024 16:44:01 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/problem-with-streaming-jobs-foreachbatch-with-user-isolation/m-p/103045#M41309</guid>
      <dc:creator>VZLA</dc:creator>
      <dc:date>2024-12-23T16:44:01Z</dc:date>
    </item>
  </channel>
</rss>

