Data Governance

Pyspark SQL Write Error: SparkConnectGrpcException (com.microsoft.sqlserver.jdbc.SQLServerException)

MarthinusB
New Contributor

When attempting to write a DataFrame to SQL Server using either the provided "sqlserver" connector or the older "jdbc" one, the write appears to fail because of stale values that were used earlier while building the DataFrame. The code I'm using to write is:

```python
(
    df.write.format("sqlserver")
    .mode("append")
    .option("host", f"{host}.database.windows.net")
    .option("port", "1433")
    .option("user", "datahub")
    .option("password", f"{password}")
    .option("database", f"{db}")
    .option("dbtable", f"[{schema}].[{table}]")
    .save()
)
```

It works fine sometimes, but for some DataFrames it throws:

```
SparkConnectGrpcException: (com.microsoft.sqlserver.jdbc.SQLServerException) The identifier that starts with 'MapType(StringType(), MapType(StringType(), StructType([StructField('factText', StringType(), True), StructField('factSource', A' is too long. Maximum length is 128.

JVM stacktrace:
com.microsoft.sqlserver.jdbc.SQLServerException
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:265)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.getNextResult(SQLServerStatement.java:1676)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.doExecuteStatement(SQLServerStatement.java:907)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement$StmtExecCmd.doExecute(SQLServerStatement.java:802)
    at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7627)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3916)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeCommand(SQLServerStatement.java:268)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeStatement(SQLServerStatement.java:242)
    at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeUpdate(SQLServerStatement.java:742)
    at org.apache.spark.sql.jdbc.JdbcDialect.createTable(JdbcDialects.scala:193)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:939)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.saveTableToJDBC(JdbcRelationProvider.scala:88)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcDialectRelationProvider.createRelation(JdbcRelationProvider.scala:123)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:50)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.$anonfun$sideEffectResult$2(commands.scala:84)
    at org.apache.spark.sql.execution.SparkPlan.runCommandWithAetherOff(SparkPlan.scala:178)
    at org.apache.spark.sql.execution.SparkPlan.runCommandInAetherOrSpark(SparkPlan.scala:189)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.$anonfun$sideEffectResult$1(commands.scala:84)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:81)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:80)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:94)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$4(QueryExecution.scala:358)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:166)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$3(QueryExecution.scala:358)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$9(SQLExecution.scala:392)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:700)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:277)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:164)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:637)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:357)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1103)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:353)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:312)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:350)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:334)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:505)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:83)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:505)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:343)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:339)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:481)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:334)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:400)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:334)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:271)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:268)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:429)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:1040)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:444)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:406)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:272)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteOperation(SparkConnectPlanner.scala:3046)
    at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2688)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:292)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:234)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:172)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:333)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
    at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:333)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
    at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:84)
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:234)
    at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:83)
    at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:332)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:172)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:123)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:358)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
    at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:357)
```

The MapType column that triggers the error is one I had built up in an earlier DataFrame, but I've filtered it out of the DataFrame I'm actually writing to SQL, which contains only string columns and has no issues on its own.

What's strange is that after some time the same write sometimes just succeeds, which makes me think the write is accessing some metadata of the table that gets dropped over time, but I'm not sure. I've tried caching the DataFrame, and writing it to a local file and reading it back in, but nothing seems to work.
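For reference, here is a minimal sketch of the kind of pre-write check I mean, keeping only the string-typed columns and re-selecting them explicitly so no complex (MapType/StructType) columns can reach the writer. The variable names are placeholders, not my actual job:

```python
from pyspark.sql.types import StringType

# What Spark actually sees immediately before the write.
df.printSchema()

# Keep only the string-typed columns and re-select them explicitly,
# so nothing complex from earlier transformations is carried along.
string_cols = [fld.name for fld in df.schema.fields if isinstance(fld.dataType, StringType)]
df_out = df.select(*string_cols)

df_out.printSchema()
```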

1 REPLY

Brahmareddy
Honored Contributor

Hi @MarthinusB, how are you doing today?

To overcome this, try explicitly defining the schema for your DataFrame so that no unwanted metadata is retained. You can also recreate the DataFrame using rdd.toDF(), or checkpoint it to drop unnecessary lineage. Explicitly drop any unused columns, and use cache() followed by a count() to materialize the changes before writing. Finally, make sure you're using the correct Spark connector version for SQL Server.
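For example, a rough sketch of those steps might look like this. The target schema and column names are only illustrative placeholders, not your actual tables, and the connection variables are the ones from your original snippet:

```python
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

# Illustrative target schema -- replace with the columns your SQL Server table expects.
target_schema = StructType([
    StructField("id", StringType(), True),
    StructField("fact_text", StringType(), True),
])

# Re-select only the intended columns with explicit casts, so nothing from
# earlier transformations (e.g. the old MapType column) is carried along.
df_clean = df.select(
    *[F.col(field.name).cast(field.dataType) for field in target_schema.fields]
)

# Materialize the cleaned DataFrame before the JDBC write.
df_clean.cache()
df_clean.count()

(
    df_clean.write.format("sqlserver")
    .mode("append")
    .option("host", f"{host}.database.windows.net")
    .option("port", "1433")
    .option("user", "datahub")
    .option("password", f"{password}")
    .option("database", f"{db}")
    .option("dbtable", f"[{schema}].[{table}]")
    .save()
)
```

The idea is that the DataFrame handed to the writer only ever contains the explicitly listed columns, with its lineage materialized before the JDBC write.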

Let me know if it works and have a good day.

Regards,

Brahma
