transformWithStateInPandas: invalid pickle opcode when updating ValueState with a large float array

VaDim
New Contributor III

I get an error when the value I need to store in a ValueState is a large array (more than roughly 15k-20k items). If I trim the array to under 10k samples, there is no error and everything works correctly. The same error is raised when the array is used as a value in a MapState or as an item in a ListState.
I ran it several times to see whether there is a hard threshold at which the error appears, but there doesn't seem to be one: it can fail at 16k items, or it can pass at that size and then fail on the next record at 15k.
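
Because the failure does not line up with a fixed item count, it may help to rerun with deterministic inputs and to log a byte size next to each array length. Below is a minimal sketch using only the Python standard library. The helper names make_samples and pickled_size are hypothetical, and the standard pickle module is only a rough proxy here: the bytes PySpark actually sends when updating state may be encoded differently.

```python
import pickle
import random
import struct


def make_samples(n, seed=0):
    """Return n reproducible pseudo-random values rounded to 32-bit floats.

    Hypothetical helper: the same (n, seed) always gives the same list,
    so a failing record can be regenerated exactly.
    """
    rng = random.Random(seed)
    raw = [rng.uniform(-1.0, 1.0) for _ in range(n)]
    # Round-trip through 4-byte floats so every value fits ARRAY<FLOAT> exactly.
    return list(struct.unpack(f"<{n}f", struct.pack(f"<{n}f", *raw)))


def pickled_size(samples):
    """Size in bytes of the one-field tuple (samples,) under stdlib pickle.

    Assumption: this only approximates the payload of a state update.
    """
    return len(pickle.dumps((samples,), protocol=pickle.HIGHEST_PROTOCOL))


if __name__ == "__main__":
    # Lengths taken from the ranges mentioned in the post.
    for n in (10_000, 15_000, 16_000, 20_000):
        print(n, pickled_size(make_samples(n)))
```

Logging the length and this size for every record that passes or fails would show whether the error tracks size at all, or whether records of the same size behave differently.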

The schema is:

schema="samples ARRAY<FLOAT>"

Stack trace (abridged):

  File "/databricks/spark/python/pyspark/worker.py", line 2232, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 2224, in process
    serializer.dump_stream(out_iter, outfile)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 1292, in dump_stream
    super().dump_stream(flatten_iterator(), stream)
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 600, in dump_stream
    return ArrowStreamSerializer.dump_stream(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 118, in dump_stream
    for batch in iterator:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 593, in init_stream_yield_batches
    for series in iterator:
  File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 1289, in flatten_iterator
    for pdf in iter_pdf:

...

  File "/databricks/spark/python/pyspark/sql/streaming/stateful_processor.py", line 68, in update
    self._valueStateClient.update(self._stateName, newValue)
  File "/databricks/spark/python/pyspark/sql/streaming/value_state_client.py", line 96, in update
    raise PySparkRuntimeError(f"Error updating value state: " f"{response_message[1]}")
pyspark.errors.exceptions.base.PySparkRuntimeError: Error updating value state: invalid pickle opcode: 0