Snowflake/GCP error: Premature end of chunk coded message body: closing chunk expected

hamzatazib96
Databricks Partner

Hello all,

I've been hitting the error below when querying a Snowflake table of ~5.5B rows and ~30 columns. It fails almost every time: either the Spark job doesn't even start, or I get the error below.
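For reference, a read like this with the Spark Snowflake connector is usually wired up roughly as in the sketch below. The helper name, connection values and table name are hypothetical placeholders, not the code from this post. The `partition_size_in_mb` option is included only as a knob one might experiment with; whether it affects this error is an assumption, not something confirmed here.

```python
from typing import Dict, Optional


def build_snowflake_options(
    url: str,
    user: str,
    password: str,
    database: str,
    schema: str,
    warehouse: str,
    partition_size_in_mb: Optional[int] = None,
) -> Dict[str, str]:
    """Build the option map for spark.read.format("snowflake").

    Hypothetical helper: the keys are the connector's option names,
    all values passed in are placeholders.
    """
    options = {
        "sfURL": url,
        "sfUser": user,
        "sfPassword": password,
        "sfDatabase": database,
        "sfSchema": schema,
        "sfWarehouse": warehouse,
    }
    if partition_size_in_mb is not None:
        # Recommended uncompressed size per DataFrame partition.
        # ASSUMPTION: worth experimenting with, not a confirmed fix.
        options["partition_size_in_mb"] = str(partition_size_in_mb)
    return options


# Usage on a Databricks cluster (not executed here; `spark` is the
# notebook's SparkSession and "MY_TABLE" is a placeholder):
# df = (spark.read.format("snowflake")
#       .options(**build_snowflake_options(...))
#       .option("dbtable", "MY_TABLE")
#       .load())
```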

I know I can query similarly sized datasets, because I've done so before on a different project (with much larger data), but that was on Azure Databricks, not GCP Databricks.

My setup is as follows:

  • Databricks Runtime 10.4 LTS
  • Workers: 2-6 n1-standard-64 (autoscaling), each with 240 GB of memory and 64 cores
  • Driver: n1-standard-64 as well
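To put the table size next to this cluster size, a back-of-envelope estimate can help. This is only a rough sketch: the ~8 bytes per value is an assumption (real row width depends on the actual column types), and the estimate does not by itself explain the error.

```python
# Back-of-envelope sizing from the numbers in this post.
# ASSUMPTION: ~8 bytes per value on average (e.g. 64-bit numbers);
# the real row width depends on the actual column types.
ROWS = 5_500_000_000
COLUMNS = 30
BYTES_PER_VALUE = 8
WORKER_CORES = 64    # n1-standard-64
WORKER_MEM_GB = 240  # n1-standard-64


def raw_size_gb(rows: int, columns: int, bytes_per_value: int = BYTES_PER_VALUE) -> float:
    """Approximate uncompressed result size in GB (10**9 bytes)."""
    return rows * columns * bytes_per_value / 1e9


def cluster_capacity(workers: int) -> tuple:
    """Total (cores, memory in GB) across the worker nodes only."""
    return workers * WORKER_CORES, workers * WORKER_MEM_GB


if __name__ == "__main__":
    print(f"{COLUMNS} columns: ~{raw_size_gb(ROWS, COLUMNS):.0f} GB uncompressed")
    for workers in (2, 6):
        cores, mem = cluster_capacity(workers)
        print(f"{workers} workers: {cores} cores, {mem} GB memory")
```

Under these assumptions the full table is on the order of 1.3 TB uncompressed, while the workers provide 480-1440 GB of memory in total depending on autoscaling.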

What I've tried:

  • Found a GitHub thread that suggested downgrading the Snowflake connector, so I tried Databricks Runtime 9.1, but it failed with the same error
  • Tried more recent runtimes as well: same failure
  • The only thing that worked was keeping ~22-23 of the 30 columns; with that subset the query ran through (though I don't think a table of this width should cause any problems)
  • The query runs perfectly fine when executed directly in Snowflake
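Since reading ~22-23 columns goes through, one possible workaround is to read the columns in batches, each carrying a key column, and join the pieces back together in Spark. Below is a sketch of the batching step, assuming the table has a unique key column (`TRX_ID` is hypothetical) and that batches of at most 22 columns stay under whatever limit is being hit (also an assumption). Both helper names are illustrative.

```python
from typing import List


def column_batches(columns: List[str], key: str, max_cols: int) -> List[List[str]]:
    """Split `columns` into batches of at most `max_cols` columns each,
    prepending `key` to every batch so the pieces can be re-joined."""
    if max_cols < 2:
        raise ValueError("max_cols must leave room for the key plus one column")
    others = [c for c in columns if c != key]
    step = max_cols - 1  # one slot per batch is reserved for the key
    return [[key] + others[i:i + step] for i in range(0, len(others), step)]


def batch_query(table: str, batch: List[str]) -> str:
    """SQL text for the connector's `query` option (hypothetical helper)."""
    return f"SELECT {', '.join(batch)} FROM {table}"


if __name__ == "__main__":
    # Hypothetical 30-column table: a key plus C1..C29.
    cols = ["TRX_ID"] + [f"C{i}" for i in range(1, 30)]
    for batch in column_batches(cols, key="TRX_ID", max_cols=22):
        print(len(batch), batch_query("MY_TABLE", batch))
```

Each batch would then be loaded with `.option("query", ...)` and the resulting DataFrames joined on the key. Whether joining ~5.5B rows back together is cheaper than fixing the underlying connection problem is an open question.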

Below is the error output from the cluster:

Py4JJavaError                             Traceback (most recent call last)
<command-3149904745081202> in <module>
     10 print(df_trx_with_dept.columns)
     11 print("Started writing trx_with_dept data with repartition")
---> 12 df_trx_with_dept.write.format("parquet").mode("overwrite").save(
     13     "gs://crs-tenant147/ds/data/pre_processed/20220630_transaction_detailed_with_dept_filtered_052021_052022.parquet"
     14 )
 
/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
    738             self._jwrite.save()
    739         else:
--> 740             self._jwrite.save(path)
    741 
    742     @since(1.4)
 
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
 
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    115     def deco(*a, **kw):
    116         try:
--> 117             return f(*a, **kw)
    118         except py4j.protocol.Py4JJavaError as e:
    119             converted = convert_exception(e.java_exception)
 
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)
 
Py4JJavaError: An error occurred while calling o726.save.
: net.snowflake.client.jdbc.SnowflakeSQLException: JDBC driver encountered communication error. Message: Exception encountered when executing statement: Premature end of chunk coded message body: closing chunk expected.
	at net.snowflake.client.jdbc.SnowflakeStatementV1.executeQueryInternal(SnowflakeStatementV1.java:245)
	at net.snowflake.client.jdbc.SnowflakePreparedStatementV1.executeQuery(SnowflakePreparedStatementV1.java:117)
	at net.snowflake.spark.snowflake.JDBCWrapper.$anonfun$executePreparedQueryInterruptibly$1(SnowflakeJDBCWrapper.scala:330)
	at net.snowflake.spark.snowflake.JDBCWrapper.$anonfun$executeInterruptibly$2(SnowflakeJDBCWrapper.scala:368)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Thanks for your help!