I am getting an error when the value I need to store in a ValueState is a large array (over 15k-20k items). There is no error, and everything works correctly, if I trim the array to under 10k samples. The same error is raised when I store the array as a value in a MapState or as an item in a ListState.
I ran the job multiple times to check whether there is a hard threshold at which the error appears, but there doesn't seem to be one: it can fail at 16k items, but it can also succeed there and then fail on a subsequent record at 15k.
schema="samples ARRAY<FLOAT>"
File "/databricks/spark/python/pyspark/worker.py", line 2232, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 2224, in process
serializer.dump_stream(out_iter, outfile)
File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 1292, in dump_stream
super().dump_stream(flatten_iterator(), stream)
File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 600, in dump_stream
return ArrowStreamSerializer.dump_stream(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 118, in dump_stream
for batch in iterator:
File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 593, in init_stream_yield_batches
for series in iterator:
File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 1289, in flatten_iterator
for pdf in iter_pdf:
...
File "/databricks/spark/python/pyspark/sql/streaming/stateful_processor.py", line 68, in update
self._valueStateClient.update(self._stateName, newValue)
File "/databricks/spark/python/pyspark/sql/streaming/value_state_client.py", line 96, in update
raise PySparkRuntimeError(f"Error updating value state: " f"{response_message[1]}")
pyspark.errors.exceptions.base.PySparkRuntimeError: Error updating value state: invalid pickle opcode: 0