Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Directories added to the Python sys.path do not always work fine on executors for shared access mode

Yousry_Ibrahim
New Contributor

Let's assume we have a workspace folder containing two Python files.

module1 with a simple addition function:

def add_numbers(a, b):
  return a + b

module2 with a dummy PySpark custom data source:

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class DummyDataSourceReader(DataSourceReader):
    def __init__(self, schema, options):
        self.schema = schema
        self.options = options

    def partitions(self):
        partitions = [InputPartition(x) for x in ["a", "b", "c"]]
        return partitions

    def read(self, partition: InputPartition):
        for row in range(5):
            item = [row, partition.value]
            yield tuple(item)

class DummyDataSource(DataSource):
    def __init__(self, options):
        self.options = options

    @classmethod
    def name(cls):
        return "dummy"

    def schema(self):
        self.options["schema"] = "value int, group string"
        return self.options["schema"]
    
    def reader(self, schema):
        return DummyDataSourceReader(schema, self.options)

Now let's create a fresh notebook and attach it to a multi-node cluster on DBR 16.4 LTS with single-user (dedicated) access mode.

The first step is to add the location of the two modules to the Python sys.path.
Then consuming code from module1 via a UDF works fine.

[screenshot: the UDF built on module1.add_numbers runs successfully]
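Roughly, the cells look like this (the workspace folder path below is just a placeholder for wherever the two modules actually live):

import sys

# Placeholder path: point this at the workspace folder holding module1.py and module2.py
sys.path.append("/Workspace/Users/someone@example.com/my_modules")

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from module1 import add_numbers

# Wrap the imported function in a UDF; on DBR 13.3+ the sys.path addition is
# supposed to be distributed to the executors, so module1 resolves there too.
add_udf = udf(lambda a, b: add_numbers(a, b), IntegerType())

display(spark.range(5).withColumn("result", add_udf("id", "id")))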

The custom data source works fine as well.

[screenshot: reading from the custom "dummy" data source works]
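The cell in that screenshot is essentially the same code that later appears in the stack trace:

from module2 import DummyDataSource

spark.dataSource.register(DummyDataSource)
spark.read.format("dummy").load().display()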

Let's switch gears and change the cluster to standard access mode (i.e. shared).

The UDF still works fine, but the custom data source breaks with a ModuleNotFoundError.

[screenshot: the same read fails with ModuleNotFoundError: No module named 'module2' on the standard access mode cluster]

I will leave the whole stack trace at the end of this message.

The docs have this:
In Databricks Runtime 13.3 LTS and above, directories added to the Python sys.path, or directories that are structured as Python packages, are automatically distributed to all executors in the cluster. In Databricks Runtime 12.2 LTS and below, libraries added to the sys.path must be explicitly installed on executors.

I am using DBR 16.4 LTS, so I expect any modules registered via sys.path to be available on all executors, and that is exactly what happens in the UDF case. Unfortunately, it does not work with the PySpark custom data source, at least in shared access mode.

Is there anything I am doing wrong or missing, or is there a way to make this work in shared access mode?
P.S. Due to other constraints, I cannot use DABs or build my code into wheels, etc. I would prefer the clean OOTB behaviour I get with the UDF.

Stack trace:

[PYTHON_DATA_SOURCE_ERROR] Failed to create Python data source instance:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 617, in loads
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'module2'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/worker/create_data_source.py", line 86, in main
    data_source_cls = read_command(pickleSer, infile)
  File "/databricks/spark/python/pyspark/worker_util.py", line 71, in read_command
    command = serializer._read_with_length(file)
  File "/databricks/spark/python/pyspark/serializers.py", line 196, in _read_with_length
    raise SerializationError("Caused by " + traceback.format_exc())
pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 617, in loads
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'module2'
SQLSTATE: 38000

JVM stacktrace:
org.apache.spark.sql.AnalysisException at org.apache.spark.sql.errors.QueryCompilationErrors$.pythonDataSourceError(QueryCompilationErrors.scala:2972) at org.apache.spark.sql.execution.datasources.v2.python.UserDefinedPythonDataSourceRunner.receiveFromPython(UserDefinedPythonDataSource.scala:322) at org.apache.spark.sql.execution.datasources.v2.python.UserDefinedPythonDataSourceRunner.receiveFromPython(UserDefinedPythonDataSource.scala:287) at org.apache.spark.sql.execution.python.PythonPlannerRunner.runInPython(PythonPlannerRunner.scala:201) at org.apache.spark.sql.execution.datasources.v2.python.UserDefinedPythonDataSource.createDataSourceInPython(UserDefinedPythonDataSource.scala:72) at org.apache.spark.sql.execution.datasources.v2.python.PythonDataSourceV2.getOrCreateDataSourceInPython(PythonDataSourceV2.scala:50) at org.apache.spark.sql.execution.datasources.v2.python.PythonDataSourceV2.inferSchema(PythonDataSourceV2.scala:60) at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:104) at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.loadV2Source(DataSourceV2Utils.scala:250) at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.$anonfun$applyOrElse$1(ResolveDataSource.scala:96) at scala.Option.flatMap(Option.scala:271) at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:94) at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:58) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:141) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:85) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:141) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:418) at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:137) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:133) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:42) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:114) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:113) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:42) at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:58) at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:56) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$16(RuleExecutor.scala:480) at org.apache.spark.sql.catalyst.rules.RecoverableRuleExecutionHelper.processRule(RuleExecutor.scala:629) at org.apache.spark.sql.catalyst.rules.RecoverableRuleExecutionHelper.processRule$(RuleExecutor.scala:613) at org.apache.spark.sql.catalyst.rules.RuleExecutor.processRule(RuleExecutor.scala:131) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$15(RuleExecutor.scala:480) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$14(RuleExecutor.scala:479) at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126) at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) at scala.collection.immutable.List.foldLeft(List.scala:91) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$13(RuleExecutor.scala:475) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:452) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$22(RuleExecutor.scala:585) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$22$adapted(RuleExecutor.scala:585) at scala.collection.immutable.List.foreach(List.scala:431) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:585) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:349) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeSameContext(Analyzer.scala:507) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:500) at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:406) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:500) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:425) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:341) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:219) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:341) at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:252) at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:96) at 
org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:131) at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:87) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:487) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:425) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:487) at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$3(QueryExecution.scala:308) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:548) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$6(QueryExecution.scala:703) at org.apache.spark.sql.execution.SQLExecution$.withExecutionPhase(SQLExecution.scala:152) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$5(QueryExecution.scala:703) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1342) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:696) at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:692) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1462) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:692) at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:295) at com.databricks.sql.util.MemoryTrackerHelper.withMemoryTracking(MemoryTrackerHelper.scala:80) at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:294) at scala.util.Try$.apply(Try.scala:213) at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1684) at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1745) at org.apache.spark.util.LazyTry.get(LazyTry.scala:58) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:340) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:274) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:108) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1462) at org.apache.spark.sql.SparkSession.$anonfun$withActiveAndFrameProfiler$1(SparkSession.scala:1469) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.SparkSession.withActiveAndFrameProfiler(SparkSession.scala:1469) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:106) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:265) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:224) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformReadRel(SparkConnectPlanner.scala:1811) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelation$1(SparkConnectPlanner.scala:190) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$usePlanCache$8(SessionHolder.scala:619) at org.apache.spark.sql.connect.service.SessionHolder.measureSubtreeRelationNodes(SessionHolder.scala:635) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$usePlanCache$6(SessionHolder.scala:618) at scala.Option.getOrElse(Option.scala:189) at 
org.apache.spark.sql.connect.service.SessionHolder.usePlanCache(SessionHolder.scala:616) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:185) at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.transformRelation$1(SparkConnectAnalyzeHandler.scala:119) at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.process(SparkConnectAnalyzeHandler.scala:213) at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.$anonfun$handle$3(SparkConnectAnalyzeHandler.scala:104) at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.$anonfun$handle$3$adapted(SparkConnectAnalyzeHandler.scala:66) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:464) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1462) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:464) at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97) at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:90) at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:241) at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:89) at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:463) at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.$anonfun$handle$1(SparkConnectAnalyzeHandler.scala:66) at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.$anonfun$handle$1$adapted(SparkConnectAnalyzeHandler.scala:51) at com.databricks.spark.connect.logging.rpc.SparkConnectRpcMetricsCollectorUtils$.collectMetrics(SparkConnectRpcMetricsCollector.scala:258) at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.handle(SparkConnectAnalyzeHandler.scala:50) at org.apache.spark.sql.connect.service.SparkConnectService.analyzePlan(SparkConnectService.scala:109) at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:801) at grpc_shaded.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182) at grpc_shaded.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35) at grpc_shaded.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23) at grpc_shaded.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40) at grpc_shaded.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35) at grpc_shaded.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23) at grpc_shaded.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40) at com.databricks.spark.connect.service.AuthenticationInterceptor$AuthenticatedServerCallListener.$anonfun$onHalfClose$1(AuthenticationInterceptor.scala:381) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51) at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104) at com.databricks.spark.connect.service.RequestContext.$anonfun$runWith$3(RequestContext.scala:337) at 
com.databricks.spark.connect.service.RequestContext$.com$databricks$spark$connect$service$RequestContext$$withLocalProperties(RequestContext.scala:537) at com.databricks.spark.connect.service.RequestContext.$anonfun$runWith$2(RequestContext.scala:337) at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:49) at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:293) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:289) at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:47) at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:44) at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:30) at com.databricks.spark.util.UniverseAttributionContextWrapper.withValue(AttributionContextUtils.scala:242) at com.databricks.spark.connect.service.RequestContext.$anonfun$runWith$1(RequestContext.scala:336) at com.databricks.spark.connect.service.RequestContext.withContext(RequestContext.scala:349) at com.databricks.spark.connect.service.RequestContext.runWith(RequestContext.scala:329) at com.databricks.spark.connect.service.AuthenticationInterceptor$AuthenticatedServerCallListener.onHalfClose(AuthenticationInterceptor.scala:381) at grpc_shaded.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:351) at grpc_shaded.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:861) at grpc_shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at grpc_shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:165) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63) at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$6(SparkThreadLocalForwardingThreadPoolExecutor.scala:119) at com.databricks.sql.transaction.tahoe.mst.MSTThreadHelper$.runWithMstTxnId(MSTThreadHelper.scala:57) at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$5(SparkThreadLocalForwardingThreadPoolExecutor.scala:118) at com.databricks.spark.util.IdentityClaim$.withClaim(IdentityClaim.scala:48) at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$4(SparkThreadLocalForwardingThreadPoolExecutor.scala:117) at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51) at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:116) at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:93) at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:162) at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:165) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.lang.Thread.run(Thread.java:840)
File <command-5056821242561777>, line 3
      1 from module2 import DummyDataSource
      2 spark.dataSource.register(DummyDataSource)
----> 3 spark.read.format("dummy").load().display()

File /databricks/python_shell/lib/dbruntime/monkey_patches.py:73, in apply_dataframe_display_patch.<locals>.df_display(df, *args, **kwargs)
     69 def df_display(df, *args, **kwargs):
     70     """
     71     df.display() is an alias for display(df). Run help(display) for more information.
     72     """
---> 73     display(df, *args, **kwargs)

File /databricks/python_shell/lib/dbruntime/display.py:133, in Display.display(self, input, *args, **kwargs)
    131     self.display_connect_table(input, **kwargs)
    132 elif isinstance(input, ConnectDataFrame):
--> 133     if input.isStreaming:
    134         handleStreamingConnectDataFramePy4j(input, self.entry_point, kwargs)
    135     else:

File /usr/lib/python3.12/functools.py:995, in cached_property.__get__(self, instance, owner)
    993 val = cache.get(self.attrname, _NOT_FOUND)
    994 if val is _NOT_FOUND:
--> 995     val = self.func(instance)
    996 try:
    997     cache[self.attrname] = val

8 REPLIES

-werners-
Esteemed Contributor III

Shared access mode does not have the same functionality as single-user (dedicated) mode:
https://docs.databricks.com/aws/en/compute/standard-limitations
Your issue is not specifically mentioned there, but chances are it is caused by the access mode.
Is there any particular reason why you want to use shared?

 

szymon_dybczak
Esteemed Contributor III

Hi @Yousry_Ibrahim 

I think this could be a serialization problem. I've recreated your scenario using a cluster with shared access mode and I got the same error:

[screenshot: the same ModuleNotFoundError: No module named 'module2' on a shared access mode cluster]

But look what happens if I don't use sys.path.append and instead use an absolute import: now it works:

[screenshot: the custom data source read succeeds when module2 is imported with an absolute (package-path) import instead of sys.path.append]
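In other words, something along these lines instead of touching sys.path (a sketch that assumes module2.py sits in a sub-folder next to the notebook; the folder name my_modules is made up):

# The notebook's own directory is on sys.path by default, so a package-style
# import of the sub-folder resolves without any sys.path.append call.
from my_modules.module2 import DummyDataSource

spark.dataSource.register(DummyDataSource)
spark.read.format("dummy").load().display()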

 

 

Thanks @szymon_dybczak, it is a great idea.
I will give it a go, but in my case the notebook and the modules live in completely different hierarchies, so there will be some sort of relative path handling.
sys.path.append is documented by Databricks to be reflected on the executors as well. It works fine with UDFs but not for the custom data source, at least in standard (shared) access mode.

No problem @Yousry_Ibrahim. I agree with you, and this is something that I also wonder about. As you wrote, this path should also be distributed to the workers; however, as we can see, this is not happening.
The documentation makes no mention anywhere that in shared access mode the path will not be added to the executors.
So in theory what you're trying to do should work.

Hi @-werners-
The reason I am keen to use shared mode is a set of imposed templates prepared by the DevOps team; I don't have full control over the clusters or the access mode I can use.
There is a solution proposed by @szymon_dybczak and I will see how it works in my case.

ck7007
New Contributor II

@Yousry_Ibrahim This is a known limitation with PySpark custom data sources in shared access mode. The issue is that custom data sources serialize differently from UDFs.

Root Cause

Custom data sources use cloudpickle serialization, which doesn't properly capture the sys.path modifications in shared clusters. UDFs work because they use a different serialization path.
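A quick way to see that by-reference behaviour from the driver (this assumes the standalone cloudpickle package is importable, which it normally is on Databricks Runtime):

import cloudpickle
from module2 import DummyDataSource

# A class imported from a regular module is pickled by reference: the payload
# records "module2.DummyDataSource" rather than the class definition itself.
payload = cloudpickle.dumps(DummyDataSource)
print(b"module2" in payload)  # True

# Unpickling therefore has to import module2 again, which is exactly what fails
# on an executor that cannot find module2 on its sys.path:
# cloudpickle.loads(payload)  # -> ModuleNotFoundError: No module named 'module2'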

Solution 1: Add Module to Spark Files

# Add the module file explicitly to the Spark context
spark.sparkContext.addPyFile("/Workspace/path/to/module2.py")

# Now import and register
from module2 import DummyDataSource
spark.dataSource.register(DummyDataSource)
spark.read.format("dummy"). load(). display()

Solution 2: Add the Module Path via an Init Script

Create an init script that adds your modules to the Python path on all nodes:
#!/bin/bash
# cluster-init.sh
echo "export PYTHONPATH=/Workspace/your_modules:$PYTHONPATH" >> /databricks/spark/conf/spark-env.sh

Solution 3: Inline the Data Source (Temporary Fix)

For testing, define the data source directly in the notebook:
# Define the classes in the notebook itself
exec(open('/Workspace/path/to/module2.py').read())
spark.dataSource.register(DummyDataSource)

Why This Happens

  • UDFs: Executed in Python worker processes that inherit sys.path
  • Data Sources: Serialized at the driver and deserialized at executors without sys.path context
  • Shared Mode: Additional isolation prevents path propagation

The addPyFile approach is the most reliable for shared clusters. It ensures the module is distributed to all executors before deserialization.

Have you tried Solution 1? It should work immediately without a cluster restart.

szymon_dybczak
Esteemed Contributor III

If this is a well-known limitation, then please share some links with us. For LLMs every problem is well known, I guess 😄

Yousry_Ibrahim
New Contributor

Hi all,
Thanks for the feedback and proposed ideas.
@szymon_dybczak Your idea of relative imports works when the module is hosted in a child directory of the currently running notebook. It does not work if we need to go up one or two directories and navigate from there.

The error in that case is "ImportError: attempted relative import with no known parent package".
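Roughly, with made-up folder names:

# Works: "my_modules" is a child folder of the notebook's directory, which is
# on sys.path by default.
from my_modules.module2 import DummyDataSource

# Fails from a notebook: leading-dot imports only work inside a package, and a
# notebook runs as a top-level script, hence the "no known parent package" error.
# from ..shared_modules.module2 import DummyDataSource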
I will accept the solution proposed by @ck7007.
Practically speaking, I have to stick with things like "%run notebook" due to some other limitations.
Regarding the proposed options:

  1. "spark.sparkContext.addPyFile" is also not supported on shared clusters
  2. The CI templates I am bound to use does not allow tweaking init scripts easily
  3. I tried the "exec" method before and it is working but feels a bit hacky and will get so complicated if we have modules depending on other modules and so on. It would become tricky easily.

If I had more control, I would have just used dedicated mode and been done with it.

Thanks all.
