Hello all,
I've been running into the error described below when querying a Snowflake table of about 5.5B rows and ~30 columns. It fails almost every time: either the Spark job never starts, or I get the standard error pasted below.
I know similarly sized datasets are queryable, because I've done it in the past on a different project (with much larger data), but that was on Azure Databricks, not GCP Databricks.
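For context, the read itself is nothing special; it looks roughly like this (the connection options and table name are placeholders, not my real values):

# Minimal sketch of the read through the Spark-Snowflake connector;
# all option values and the table name below are placeholders.
sf_options = {
    "sfUrl": "<account>.snowflakecomputing.com",
    "sfUser": "<user>",
    "sfPassword": "<password>",
    "sfDatabase": "<database>",
    "sfSchema": "<schema>",
    "sfWarehouse": "<warehouse>",
}

# Read the full table (~5.5B rows, ~30 columns)
df_trx_with_dept = (
    spark.read.format("snowflake")
    .options(**sf_options)
    .option("dbtable", "<transaction_table>")
    .load()
)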
My setup is as follows:
- Databricks Runtime 10.4 LTS
- 2-6 n1-standard-64 workers (autoscaling), each with 240 GB RAM and 64 cores
- An n1-standard-64 driver as well
What I've tried:
- Found a GitHub thread that suggested downgrading the Snowflake connector, so I tried Databricks Runtime 9.1; it still failed with the same error
- Tried other, more recent runtimes: same failure
- The only thing that worked was selecting fewer columns, keeping ~22-23 of the 30; that let the query run through (see the sketch after this list), though I don't see why that should be necessary
- The query runs perfectly fine when executed directly on Snowflake
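For reference, here is a minimal sketch of the column-pruning workaround mentioned above (the column names are hypothetical stand-ins for the ~22 columns I actually keep, and the output path is a placeholder; the connector's pushdown should prune the unselected columns on the Snowflake side):

# Keeping only a subset of columns is the only variant that completes.
# Column names below are placeholders, not my real schema.
wanted_cols = ["transaction_id", "store_id", "dept_id", "amount"]
df_subset = df_trx_with_dept.select(*wanted_cols)
df_subset.write.format("parquet").mode("overwrite").save(
    "gs://<bucket>/<output_path>.parquet"  # placeholder path
)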
Below is the standard error from the cluster:
Py4JJavaError Traceback (most recent call last)
<command-3149904745081202> in <module>
10 print(df_trx_with_dept.columns)
11 print("Started writing trx_with_dept data with repartition")
---> 12 df_trx_with_dept.write.format("parquet").mode("overwrite").save(
13 "gs://crs-tenant147/ds/data/pre_processed/20220630_transaction_detailed_with_dept_filtered_052021_052022.parquet"
14 )
/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
738 self._jwrite.save()
739 else:
--> 740 self._jwrite.save(path)
741
742 @since(1.4)
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
115 def deco(*a, **kw):
116 try:
--> 117 return f(*a, **kw)
118 except py4j.protocol.Py4JJavaError as e:
119 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o726.save.
: net.snowflake.client.jdbc.SnowflakeSQLException: JDBC driver encountered communication error. Message: Exception encountered when executing statement: Premature end of chunk coded message body: closing chunk expected.
at net.snowflake.client.jdbc.SnowflakeStatementV1.executeQueryInternal(SnowflakeStatementV1.java:245)
at net.snowflake.client.jdbc.SnowflakePreparedStatementV1.executeQuery(SnowflakePreparedStatementV1.java:117)
at net.snowflake.spark.snowflake.JDBCWrapper.$anonfun$executePreparedQueryInterruptibly$1(SnowflakeJDBCWrapper.scala:330)
at net.snowflake.spark.snowflake.JDBCWrapper.$anonfun$executeInterruptibly$2(SnowflakeJDBCWrapper.scala:368)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Thanks for your help!