When writing a dataframe to SQL Server using either the built-in "sqlserver" connector or the older "jdbc" one, the write sometimes fails with an error that references stale values from an earlier stage of building the dataframe. The code I'm using to write is:
```python
(
    df.write.format("sqlserver")
    .mode("append")
    .option("host", f"{host}.database.windows.net")
    .option("port", "1433")
    .option("user", "datahub")
    .option("password", f"{password}")
    .option("database", f"{db}")
    .option("dbtable", f"[{schema}].[{table}]")
    .save()
)
```
It works fine for some dataframes, but for others it throws:
```
SparkConnectGrpcException: (com.microsoft.sqlserver.jdbc.SQLServerException) The identifier that starts with 'MapType(StringType(), MapType(StringType(), StructType([StructField('factText', StringType(), True), StructField('factSource', A' is too long. Maximum length is 128.

JVM stacktrace:
com.microsoft.sqlserver.jdbc.SQLServerException
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:265)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1676)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:907)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:802)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7627)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3916)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:268)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:242)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeUpdate(SQLServerStatement.java:742)
    at org.apache.spark.sql.jdbc.JdbcDialect.createTable(JdbcDialects.scala:193)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:939)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.saveTableToJDBC(JdbcRelationProvider.scala:88)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcDialectRelationProvider.createRelation(JdbcRelationProvider.scala:123)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.$anonfun$sideEffectResult$2(commands.scala:84)
    at org.apache.spark.sql.execution.SparkPlan.runCommandWithAetherOff(SparkPlan.scala:178)
    at org.apache.spark.sql.execution.SparkPlan.runCommandInAetherOrSpark(SparkPlan.scala:189)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.$anonfun$sideEffectResult$1(commands.scala:84)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:81)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:80)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:94)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$4(QueryExecution.scala:358)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:166)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$3(QueryExecution.scala:358)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$9(SQLExecution.scala:392)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:700)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:277)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:164)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:637)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:357)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1103)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:353)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:312)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:350)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:334)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:505)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:83)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:505)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:343)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:339)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:481)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:334)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:400)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:334)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:271)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:268)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:429)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:1040)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:444)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:406)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:272)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteOperation(SparkConnectPlanner.scala:3046)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2688)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:292)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:234)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:172)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:333)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:333)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
    at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:84)
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:234)
    at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:83)
    at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:332)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:172)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:123)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:358)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:357)
```
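From the trace, the failure happens in `JdbcUtils$.createTable`, i.e. while Spark is generating the `CREATE TABLE` DDL: the string form of the MapType is apparently ending up as an identifier in the statement, which blows past SQL Server's 128-character limit. To rule out the schema on my side, I run a quick sanity check right before the write (a minimal sketch, not my full pipeline):
```python
from pyspark.sql import types as T

# List any columns whose Spark type is not a plain string; if the
# schema were clean, no MapType should ever reach createTable.
non_string = [
    (f.name, f.dataType.simpleString())
    for f in df.schema.fields
    if not isinstance(f.dataType, T.StringType)
]
print(non_string)  # prints [] for the dataframe I'm writing
```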
The MapType column the error mentions is one I built in an earlier dataframe, but I filtered it out of the dataframe I'm actually writing to SQL, which contains only string columns and is otherwise fine.
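For reference, the filtering is roughly this (`df_with_map` and `facts` are placeholder names, not my real ones):
```python
# "facts" stands in for the actual MapType column name.
df = df_with_map.drop("facts")
df.printSchema()  # shows only StringType columns at this point
```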
What's strange is that after some time the same write sometimes just succeeds, which makes me think the write is accessing some metadata that gets dropped over time, but I'm not sure. I've tried caching the dataframe, and writing it to a local file and reading it back in, but nothing seems to help.
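Concretely, those attempts looked roughly like this (the Parquet format and path are illustrative, I just wanted a dataframe with fresh lineage):
```python
# Attempt 1: cache and materialize, hoping to detach the write from the old plan.
df.cache()
df.count()

# Attempt 2: round-trip through a local file so the dataframe read back
# has no lineage to the original MapType column.
df.write.mode("overwrite").parquet("/tmp/df_roundtrip")
df = spark.read.parquet("/tmp/df_roundtrip")
```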