<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Double job execution caused by databricks' RemoteServiceExec using databricks-connector in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/double-job-execution-caused-by-databricks-remoteserviceexec/m-p/16274#M10475</link>
    <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt; Hello!&lt;/P&gt;
&lt;P&gt; I'm using databricks-connect to launch Spark jobs from Python.&lt;/P&gt;
&lt;P&gt; I've validated that the Python version (3.8.10) and runtime version (8.1) are supported by the installed databricks-connect (8.1.10).&lt;/P&gt;
&lt;P&gt; Every time a mapPartitions/foreachPartition action is triggered, &lt;B&gt;two Spark jobs execute&lt;/B&gt;, one after the other, duplicating every stage/step that precedes the action.&lt;/P&gt;
&lt;P&gt; Example code follows:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;#!/usr/bin/env python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType
schema = StructType([
    StructField('key', LongType(), True),
    StructField('value', StringType(), True)
])
spark = SparkSession.builder.appName('test').getOrCreate()
data = spark.read.schema(schema) \
    .option('header', 'true') \
    .csv('s3://path/to.csv')
def fun(rows):
    print(f"Got a partition with {len(list(rows))} rows")
# these only trigger one job
# data.collect()
# data.count()
# this triggers two!
data.foreachPartition(fun)
&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;
This executes two jobs (which is fast in this example but not in real-world code!):&lt;/P&gt;&lt;P&gt;The first job, which I can't explain:&lt;/P&gt;
&lt;span class="lia-inline-image-display-wrapper" image-alt="0693f000007OoMBAA0"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/2449iB9E295944DE164E1/image-size/large?v=v2&amp;amp;px=999" role="button" title="0693f000007OoMBAA0" alt="0693f000007OoMBAA0" /&gt;&lt;/span&gt;
&lt;PRE&gt;&lt;CODE&gt;org.apache.spark.rdd.RDD.foreach(RDD.scala:1015)
com.databricks.service.RemoteServiceExec.doExecute(RemoteServiceExec.scala:244)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:196)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:240)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:163)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:162)
org.apache.spark.sql.Dataset.javaToPython(Dataset.scala:3569)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
py4j.Gateway.invoke(Gateway.java:295)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:251)
&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;
    And then the actual job:&lt;/P&gt;&lt;P&gt;
    &lt;span class="lia-inline-image-display-wrapper" image-alt="0693f000007OoMAAA0"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/2460iD2CB3FDBFE16C352/image-size/large?v=v2&amp;amp;px=999" role="button" title="0693f000007OoMAAA0" alt="0693f000007OoMAAA0" /&gt;&lt;/span&gt;&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;org.apache.spark.rdd.RDD.collect(RDD.scala:1034)
org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:260)
org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
py4j.Gateway.invoke(Gateway.java:295)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:251)
java.lang.Thread.run(Thread.java:748)
&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;
    Any idea why this happens, and how I can prevent the first job from running so that only the actual code runs?&lt;/P&gt;&lt;P&gt;
    I've confirmed that in the first pass, none of the code in the foreachPartition function runs.&lt;/P&gt;&lt;P&gt;
    Using .cache() is not practical for real-world scenarios because the datasets are large, and persisting them would take even longer than executing the job again (and could fail for lack of disk space).&lt;/P&gt;&lt;P&gt;
    The first stack trace suggests this is related to Databricks' RemoteServiceExec code. Maybe it's unintentionally causing the dataset/RDDs to be materialized?&lt;/P&gt;&lt;P&gt;
    Can anyone help?&lt;/P&gt;&lt;P&gt;
    Thanks&lt;/P&gt;
&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
    <pubDate>Wed, 18 Aug 2021 18:48:02 GMT</pubDate>
    <dc:creator>JoãoRafael</dc:creator>
    <dc:date>2021-08-18T18:48:02Z</dc:date>
    <item>
      <title>Double job execution caused by databricks' RemoteServiceExec using databricks-connector</title>
      <link>https://community.databricks.com/t5/data-engineering/double-job-execution-caused-by-databricks-remoteserviceexec/m-p/16274#M10475</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt; Hello!&lt;/P&gt;
&lt;P&gt; I'm using databricks-connect to launch Spark jobs from Python.&lt;/P&gt;
&lt;P&gt; I've validated that the Python version (3.8.10) and runtime version (8.1) are supported by the installed databricks-connect (8.1.10).&lt;/P&gt;
&lt;P&gt; Every time a mapPartitions/foreachPartition action is triggered, &lt;B&gt;two Spark jobs execute&lt;/B&gt;, one after the other, duplicating every stage/step that precedes the action.&lt;/P&gt;
&lt;P&gt; Example code follows:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;#!/usr/bin/env python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType
schema = StructType([
    StructField('key', LongType(), True),
    StructField('value', StringType(), True)
])
spark = SparkSession.builder.appName('test').getOrCreate()
data = spark.read.schema(schema) \
    .option('header', 'true') \
    .csv('s3://path/to.csv')
def fun(rows):
    print(f"Got a partition with {len(list(rows))} rows")
# these only trigger one job
# data.collect()
# data.count()
# this triggers two!
data.foreachPartition(fun)
&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;
This executes two jobs (which is fast in this example but not in real-world code!):&lt;/P&gt;&lt;P&gt;The first job, which I can't explain:&lt;/P&gt;
&lt;span class="lia-inline-image-display-wrapper" image-alt="0693f000007OoMBAA0"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/2449iB9E295944DE164E1/image-size/large?v=v2&amp;amp;px=999" role="button" title="0693f000007OoMBAA0" alt="0693f000007OoMBAA0" /&gt;&lt;/span&gt;
&lt;PRE&gt;&lt;CODE&gt;org.apache.spark.rdd.RDD.foreach(RDD.scala:1015)
com.databricks.service.RemoteServiceExec.doExecute(RemoteServiceExec.scala:244)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:196)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:240)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:163)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:162)
org.apache.spark.sql.Dataset.javaToPython(Dataset.scala:3569)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
py4j.Gateway.invoke(Gateway.java:295)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:251)
&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;
    And then the actual job:&lt;/P&gt;&lt;P&gt;
    &lt;span class="lia-inline-image-display-wrapper" image-alt="0693f000007OoMAAA0"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/2460iD2CB3FDBFE16C352/image-size/large?v=v2&amp;amp;px=999" role="button" title="0693f000007OoMAAA0" alt="0693f000007OoMAAA0" /&gt;&lt;/span&gt;&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;org.apache.spark.rdd.RDD.collect(RDD.scala:1034)
org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:260)
org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
py4j.Gateway.invoke(Gateway.java:295)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:251)
java.lang.Thread.run(Thread.java:748)
&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;
    Any idea why this happens, and how I can prevent the first job from running so that only the actual code runs?&lt;/P&gt;&lt;P&gt;
    I've confirmed that in the first pass, none of the code in the foreachPartition function runs.&lt;/P&gt;&lt;P&gt;
    Using .cache() is not practical for real-world scenarios because the datasets are large, and persisting them would take even longer than executing the job again (and could fail for lack of disk space).&lt;/P&gt;&lt;P&gt;
    The first stack trace suggests this is related to Databricks' RemoteServiceExec code. Maybe it's unintentionally causing the dataset/RDDs to be materialized?&lt;/P&gt;&lt;P&gt;
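    To make that suspicion concrete, here is a toy model in plain Python rather than Spark. The class and function names are made up for illustration, and it makes no claim about how RemoteServiceExec is actually implemented; it only shows why two jobs over an uncached lazy lineage scan the source twice while the user function runs once:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;
```python
# Toy model only: plain Python, not Spark and not Databricks code.
# It shows why two jobs over an uncached lazy lineage read the source twice.


class LazyDataset:
    """Hypothetical stand-in for an uncached DataFrame lineage."""

    def __init__(self, partitions):
        self._partitions = partitions
        self.source_reads = 0  # how many times the "file" was scanned

    def _compute(self):
        # Every job recomputes the lineage from the source.
        self.source_reads += 1
        return [list(p) for p in self._partitions]

    def materialize(self):
        # Models the unexpected first job: rows are computed and discarded.
        for _ in self._compute():
            pass

    def foreach_partition(self, fun):
        # Models the intended job: the user function sees each partition.
        for partition in self._compute():
            fun(iter(partition))


def run_demo():
    data = LazyDataset([[1, 2, 3], [4, 5]])
    sizes = []
    data.materialize()  # first job, user code is not called
    calls_after_first_job = len(sizes)
    data.foreach_partition(lambda rows: sizes.append(len(list(rows))))
    return data.source_reads, calls_after_first_job, sizes


if __name__ == "__main__":
    print(run_demo())
```
&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;
    In this sketch the source is scanned twice and the user function is only invoked during the second scan, which is the same pattern the two stack traces above show. With a small CSV the extra scan is cheap; with real datasets it doubles the work.&lt;/P&gt;&lt;P&gt;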
    Can anyone help?&lt;/P&gt;&lt;P&gt;
    Thanks&lt;/P&gt;
&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 18 Aug 2021 18:48:02 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/double-job-execution-caused-by-databricks-remoteserviceexec/m-p/16274#M10475</guid>
      <dc:creator>JoãoRafael</dc:creator>
      <dc:date>2021-08-18T18:48:02Z</dc:date>
    </item>
    <item>
      <title>Re: Double job execution caused by databricks' RemoteServiceExec using databricks-connector</title>
      <link>https://community.databricks.com/t5/data-engineering/double-job-execution-caused-by-databricks-remoteserviceexec/m-p/16275#M10476</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;I've also confirmed this doesn't happen in vanilla Spark in local mode, nor when running the same code directly in a Databricks notebook. &lt;/P&gt; 
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 19 Aug 2021 13:28:33 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/double-job-execution-caused-by-databricks-remoteserviceexec/m-p/16275#M10476</guid>
      <dc:creator>JoãoRafael</dc:creator>
      <dc:date>2021-08-19T13:28:33Z</dc:date>
    </item>
    <item>
      <title>Re: Double job execution caused by databricks' RemoteServiceExec using databricks-connector</title>
      <link>https://community.databricks.com/t5/data-engineering/double-job-execution-caused-by-databricks-remoteserviceexec/m-p/16276#M10477</link>
      <description>&lt;P&gt;A community forum to discuss working with Databricks Cloud and Spark. ... Double job execution caused by databricks' RemoteServiceExec using databrick.&lt;/P&gt;&lt;P&gt;MyBalanceNow&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 20 Aug 2021 10:09:50 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/double-job-execution-caused-by-databricks-remoteserviceexec/m-p/16276#M10477</guid>
      <dc:creator>Anonymous</dc:creator>
      <dc:date>2021-08-20T10:09:50Z</dc:date>
    </item>
    <item>
      <title>Re: Double job execution caused by databricks' RemoteServiceExec using databricks-connector</title>
      <link>https://community.databricks.com/t5/data-engineering/double-job-execution-caused-by-databricks-remoteserviceexec/m-p/16277#M10478</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;I don't understand your comment. This only happens in Databricks cloud. &lt;/P&gt; 
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 20 Aug 2021 13:31:30 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/double-job-execution-caused-by-databricks-remoteserviceexec/m-p/16277#M10478</guid>
      <dc:creator>JoãoRafael</dc:creator>
      <dc:date>2021-08-20T13:31:30Z</dc:date>
    </item>
  </channel>
</rss>

