Data Engineering
FileAlreadyExistsException when restarting a structured stream with checkpoint (DBR 14.3)

Dwight
New Contributor II

Structured streams with a checkpoint location that have been running fine for months can no longer be restarted properly: on restart they fail with a FileAlreadyExistsException.

I reproduced the issue in the attached PDF. Has anyone else experienced this recently?
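For context, the stream is started roughly like this (a minimal sketch reconstructed from the traceback below; the checkpoint path and the noop sink body are placeholders, and spark is the session provided by the Databricks notebook):

checkpoint_dir = "abfss://checkpoint@<storage-account>.dfs.core.windows.net/streams/test_dwight_checkpointing"  # placeholder path

def noop(batch_df, batch_id):
    # foreachBatch sink that simply discards each micro-batch
    pass

def start_stream():
    """Starts streaming from the source table."""
    (
        spark.readStream
        .format("delta")
        .options(**{
            "ignoreDeletes": "true",
            "skipChangeCommits": "true",
            "readChangeData": "false"
        })
        .table("bronze_dev.ignition.historian")
        .writeStream.format("delta")
        .foreachBatch(noop)
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )

# The first run works; stopping the query and calling start_stream() again
# against the existing checkpoint directory now fails with FileAlreadyExistsException.
start_stream()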

Stacktrace:

 

SparkConnectGrpcException: (org.apache.hadoop.fs.FileAlreadyExistsException) Operation failed: "The specified path already exists.", 409, PUT, https://xxx.dfs.core.windows.net/checkpoint/streams/test_dwight_checkpointing?resource=directory&timeout=90&st=2024-09-12T05:37:38Z&sv=2020-02-10&ske=2024-09-12T07:37:38Z&sig=XXXXX&sktid=a6da9e90-b977-4dcc-8233-5dfdaa40adae&se=2024-09-12T07:03:12Z&sdd=0&skoid=4d4e91d6-eb04-42aaXXXXXXXXXXXXXXXXXX&spr=https&sks=b&skt=2024-09-12T05:37:38Z&sp=racwdxlm&skv=2021-08-06&sr=c, PathAlreadyExists, "The specified path already exists. RequestId:ed31bc49-401f-0069-14db-04c0d5000000 Time:2024-09-12T06:16:27.9368768Z"

JVM stacktrace:
org.apache.hadoop.fs.FileAlreadyExistsException
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.checkException(AzureBlobFileSystem.java:1699)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.mkdirs(AzureBlobFileSystem.java:906)
	at com.databricks.common.filesystem.LokiFileSystem.mkdirs(LokiFileSystem.scala:274)
	at com.databricks.sql.acl.fs.CredentialScopeFileSystem.mkdirs(CredentialScopeFileSystem.scala:260)
	at com.databricks.spark.sql.streaming.AzureCheckpointFileManager.createCheckpointDirectory(DatabricksCheckpointFileManager.scala:316)
	at com.databricks.spark.sql.streaming.DatabricksCheckpointFileManager.createCheckpointDirectory(DatabricksCheckpointFileManager.scala:88)
	at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.resolveCheckpointLocation(ResolveWriteToStream.scala:145)
	at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$$anonfun$apply$1.applyOrElse(ResolveWriteToStream.scala:45)
	at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$$anonfun$apply$1.applyOrElse(ResolveWriteToStream.scala:41)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:219)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:83)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:219)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:400)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:217)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:213)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:39)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:102)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:99)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:39)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:79)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:78)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:39)
	at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.apply(ResolveWriteToStream.scala:41)
	at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.apply(ResolveWriteToStream.scala:40)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$4(RuleExecutor.scala:312)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$3(RuleExecutor.scala:312)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:309)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:292)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$9(RuleExecutor.scala:385)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$9$adapted(RuleExecutor.scala:385)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:385)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:256)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeSameContext(Analyzer.scala:422)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:415)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:329)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:415)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:348)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:248)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:166)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:248)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:400)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:407)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:399)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:247)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:394)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$4(QueryExecution.scala:582)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1103)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:582)
	at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:578)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:578)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:241)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:240)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:324)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:417)
	at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:530)
	at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:462)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:275)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3240)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2698)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:292)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:234)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:172)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:333)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:333)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:84)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:234)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:83)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:332)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:172)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:123)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:358)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:357)
Caused by: shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException: Operation failed: "The specified path already exists.", 409, PUT, https://xxx.dfs.core.windows.net/checkpoint/streams/test_dwight_checkpointing?resource=directory&timeout=90&st=2024-09-12T05:37:38Z&sv=2020-02-10&ske=2024-09-12T07:37:38Z&sig=XXXXX&sktid=a6da9e90-b977-4dcc-8233-5dfdaa40adae&se=2024-09-12T07:03:12Z&sdd=0&skoid=4d4e91d6-eb04-42aaXXXXXXXXXXXXXXXXXX&spr=https&sks=b&skt=2024-09-12T05:37:38Z&sp=racwdxlm&skv=2021-08-06&sr=c, PathAlreadyExists, "The specified path already exists. RequestId:ed31bc49-401f-0069-14db-04c0d5000000 Time:2024-09-12T06:16:27.9368768Z"
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.completeExecute(AbfsRestOperation.java:265)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.lambda$execute$0(AbfsRestOperation.java:212)
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.measureDurationOfInvocation(IOStatisticsBinding.java:494)
	at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation(IOStatisticsBinding.java:465)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:210)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsClient.createPath(AbfsClient.java:477)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.createDirectory(AzureBlobFileSystemStore.java:825)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.mkdirs(AzureBlobFileSystem.java:900)
	at com.databricks.common.filesystem.LokiFileSystem.mkdirs(LokiFileSystem.scala:274)
	at com.databricks.sql.acl.fs.CredentialScopeFileSystem.mkdirs(CredentialScopeFileSystem.scala:260)
	at com.databricks.spark.sql.streaming.AzureCheckpointFileManager.createCheckpointDirectory(DatabricksCheckpointFileManager.scala:316)
	at com.databricks.spark.sql.streaming.DatabricksCheckpointFileManager.createCheckpointDirectory(DatabricksCheckpointFileManager.scala:88)
	at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.resolveCheckpointLocation(ResolveWriteToStream.scala:145)
	at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$$anonfun$apply$1.applyOrElse(ResolveWriteToStream.scala:45)
	at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$$anonfun$apply$1.applyOrElse(ResolveWriteToStream.scala:41)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:219)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:83)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:219)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:400)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:217)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:213)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:39)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:102)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:99)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:39)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:79)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:78)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:39)
	at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.apply(ResolveWriteToStream.scala:41)
	at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.apply(ResolveWriteToStream.scala:40)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$4(RuleExecutor.scala:312)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$3(RuleExecutor.scala:312)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:309)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:292)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$9(RuleExecutor.scala:385)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$9$adapted(RuleExecutor.scala:385)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:385)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:256)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeSameContext(Analyzer.scala:422)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:415)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:329)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:415)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:348)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:248)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:166)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:248)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:400)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:407)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:399)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:247)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:394)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$4(QueryExecution.scala:582)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1103)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:582)
	at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:578)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:578)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:241)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:240)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:324)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:417)
	at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:530)
	at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:462)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:275)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3240)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2698)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:292)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:234)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:172)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:333)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:333)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:84)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:234)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:83)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:332)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:172)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:123)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:358)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:357)
File <command-4134719625933011>, line 3
      1 # cancel the running command above to stop the stream
      2 # restart the stream once stopped, this time not removing the checkpoint dir
----> 3 start_stream()
File <command-4134719625933012>, line 24, in start_stream()
     10 def start_stream():
     11     """Starts streaming from the source table."""
     12     (
     13         spark.readStream
     14         .format("delta")
     15         .options(**{
     16             "ignoreDeletes": "true", 
     17             "skipChangeCommits": "true",
     18             "readChangeData": "false"
     19         })
     20         .table("bronze_dev.ignition.historian")
     21         .writeStream.format("delta")
     22         .foreachBatch(noop)
     23         .option("checkpointLocation", checkpoint_dir)
---> 24         .start()
     25     )
File /databricks/spark/python/pyspark/sql/connect/streaming/readwriter.py:641, in DataStreamWriter.start(self, path, format, outputMode, partitionBy, queryName, **options)
    632 def start(
    633     self,
    634     path: Optional[str] = None,
   (...)
    639     **options: "OptionalPrimitiveType",
    640 ) -> StreamingQuery:
--> 641     return self._start_internal(
    642         path=path,
    643         tableName=None,
    644         format=format,
    645         outputMode=outputMode,
    646         partitionBy=partitionBy,
    647         queryName=queryName,
    648         **options,
    649     )
File /databricks/spark/python/pyspark/sql/connect/streaming/readwriter.py:605, in DataStreamWriter._start_internal(self, path, tableName, format, outputMode, partitionBy, queryName, **options)
    602     self._write_proto.table_name = tableName
    604 cmd = self._write_stream.command(self._session.client)
--> 605 (_, properties) = self._session.client.execute_command(cmd)
    607 start_result = cast(
    608     pb2.WriteStreamOperationStartResult, properties["write_stream_operation_start_result"]
    609 )
    610 query = StreamingQuery(
    611     session=self._session,
    612     queryId=start_result.query_id.id,
    613     runId=start_result.query_id.run_id,
    614     name=start_result.name,
    615 )
File /databricks/spark/python/pyspark/sql/connect/client/core.py:1205, in SparkConnectClient.execute_command(self, command, observations, extra_request_metadata)
   1203     req.user_context.user_id = self._user_id
   1204 req.plan.command.CopyFrom(command)
-> 1205 data, _, _, _, properties = self._execute_and_fetch(
   1206     req, observations or {}, extra_request_metadata
   1207 )
   1208 if data is not None:
   1209     return (data.to_pandas(), properties)
File /databricks/spark/python/pyspark/sql/connect/client/core.py:1627, in SparkConnectClient._execute_and_fetch(self, req, observations, extra_request_metadata, self_destruct)
   1624 schema: Optional[StructType] = None
   1625 properties: Dict[str, Any] = {}
-> 1627 for response in self._execute_and_fetch_as_iterator(
   1628     req, observations, extra_request_metadata or []
   1629 ):
   1630     if isinstance(response, StructType):
   1631         schema = response
File /databricks/spark/python/pyspark/sql/connect/client/core.py:1604, in SparkConnectClient._execute_and_fetch_as_iterator(self, req, observations, extra_request_metadata)
   1602                     yield from handle_response(b)
   1603 except Exception as error:
-> 1604     self._handle_error(error)
File /databricks/spark/python/pyspark/sql/connect/client/core.py:1913, in SparkConnectClient._handle_error(self, error)
   1911 self.thread_local.inside_error_handling = True
   1912 if isinstance(error, grpc.RpcError):
-> 1913     self._handle_rpc_error(error)
   1914 elif isinstance(error, ValueError):
   1915     if "Cannot invoke RPC" in str(error) and "closed" in str(error):
File /databricks/spark/python/pyspark/sql/connect/client/core.py:1988, in SparkConnectClient._handle_rpc_error(self, rpc_error)
   1985             info = error_details_pb2.ErrorInfo()
   1986             d.Unpack(info)
-> 1988             raise convert_exception(
   1989                 info,
   1990                 status.message,
   1991                 self._fetch_enriched_error(info),
   1992                 self._display_server_stack_trace(),
   1993             ) from None
   1995     raise SparkConnectGrpcException(status.message) from None
   1996 else:

 

 

3 REPLIES

YoannBoyere
New Contributor II

Hi!
Yes, I have had the same issue for the past two days.
There is another forum thread about it: https://learn.microsoft.com/en-us/answers/questions/2007592/how-to-fix-the-specified-path-already-ex...

On my side, I created a ticket with Azure support, but there is still no solution. They proposed adding the configuration fs.azure.enable.mkdir.overwrite: true to the pipeline configuration, but it doesn't work for me.
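In case it helps anyone trying the same thing, this is how that suggested setting would typically be applied (a minimal sketch; depending on the cluster type it may have to go into the cluster's Spark config instead of being set at session level):

# Sketch of applying the flag Azure support suggested; as noted above,
# it did not resolve the issue in my case.
# Equivalent cluster Spark config line:
#   fs.azure.enable.mkdir.overwrite true
spark.conf.set("fs.azure.enable.mkdir.overwrite", "true")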

Dwight
New Contributor II

Thanks for the sanity check @YoannBoyere. That particular config setting doesn't make a difference for me either.
I've commented on that Microsoft thread as well, for good measure.

Dwight
New Contributor II

Apparently the issue was caused by a change on the Azure side, which has since been hotfixed; see https://learn.microsoft.com/en-us/answers/questions/2007592/how-to-fix-the-specified-path-already-ex...

Checkpoints work fine again now. We have also added a recovery step to our workflow that resets the checkpoint and restarts a stream whenever it fails to start, for whatever reason.
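For anyone wanting to do something similar, here is a rough sketch of that recovery step (the helper name and the dbutils call are illustrative rather than our exact implementation; note that dropping the checkpoint means the stream reprocesses the source from the beginning, so only do this if that is acceptable):

def start_stream_with_recovery():
    """Start the stream; if it fails to start, reset the checkpoint and retry once."""
    try:
        start_stream()
    except Exception as e:
        print(f"Stream failed to start ({e}); resetting checkpoint {checkpoint_dir} and retrying")
        dbutils.fs.rm(checkpoint_dir, True)  # True = recursive delete of the checkpoint directory
        start_stream()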
