Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

mapInPandas returning an intermittent error related to data type interconversion

chinmay0924
New Contributor III

```

File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 346, in _create_array return pa.Array.from_pandas( ^^^^^^^^^^^^^^^^^^^^^ File "pyarrow/array.pxi", line 1126, in pyarrow.lib.Array.from_pandas File "pyarrow/array.pxi", line 360, in pyarrow.lib.array File "pyarrow/array.pxi", line 87, in pyarrow.lib._ndarray_to_array File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status pyarrow.lib.ArrowInvalid: Could not convert 'id' with type str: tried to convert to int64

The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/databricks/spark/python/pyspark/worker.py", line 2228, in main process() File "/databricks/spark/python/pyspark/worker.py", line 2220, in process serializer.dump_stream(out_iter, outfile) File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 600, in dump_stream return ArrowStreamSerializer.dump_stream( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 118, in dump_stream for batch in iterator: File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 594, in init_stream_yield_batches batch = self._create_batch(series) ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 578, in _create_batch arrs.append(self._create_struct_array(s, t)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 523, in _create_struct_array self._create_array( File "/databricks/spark/python/pyspark/sql/pandas/serializers.py", line 374, in _create_array raise PySparkValueError(error_msg % (series.dtype, series.name, arrow_type)) from e pyspark.errors.exceptions.base.PySparkValueError: Exception thrown when converting pandas.Series (object) with name 'id' to Arrow Array (int64).

```

I am getting this error when trying to use the `.mapInPandas` function on a Spark DataFrame with the following schema:
```
StructType([
    StructField('id', LongType(), True),
    StructField('name', StringType(), True),
    StructField('jobRole', StringType(), True),
    StructField('ssn', StringType(), True),
    StructField('dt_id', LongType(), False)
])
```
 
I naturally assumed that the presence of a LongType column was causing the issue, so I created another table that also has a LongType column, with the following schema:
```
StructType([
    StructField('id', LongType(), True),
    StructField('ssn', StringType(), True),
    StructField('dt_id', LongType(), False)
])
```
but using the `.mapInPandas` function on this table does not produce the error.

What is the reason for this discrepancy?

 

 

4 REPLIES

Raghavan93513
Databricks Employee

This issue can occur when there is a schema mismatch: the schema passed to mapInPandas declares a column type (here, 'id' as LongType/int64) that does not match the dtype of the data the function actually returns for that column.
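
A minimal sketch of how that mismatch surfaces (my own illustration, not from the original reply; it assumes a Spark session is available as `spark`, as in a Databricks notebook): the output schema declares `id` as LongType, but the function yields it as strings, so Arrow cannot build the int64 array and raises the same kind of error as above.

```
import pandas as pd
from pyspark.sql.types import StructType, StructField, LongType, StringType

schema = StructType([
    StructField('id', LongType(), True),
    StructField('name', StringType(), True),
])
df = spark.createDataFrame([(1, 'a'), (2, 'b')], schema)

def to_string_ids(iterator):
    for pdf in iterator:
        pdf['id'] = pdf['id'].astype(str)  # dtype becomes object, no longer int64
        yield pdf

# Fails at serialization time with ArrowInvalid / PySparkValueError,
# because the returned 'id' column no longer matches the declared LongType.
df.mapInPandas(to_string_ids, schema=schema).show()
```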

Raghavan93513
Databricks Employee

Could you show me how mapInPandas() is used in the code?
Does the data (or even the schema) change dynamically?

The data in one of the columns is modified in the function that is passed to `mapInPandas`:

```
import pandas as pd

def df_process_function(df):
    schema = df.schema

    def partition_process(iterator):
        for pdf in iterator:
            # partition_function (defined elsewhere) modifies one of the columns
            processed_records = partition_function(pdf)
            yield pd.DataFrame(processed_records)

    return df.mapInPandas(partition_process, schema=schema)
```

But the column that is shown in the error is different from the one that is modified.

The schema of the spark dataframe is not modified dynamically.
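
One possibility worth ruling out (a sketch of my own, not part of the original thread): `pd.DataFrame(processed_records)` re-infers dtypes from whatever `partition_function` returns, so `id` can come back as `object` even though the Spark column is LongType and `id` itself was never touched. Explicitly casting it back before yielding makes such a drift visible:

```
import pandas as pd

def partition_process(iterator):
    for pdf in iterator:
        # partition_function is the user's helper from the snippet above
        processed = pd.DataFrame(partition_function(pdf))
        # pd.DataFrame over dicts/records re-infers dtypes; force 'id' back to
        # int64 so it matches the LongType declared in the output schema.
        # to_numeric(errors='raise') will fail loudly on non-numeric values.
        processed['id'] = pd.to_numeric(processed['id'], errors='raise').astype('int64')
        yield processed
```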

Raghavan93513
Databricks Employee

Hi @chinmay0924, good day!

Could you please confirm the following:

  • Does the ID column incorrectly contain strings, which PyArrow fails to convert to integers (int64)?
  • Is the data processed in both DataFrames exactly the same?

Additionally, could you provide us with the output for the following code:

```
from pyspark.sql.functions import col

# rows whose id fails the cast (or is blank/NaN/non-numeric)
bad = df.filter(
    col('id').isNotNull() &           # keep out the already-null originals (optional)
    col('id').cast('bigint').isNull()
)
bad.show()
```
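
Alongside the Spark-side filter above, a pandas-side check inside the mapping function can pinpoint which batch produces a non-integer `id`. This is only a sketch and reuses the `partition_function` helper from the earlier snippet:

```
import pandas as pd
from pandas.api.types import is_integer_dtype

def checked_partition_process(iterator):
    for pdf in iterator:
        processed = pd.DataFrame(partition_function(pdf))
        if not is_integer_dtype(processed['id']):
            # Surface the offending values instead of the opaque Arrow error
            raise TypeError(
                f"'id' came back as {processed['id'].dtype}; "
                f"sample values: {processed['id'].head().tolist()}"
            )
        yield processed
```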
