Let's assume we have a workspace folder containing two Python files.
module1.py, with a simple addition function:
def add_numbers(a, b):
    return a + b
module2.py, with a dummy PySpark custom data source:
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition


class DummyDataSourceReader(DataSourceReader):
    def __init__(self, schema, options):
        self.schema = schema
        self.options = options

    def partitions(self):
        partitions = [InputPartition(x) for x in ["a", "b", "c"]]
        return partitions

    def read(self, partition: InputPartition):
        for row in range(5):
            item = [row, partition.value]
            yield tuple(item)


class DummyDataSource(DataSource):
    def __init__(self, options):
        self.options = options

    @classmethod
    def name(cls):
        return "dummy"

    def schema(self):
        self.options["schema"] = "value int, group string"
        return self.options["schema"]

    def reader(self, schema):
        return DummyDataSourceReader(schema, self.options)
Now let's create a fresh notebook and attach it to a multi-node cluster running DBR 16.4 LTS with single-user (dedicated) access mode.
The first step is to add the location of the two modules to the Python sys.path.
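Something like this (the folder path below is just a placeholder for my actual workspace path):

import sys

# Placeholder path; in reality this points at the workspace folder
# that contains module1.py and module2.py
modules_path = "/Workspace/Users/<me>/modules"
if modules_path not in sys.path:
    sys.path.append(modules_path)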
Then consuming code from module1 via a UDF works fine.
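For example, roughly like this (the DataFrame and column names are illustrative, not the exact code I ran):

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

from module1 import add_numbers

# Wrap the imported function as a UDF; it executes on the workers,
# which therefore need to be able to import module1
add_udf = udf(add_numbers, LongType())
spark.range(5).withColumn("total", add_udf("id", "id")).display()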

The custom data source works fine as well.
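This is the exact pattern I use (it is the same cell that appears in the stack trace below):

from module2 import DummyDataSource

# Register the data source with the session, then read via its short name
spark.dataSource.register(DummyDataSource)
spark.read.format("dummy").load().display()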

Let's switch gears and change the cluster to standard access mode (i.e. shared).
The UDF still works fine, but the custom data source breaks with a ModuleNotFoundError.

I've included the whole stack trace at the end of this message.
The docs have this:
In Databricks Runtime 13.3 LTS and above, directories added to the Python sys.path, or directories that are structured as Python packages, are automatically distributed to all executors in the cluster. In Databricks Runtime 12.2 LTS and below, libraries added to the sys.path must be explicitly installed on executors.
I am using DBR 16.4 LTS, so I expect any modules registered via sys.path to be available across all executors, and that is exactly what happens in the UDF case. Unfortunately, it does not work with the PySpark custom data source, at least in standard (shared) access mode.
Is there anything wrong or missing in what I am doing, or is there a way to make this work in standard access mode?
P.S. Due to some other factors, I cannot use DABs or build my code into wheels, etc. I'd prefer the clean out-of-the-box (OOTB) behaviour that I get with the UDF.
Stack trace:
[PYTHON_DATA_SOURCE_ERROR] Failed to create Python data source instance:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 617, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'module2'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/worker/create_data_source.py", line 86, in main
    data_source_cls = read_command(pickleSer, infile)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/worker_util.py", line 71, in read_command
    command = serializer._read_with_length(file)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 196, in _read_with_length
    raise SerializationError("Caused by " + traceback.format_exc())
pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 617, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'module2'
SQLSTATE: 38000

JVM stacktrace:
org.apache.spark.sql.AnalysisException
    at org.apache.spark.sql.errors.QueryCompilationErrors$.pythonDataSourceError(QueryCompilationErrors.scala:2972)
    at org.apache.spark.sql.execution.datasources.v2.python.UserDefinedPythonDataSourceRunner.receiveFromPython(UserDefinedPythonDataSource.scala:322)
    at org.apache.spark.sql.execution.datasources.v2.python.UserDefinedPythonDataSourceRunner.receiveFromPython(UserDefinedPythonDataSource.scala:287)
    at org.apache.spark.sql.execution.python.PythonPlannerRunner.runInPython(PythonPlannerRunner.scala:201)
    at org.apache.spark.sql.execution.datasources.v2.python.UserDefinedPythonDataSource.createDataSourceInPython(UserDefinedPythonDataSource.scala:72)
    at org.apache.spark.sql.execution.datasources.v2.python.PythonDataSourceV2.getOrCreateDataSourceInPython(PythonDataSourceV2.scala:50)
    at org.apache.spark.sql.execution.datasources.v2.python.PythonDataSourceV2.inferSchema(PythonDataSourceV2.scala:60)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:104)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.loadV2Source(DataSourceV2Utils.scala:250)
    at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.$anonfun$applyOrElse$1(ResolveDataSource.scala:96)
    at scala.Option.flatMap(Option.scala:271)
    at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:94)
    at org.apache.spark.sql.catalyst.analysis.ResolveDataSource$$anonfun$apply$1.applyOrElse(ResolveDataSource.scala:58)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:141)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:85)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:141)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:418)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:137)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:133)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:42)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:114)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:113)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:42)
    at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:58)
    at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.apply(ResolveDataSource.scala:56)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$16(RuleExecutor.scala:480)
    at org.apache.spark.sql.catalyst.rules.RecoverableRuleExecutionHelper.processRule(RuleExecutor.scala:629)
    at org.apache.spark.sql.catalyst.rules.RecoverableRuleExecutionHelper.processRule$(RuleExecutor.scala:613)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.processRule(RuleExecutor.scala:131)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$15(RuleExecutor.scala:480)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$14(RuleExecutor.scala:479)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$13(RuleExecutor.scala:475)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:452)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$22(RuleExecutor.scala:585)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$22$adapted(RuleExecutor.scala:585)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:585)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:349)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeSameContext(Analyzer.scala:507)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:500)
    at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:406)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:500)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:425)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:341)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:219)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:341)
    at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:252)
    at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:96)
    at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:131)
    at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:87)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:487)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:425)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:487)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$3(QueryExecution.scala:308)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:548)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$6(QueryExecution.scala:703)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionPhase(SQLExecution.scala:152)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$5(QueryExecution.scala:703)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1342)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:696)
    at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:692)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1462)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:692)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:295)
    at com.databricks.sql.util.MemoryTrackerHelper.withMemoryTracking(MemoryTrackerHelper.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:294)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1684)
    at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1745)
    at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:340)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:274)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1462)
    at org.apache.spark.sql.SparkSession.$anonfun$withActiveAndFrameProfiler$1(SparkSession.scala:1469)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.SparkSession.withActiveAndFrameProfiler(SparkSession.scala:1469)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:106)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:265)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:224)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformReadRel(SparkConnectPlanner.scala:1811)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelation$1(SparkConnectPlanner.scala:190)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$usePlanCache$8(SessionHolder.scala:619)
    at org.apache.spark.sql.connect.service.SessionHolder.measureSubtreeRelationNodes(SessionHolder.scala:635)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$usePlanCache$6(SessionHolder.scala:618)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.connect.service.SessionHolder.usePlanCache(SessionHolder.scala:616)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:185)
    at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.transformRelation$1(SparkConnectAnalyzeHandler.scala:119)
    at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.process(SparkConnectAnalyzeHandler.scala:213)
    at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.$anonfun$handle$3(SparkConnectAnalyzeHandler.scala:104)
    at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.$anonfun$handle$3$adapted(SparkConnectAnalyzeHandler.scala:66)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:464)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1462)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:464)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
    at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:90)
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:241)
    at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:89)
    at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:463)
    at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.$anonfun$handle$1(SparkConnectAnalyzeHandler.scala:66)
    at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.$anonfun$handle$1$adapted(SparkConnectAnalyzeHandler.scala:51)
    at com.databricks.spark.connect.logging.rpc.SparkConnectRpcMetricsCollectorUtils$.collectMetrics(SparkConnectRpcMetricsCollector.scala:258)
    at org.apache.spark.sql.connect.service.SparkConnectAnalyzeHandler.handle(SparkConnectAnalyzeHandler.scala:50)
    at org.apache.spark.sql.connect.service.SparkConnectService.analyzePlan(SparkConnectService.scala:109)
    at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:801)
    at grpc_shaded.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
    at grpc_shaded.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at grpc_shaded.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at grpc_shaded.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at grpc_shaded.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at grpc_shaded.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at grpc_shaded.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at com.databricks.spark.connect.service.AuthenticationInterceptor$AuthenticatedServerCallListener.$anonfun$onHalfClose$1(AuthenticationInterceptor.scala:381)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
    at com.databricks.spark.connect.service.RequestContext.$anonfun$runWith$3(RequestContext.scala:337)
    at com.databricks.spark.connect.service.RequestContext$.com$databricks$spark$connect$service$RequestContext$$withLocalProperties(RequestContext.scala:537)
    at com.databricks.spark.connect.service.RequestContext.$anonfun$runWith$2(RequestContext.scala:337)
    at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:49)
    at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:293)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:289)
    at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:47)
    at com.databricks.logging.AttributionContextTracing.withAttributionContext$(AttributionContextTracing.scala:44)
    at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:30)
    at com.databricks.spark.util.UniverseAttributionContextWrapper.withValue(AttributionContextUtils.scala:242)
    at com.databricks.spark.connect.service.RequestContext.$anonfun$runWith$1(RequestContext.scala:336)
    at com.databricks.spark.connect.service.RequestContext.withContext(RequestContext.scala:349)
    at com.databricks.spark.connect.service.RequestContext.runWith(RequestContext.scala:329)
    at com.databricks.spark.connect.service.AuthenticationInterceptor$AuthenticatedServerCallListener.onHalfClose(AuthenticationInterceptor.scala:381)
    at grpc_shaded.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:351)
    at grpc_shaded.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:861)
    at grpc_shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at grpc_shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:165)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$6(SparkThreadLocalForwardingThreadPoolExecutor.scala:119)
    at com.databricks.sql.transaction.tahoe.mst.MSTThreadHelper$.runWithMstTxnId(MSTThreadHelper.scala:57)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$5(SparkThreadLocalForwardingThreadPoolExecutor.scala:118)
    at com.databricks.spark.util.IdentityClaim$.withClaim(IdentityClaim.scala:48)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$4(SparkThreadLocalForwardingThreadPoolExecutor.scala:117)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:116)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:93)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:162)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:165)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.lang.Thread.run(Thread.java:840)
File <command-5056821242561777>, line 3
      1 from module2 import DummyDataSource
      2 spark.dataSource.register(DummyDataSource)
----> 3 spark.read.format("dummy").load().display()

File /databricks/python_shell/lib/dbruntime/monkey_patches.py:73, in apply_dataframe_display_patch.<locals>.df_display(df, *args, **kwargs)
     69 def df_display(df, *args, **kwargs):
     70     """
     71     df.display() is an alias for display(df). Run help(display) for more information.
     72     """
---> 73     display(df, *args, **kwargs)

File /databricks/python_shell/lib/dbruntime/display.py:133, in Display.display(self, input, *args, **kwargs)
    131     self.display_connect_table(input, **kwargs)
    132 elif isinstance(input, ConnectDataFrame):
--> 133     if input.isStreaming:
    134         handleStreamingConnectDataFramePy4j(input, self.entry_point, kwargs)
    135     else:

File /usr/lib/python3.12/functools.py:995, in cached_property.__get__(self, instance, owner)
    993 val = cache.get(self.attrname, _NOT_FOUND)
    994 if val is _NOT_FOUND:
--> 995     val = self.func(instance)
    996 try:
    997     cache[self.attrname] = val