Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

OversizedAllocationException with transformWithStateInPandas

twbde
New Contributor II

Hello,

I have a process that uses transformWithStateInPandas on a dataframe that has the content of entire files in one of its columns. Recently, the exception OversizedAllocationException has started occurring. I have tried setting these configs in the job cluster's definition, without success.

spark.sql.execution.arrow.pyspark.enabled true
spark.sql.execution.arrow.transformWithStateInPandas.maxRecordsPerBatch 10
spark.sql.execution.arrow.useLargeVarTypes true
spark.sql.execution.pythonUDF.arrow.enabled true

I have also tried maxRecordsPerBatch at 1 and still hit the issue, and the message always says it tried to allocate over 2GB. It's almost as if useLargeVarTypes is ignored by transformWithStateInPandas. I have also tried disabling Arrow with:

spark.sql.execution.arrow.pyspark.enabled false
spark.sql.execution.pythonUDF.arrow.enabled true

I am using Databricks Runtime 17.3 LTS, so Spark 4.0.0. My code is Python with PySpark.

Thank you for any help you can give me.

Accepted Solution

lingareddy_Alva
Esteemed Contributor

Hi @twbde 

This is a genuinely tricky problem. Here's the diagnosis and the best available workarounds:
Root Cause: useLargeVarTypes Is Not Wired Into transformWithStateInPandas
Your instinct is correct. The spark.sql.execution.arrow.useLargeVarTypes config is not respected by the transformWithStateInPandas serializer path. In Spark's Arrow infrastructure, useLargeVarTypes is plumbed through toPandas() / createDataFrame() and the general Pandas UDF serializers, but transformWithStateInPandas has its own dedicated serializer (TransformWithStateInPandasSerializer), and the largeVarTypes flag is simply not passed through to its ArrowWriter.create(...) call.

The result: Arrow still uses the standard VarCharVector / VarBinaryVector, which have a hard 2GB cap per column per batch. Even maxRecordsPerBatch=1 won't help if a single file's content already exceeds 2GB, or if the cumulative buffer for the column in a single batch hits that limit through Arrow's internal buffer resizing.

Workarounds (in order of preference)
1. Pre-chunk large file content in your input stream (most reliable)
Before the data reaches transformWithStateInPandas, break the file-content column into chunks smaller than, say, 500MB each, tagging each chunk with a sequence number and the grouping key. Then reassemble in your StatefulProcessor's handleInputRows using a ListState or MapState keyed by chunk_seq, and only emit once all chunks have arrived.
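As a sketch of that chunk-and-reassemble logic (plain Python here to keep it self-contained; in the real job split_bytes would run in a UDF before the transform and reassemble inside handleInputRows against your state store; the names and the 256MB chunk size are illustrative assumptions):

```python
CHUNK_BYTES = 256 * 1024 * 1024  # stay well under Arrow's 2GB per-column cap

def split_bytes(content, chunk_bytes=CHUNK_BYTES):
    """Split file content into (chunk_seq, chunk) pairs."""
    if not content:
        return [(0, b"")]
    return [(i // chunk_bytes, content[i:i + chunk_bytes])
            for i in range(0, len(content), chunk_bytes)]

def reassemble(chunk_pairs):
    """Rebuild the original bytes; order-insensitive, since chunks may
    arrive out of order across micro-batches (in the processor this
    state would live in a MapState keyed by chunk_seq)."""
    return b"".join(chunk for _, chunk in sorted(chunk_pairs))
```

Carry the total chunk count alongside each row so the processor knows when the set is complete and it can emit.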

2. Use BinaryType instead of StringType for the file content column
If your column is typed as StringType, Arrow uses VarCharVector; if you cast it to BinaryType, Arrow uses VarBinaryVector. It's the same 2GB wall, but the buffer growth patterns sometimes differ and you can at least get slightly further. More importantly, it makes chunking and reassembly more natural for binary file content.

 

LR



Hello @lingareddy_Alva,

Thank you very much for your answer. I feel like the spark.sql.execution.arrow.useLargeVarTypes config not being respected by the transformWithStateInPandas serializer path is a bug. Is there somewhere I could report it to the Spark development team? I am unfortunately not a Scala or Java dev and cannot send a patch myself. If I could, I would...

Regarding your suggestions to work around the issue, we are already using the binary type for the file contents. Splitting the files into chunks is unfortunately not very feasible for us. However, I don't even use the file's content in the stateful processor; I use other columns around it. The file's content is used in a later stage of our pipeline, so I really only need it to pass through the stateful processor so it's available on the other side. I was thinking it might be possible to split my stream into two branches: one sends the relevant columns to the stateful processor while the file content goes "around" it, and the two branches are then stitched back together with a join. I am however worried about joining two streaming datasets, especially when there is statefulness involved.

Thank you again for your help. I will wait a few days before marking your answer as accepted, in case you have time to write back, because it really did answer my original question regarding the OversizedAllocationException.

Best regards

TWBDE