09-11-2024 11:41 PM - edited 09-11-2024 11:44 PM
Structured Streaming queries with a checkpoint location, which have been running fine for months, can no longer be restarted properly: on restart they fail with a FileAlreadyExistsException on the existing checkpoint directory.
I reproduced the issue in the attached PDF. Has anyone else experienced this recently?
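For context, the restart pattern is nothing exotic. Below is the stream from the notebook in the attached PDF, reconstructed as a self-contained sketch (the noop sink and the checkpoint_dir value are placeholders for my setup; the real checkpoint is the abfss path visible in the stacktrace). Starting against a fresh checkpoint directory works; restarting against the existing one now fails.

def noop(batch_df, batch_id):
    # Sink logic omitted; the failure happens before any batch is processed.
    pass

# Placeholder for the existing checkpoint directory on ADLS Gen2.
checkpoint_dir = "abfss://checkpoint@<storage-account>.dfs.core.windows.net/streams/test_dwight_checkpointing"

def start_stream():
    """Starts streaming from the source table."""
    (
        spark.readStream
        .format("delta")
        .options(**{
            "ignoreDeletes": "true",
            "skipChangeCommits": "true",
            "readChangeData": "false",
        })
        .table("bronze_dev.ignition.historian")
        .writeStream.format("delta")
        .foreachBatch(noop)
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )

start_stream()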
Stacktrace:
SparkConnectGrpcException: (org.apache.hadoop.fs.FileAlreadyExistsException) Operation failed: "The specified path already exists.", 409, PUT, https://xxx.dfs.core.windows.net/checkpoint/streams/test_dwight_checkpointing?resource=directory&timeout=90&st=2024-09-12T05:37:38Z&sv=2020-02-10&ske=2024-09-12T07:37:38Z&sig=XXXXX&sktid=a6da9e90-b977-4dcc-8233-5dfdaa40adae&se=2024-09-12T07:03:12Z&sdd=0&skoid=4d4e91d6-eb04-42aaXXXXXXXXXXXXXXXXXX&spr=https&sks=b&skt=2024-09-12T05:37:38Z&sp=racwdxlm&skv=2021-08-06&sr=c, PathAlreadyExists, "The specified path already exists. RequestId:ed31bc49-401f-0069-14db-04c0d5000000 Time:2024-09-12T06:16:27.9368768Z"
JVM stacktrace:
org.apache.hadoop.fs.FileAlreadyExistsException
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.checkException(AzureBlobFileSystem.java:1699)
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.mkdirs(AzureBlobFileSystem.java:906)
at com.databricks.common.filesystem.LokiFileSystem.mkdirs(LokiFileSystem.scala:274)
at com.databricks.sql.acl.fs.CredentialScopeFileSystem.mkdirs(CredentialScopeFileSystem.scala:260)
at com.databricks.spark.sql.streaming.AzureCheckpointFileManager.createCheckpointDirectory(DatabricksCheckpointFileManager.scala:316)
at com.databricks.spark.sql.streaming.DatabricksCheckpointFileManager.createCheckpointDirectory(DatabricksCheckpointFileManager.scala:88)
at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.resolveCheckpointLocation(ResolveWriteToStream.scala:145)
at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$$anonfun$apply$1.applyOrElse(ResolveWriteToStream.scala:45)
at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$$anonfun$apply$1.applyOrElse(ResolveWriteToStream.scala:41)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:219)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:83)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:219)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:400)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:217)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:213)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:39)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:102)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:99)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:39)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:79)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:78)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:39)
at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.apply(ResolveWriteToStream.scala:41)
at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.apply(ResolveWriteToStream.scala:40)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$4(RuleExecutor.scala:312)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$3(RuleExecutor.scala:312)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:309)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:292)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$9(RuleExecutor.scala:385)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$9$adapted(RuleExecutor.scala:385)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:385)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:256)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeSameContext(Analyzer.scala:422)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:415)
at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:329)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:415)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:348)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:248)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:166)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:248)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:400)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:407)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:399)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:247)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:394)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$4(QueryExecution.scala:582)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1103)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:582)
at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:578)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:578)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:241)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:240)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:324)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:417)
at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:530)
at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:462)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:275)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3240)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2698)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:292)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:234)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:172)
at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:333)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:333)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:84)
at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:234)
at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:83)
at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:332)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:172)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:123)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:358)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:357)
Caused by: shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException: Operation failed: "The specified path already exists.", 409, PUT, https://xxx.dfs.core.windows.net/checkpoint/streams/test_dwight_checkpointing?resource=directory&timeout=90&st=2024-09-12T05:37:38Z&sv=2020-02-10&ske=2024-09-12T07:37:38Z&sig=XXXXX&sktid=a6da9e90-b977-4dcc-8233-5dfdaa40adae&se=2024-09-12T07:03:12Z&sdd=0&skoid=4d4e91d6-eb04-42aaXXXXXXXXXXXXXXXXXX&spr=https&sks=b&skt=2024-09-12T05:37:38Z&sp=racwdxlm&skv=2021-08-06&sr=c, PathAlreadyExists, "The specified path already exists. RequestId:ed31bc49-401f-0069-14db-04c0d5000000 Time:2024-09-12T06:16:27.9368768Z"
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.completeExecute(AbfsRestOperation.java:265)
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.lambda$execute$0(AbfsRestOperation.java:212)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.measureDurationOfInvocation(IOStatisticsBinding.java:494)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation(IOStatisticsBinding.java:465)
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:210)
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.AbfsClient.createPath(AbfsClient.java:477)
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.createDirectory(AzureBlobFileSystemStore.java:825)
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.mkdirs(AzureBlobFileSystem.java:900)
at com.databricks.common.filesystem.LokiFileSystem.mkdirs(LokiFileSystem.scala:274)
at com.databricks.sql.acl.fs.CredentialScopeFileSystem.mkdirs(CredentialScopeFileSystem.scala:260)
at com.databricks.spark.sql.streaming.AzureCheckpointFileManager.createCheckpointDirectory(DatabricksCheckpointFileManager.scala:316)
at com.databricks.spark.sql.streaming.DatabricksCheckpointFileManager.createCheckpointDirectory(DatabricksCheckpointFileManager.scala:88)
at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.resolveCheckpointLocation(ResolveWriteToStream.scala:145)
at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$$anonfun$apply$1.applyOrElse(ResolveWriteToStream.scala:45)
at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$$anonfun$apply$1.applyOrElse(ResolveWriteToStream.scala:41)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:219)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:83)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:219)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:400)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:217)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:213)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:39)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:102)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:99)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:39)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:79)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:78)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:39)
at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.apply(ResolveWriteToStream.scala:41)
at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.apply(ResolveWriteToStream.scala:40)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$4(RuleExecutor.scala:312)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$3(RuleExecutor.scala:312)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:309)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:292)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$9(RuleExecutor.scala:385)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$9$adapted(RuleExecutor.scala:385)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:385)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:256)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeSameContext(Analyzer.scala:422)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:415)
at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:329)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:415)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:348)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:248)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:166)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:248)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:400)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:407)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:399)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:247)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:394)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$4(QueryExecution.scala:582)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1103)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:582)
at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:578)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:578)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:241)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:240)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:324)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:417)
at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:530)
at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:462)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:275)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3240)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2698)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:292)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:234)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:172)
at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:333)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:333)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:84)
at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:234)
at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:83)
at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:332)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:172)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:123)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:358)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:357)
File <command-4134719625933011>, line 3
1 # cancel the running command above to stop the stream
2 # restart the stream once stopped, this time not removing the checkpoint dir
----> 3 start_stream()
File <command-4134719625933012>, line 24, in start_stream()
10 def start_stream():
11 """Starts streaming from the source table."""
12 (
13 spark.readStream
14 .format("delta")
15 .options(**{
16 "ignoreDeletes": "true",
17 "skipChangeCommits": "true",
18 "readChangeData": "false"
19 })
20 .table("bronze_dev.ignition.historian")
21 .writeStream.format("delta")
22 .foreachBatch(noop)
23 .option("checkpointLocation", checkpoint_dir)
---> 24 .start()
25 )
File /databricks/spark/python/pyspark/sql/connect/streaming/readwriter.py:641, in DataStreamWriter.start(self, path, format, outputMode, partitionBy, queryName, **options)
632 def start(
633 self,
634 path: Optional[str] = None,
(...)
639 **options: "OptionalPrimitiveType",
640 ) -> StreamingQuery:
--> 641 return self._start_internal(
642 path=path,
643 tableName=None,
644 format=format,
645 outputMode=outputMode,
646 partitionBy=partitionBy,
647 queryName=queryName,
648 **options,
649 )
File /databricks/spark/python/pyspark/sql/connect/streaming/readwriter.py:605, in DataStreamWriter._start_internal(self, path, tableName, format, outputMode, partitionBy, queryName, **options)
602 self._write_proto.table_name = tableName
604 cmd = self._write_stream.command(self._session.client)
--> 605 (_, properties) = self._session.client.execute_command(cmd)
607 start_result = cast(
608 pb2.WriteStreamOperationStartResult, properties["write_stream_operation_start_result"]
609 )
610 query = StreamingQuery(
611 session=self._session,
612 queryId=start_result.query_id.id,
613 runId=start_result.query_id.run_id,
614 name=start_result.name,
615 )
File /databricks/spark/python/pyspark/sql/connect/client/core.py:1205, in SparkConnectClient.execute_command(self, command, observations, extra_request_metadata)
1203 req.user_context.user_id = self._user_id
1204 req.plan.command.CopyFrom(command)
-> 1205 data, _, _, _, properties = self._execute_and_fetch(
1206 req, observations or {}, extra_request_metadata
1207 )
1208 if data is not None:
1209 return (data.to_pandas(), properties)
File /databricks/spark/python/pyspark/sql/connect/client/core.py:1627, in SparkConnectClient._execute_and_fetch(self, req, observations, extra_request_metadata, self_destruct)
1624 schema: Optional[StructType] = None
1625 properties: Dict[str, Any] = {}
-> 1627 for response in self._execute_and_fetch_as_iterator(
1628 req, observations, extra_request_metadata or []
1629 ):
1630 if isinstance(response, StructType):
1631 schema = response
File /databricks/spark/python/pyspark/sql/connect/client/core.py:1604, in SparkConnectClient._execute_and_fetch_as_iterator(self, req, observations, extra_request_metadata)
1602 yield from handle_response(b)
1603 except Exception as error:
-> 1604 self._handle_error(error)
File /databricks/spark/python/pyspark/sql/connect/client/core.py:1913, in SparkConnectClient._handle_error(self, error)
1911 self.thread_local.inside_error_handling = True
1912 if isinstance(error, grpc.RpcError):
-> 1913 self._handle_rpc_error(error)
1914 elif isinstance(error, ValueError):
1915 if "Cannot invoke RPC" in str(error) and "closed" in str(error):
File /databricks/spark/python/pyspark/sql/connect/client/core.py:1988, in SparkConnectClient._handle_rpc_error(self, rpc_error)
1985 info = error_details_pb2.ErrorInfo()
1986 d.Unpack(info)
-> 1988 raise convert_exception(
1989 info,
1990 status.message,
1991 self._fetch_enriched_error(info),
1992 self._display_server_stack_trace(),
1993 ) from None
1995 raise SparkConnectGrpcException(status.message) from None
1996 else:
09-12-2024 05:03 AM
Hi!
Yes, I've been having the same issue for the last two days...
There is another forum thread about it: https://learn.microsoft.com/en-us/answers/questions/2007592/how-to-fix-the-specified-path-already-ex...
On my side, I created a ticket with Azure support, but there is no solution yet. They proposed adding the configuration fs.azure.enable.mkdir.overwrite: true to the pipeline configuration, but it doesn't work for me.
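In case it helps anyone trying the same thing, this is roughly how I applied it; a hedged sketch only, since on shared or serverless compute the runtime conf change may be rejected and a cluster-level setting is needed instead. Neither variant solved it for me.

# Session-level attempt with the config Azure support suggested:
spark.conf.set("fs.azure.enable.mkdir.overwrite", "true")

# Cluster-level alternative: add this line to the cluster's Spark config and restart:
#   spark.hadoop.fs.azure.enable.mkdir.overwrite true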
09-12-2024 05:44 AM
Thanks for the sanity check @YoannBoyere. That particular config setting doesn't make a difference for me either.
I've commented on that Microsoft thread as well, for good measure.
09-19-2024 02:10 AM
Apparently the issue was caused by a change on the Azure side, which has since been hotfixed; see https://learn.microsoft.com/en-us/answers/questions/2007592/how-to-fix-the-specified-path-already-ex....
Checkpoints work fine again. We also added a recovery step to our workflow that resets the checkpoint and restarts the stream whenever it fails, whatever the reason; roughly as sketched below.
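This is a simplified sketch of that recovery step, not our exact workflow code. It reuses the start_stream and checkpoint_dir placeholders from the original post, and assumes reprocessing the source table from scratch is acceptable when the checkpoint is thrown away.

def start_stream_with_recovery():
    """Start the stream; on failure, wipe the checkpoint and retry once."""
    try:
        start_stream()
    except Exception as e:
        print(f"Stream failed to start ({e}); resetting checkpoint and retrying")
        # Deleting the checkpoint means the next run reprocesses the source table.
        dbutils.fs.rm(checkpoint_dir, True)
        start_stream()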