josh_melton
Databricks Employee

Introduction

If you’re coming from a Pandas background, moving from the simple Pandas on Spark API to the more flexible Pandas function APIs can be intimidating. It’s also hard to distinguish between what appear to be many overlapping approaches, and hard to find one document that explains them all. A blog post from our colleague TJ, which discusses a specific use case for mapInPandas, contains an illuminating diagram (below). This blog expands on that diagram and clarifies where and how to use Pandas UDFs, applyInPandas, and mapInPandas in your solutions. For the end-to-end example, clone this notebook.

[Diagram from TJ’s blog post]

Note that native Spark functions will generally be faster than distributed Pandas operations. Only distribute Pandas functions as a fallback when you can’t use Spark, for example when migrating custom Pandas logic or using a library that’s only implemented in Pandas. Similarly, if you’re distributing simple data transformations written in Pandas, you can try Pandas on Spark.

However, at times you’ll need to run more complex operations than the Pandas on Spark API handles or use a library which isn’t implemented natively in Spark. At these times, you’ll want to combine the distributed processing power of Spark with the flexibility of Pandas by using Pandas UDFs, applyInPandas, or mapInPandas. 

Each of these approaches allows you to operate on standard Pandas objects (DataFrames and Series) in a custom Python function. Deciding between the three approaches is largely a consequence of the input and output you expect in your custom function, as shown in the table below. If you expect one output row for each input row, use a Pandas UDF. If you expect to process your input in some specific grouping, use applyInPandas. If you expect to have many output rows for each input row, use mapInPandas.

| Input     | Output    | Method        |
|-----------|-----------|---------------|
| One Row   | One Row   | Pandas UDF    |
| Many Rows | One Row   | applyInPandas |
| One Row   | Many Rows | mapInPandas   |
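
The decision rule in the table can be written down as a small lookup. This is only a sketch of the table's logic: `choose_method` is a hypothetical helper introduced here for illustration, not part of PySpark, and it covers only the three row patterns listed above.

```python
def choose_method(input_rows: str, output_rows: str) -> str:
    """Map an (input, output) row pattern from the table to a method name.

    Hypothetical helper for illustration only; not a PySpark API.
    """
    table = {
        ("one", "one"): "pandas_udf",
        ("many", "one"): "applyInPandas",
        ("one", "many"): "mapInPandas",
    }
    key = (input_rows, output_rows)
    if key not in table:
        # The table only lists three patterns; anything else needs a closer look
        raise ValueError(f"No method listed for {key}")
    return table[key]


print(choose_method("many", "one"))
```

In practice the choice also depends on whether the logic needs a specific grouping, which is why applyInPandas is always called on a grouped DataFrame.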

To illustrate these concepts, we’ll use a simple example of each. To start, we’ll create a randomly generated Spark DataFrame:

 

 

from pyspark.sql.functions import rand, pandas_udf, col
import pandas as pd


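def generate_initial_df(num_rows, num_devices, num_trips):
    # `spark` is the SparkSession that Databricks notebooks provide by default
    return (
        spark.range(num_rows)
        # Random integer IDs in [0, num_devices) and [0, num_trips)
        .withColumn('device_id', (rand() * num_devices).cast('int'))
        .withColumn('trip_id', (rand() * num_trips).cast('int'))
        # Random double in [0, 1000)
        .withColumn('sensor_reading', rand() * 1000)
        .drop('id')
    )

# 5 million rows spread across 10,000 devices and 50 trip IDs
df = generate_initial_df(5000000, 10000, 50)
df.display()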

 

 

pandas_udf

Next, we’ll use a Pandas UDF to calculate the square root of the sensor_reading column. Once we’ve defined the function and decorated it with @pandas_udf, we can use it like a normal Spark function. This example operates on one column at a time, but Pandas UDFs are flexible in the data structures they accept and return. Pandas UDFs are commonly used to return predictions made by machine learning models. For more details and examples, check our documentation.

 

 

@pandas_udf('double')
def calculate_sqrt(sensor_reading: pd.Series) -> pd.Series:
    # Vectorized square root over the whole batch, instead of a per-element lambda
    return sensor_reading ** 0.5

df = df.withColumn('sqrt_reading', calculate_sqrt(col('sensor_reading')))
df.display()
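
Because the body of a Pandas UDF is ordinary Pandas code, one way to check the logic before distributing it is to call an undecorated copy on a small Series. The sketch below assumes only Pandas is installed. The `_local` function names and the two-column `reading_per_trip_local` function are hypothetical, added to show that the same Series-in, Series-out shape extends to more than one input column.

```python
import pandas as pd


def calculate_sqrt_local(sensor_reading: pd.Series) -> pd.Series:
    # Same logic as the UDF body, without the @pandas_udf decorator
    return sensor_reading ** 0.5


def reading_per_trip_local(sensor_reading: pd.Series, trip_id: pd.Series) -> pd.Series:
    # Hypothetical two-column example; the +1 avoids dividing by a trip_id of 0
    return sensor_reading / (trip_id + 1)


# Tiny stand-in for one batch of the Spark DataFrame
sample = pd.DataFrame({"trip_id": [0, 1, 3], "sensor_reading": [4.0, 9.0, 16.0]})

sqrt_out = calculate_sqrt_local(sample["sensor_reading"])
ratio_out = reading_per_trip_local(sample["sensor_reading"], sample["trip_id"])

# A Pandas UDF must return exactly one value per input row
assert len(sqrt_out) == len(sample)
assert len(ratio_out) == len(sample)
print(sqrt_out.tolist())
print(ratio_out.tolist())
```

Once the logic behaves as expected locally, decorating the same function with @pandas_udf and a return type lets Spark call it on batches of column values.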

 

 

applyInPandas

Another technique for distributing Pandas operations is applyInPandas. We can use applyInPandas for operations that we want to run on individual groups in parallel, such as one group per device_id. Common uses include custom aggregations, normalizing per grouping, or training a machine learning model per grouping. In this example, we’ll run a custom aggregation in Pandas that reduces the granularity of our DataFrame to one row per device_id. The trip_id values will be collected into a list per device_id, and the sensor_reading and sqrt_reading columns will be averaged per device_id. Importantly, applyInPandas requires your function to accept and return a Pandas DataFrame, and the schema of the returned DataFrame must be declared up front so that PyArrow can serialize it efficiently. For an example of using applyInPandas to train a model for each group, check notebook four in this solution accelerator.

 

 

def denormalize(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf holds all rows for a single device_id
    aggregated_df = pdf.groupby('device_id', as_index=False).agg(
        {'trip_id': list, 'sensor_reading': 'mean', 'sqrt_reading': 'mean'}
    )
    return aggregated_df

# The means are floating point, so declare them as double rather than long
expected_schema = 'device_id int, trip_id array<int>, sensor_reading double, sqrt_reading double'
df = df.groupBy('device_id').applyInPandas(denormalize, schema=expected_schema)
df.display()
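
Since the returned DataFrame has to match a schema declared up front, it can help to run the grouping function on a small Pandas DataFrame first and inspect the resulting columns and dtypes. The sketch below assumes only Pandas is installed and uses made-up sample values for a single device_id. It suggests that the averaged columns come back as float64, which corresponds to Spark’s double type.

```python
import pandas as pd


def denormalize(pdf: pd.DataFrame) -> pd.DataFrame:
    # Collect trip_id values into a list and average the readings per device_id
    return pdf.groupby("device_id", as_index=False).agg(
        {"trip_id": list, "sensor_reading": "mean", "sqrt_reading": "mean"}
    )


# Made-up rows for one device, standing in for a single group
group = pd.DataFrame(
    {
        "device_id": [7, 7, 7],
        "trip_id": [1, 2, 2],
        "sensor_reading": [4.0, 9.0, 16.0],
        "sqrt_reading": [2.0, 3.0, 4.0],
    }
)

result = denormalize(group)

# Column order and dtypes are what the schema string has to describe
print(list(result.columns))
print(result.dtypes)
```

Comparing this output against the schema string is a quick way to catch a type mismatch before running the aggregation on the full dataset.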

 

 

mapInPandas

The final approach to distributing custom Pandas functions is mapInPandas. A mapInPandas function can return any number of rows for each input row, so in that sense it works in the opposite direction to applyInPandas. The function receives a Python Iterator of Pandas DataFrames (batches of input rows) and yields Pandas DataFrames, which gives us flexibility in how many rows we yield. In our simple example below, we’ll convert the granularity of the DataFrame back to one row per combination of trip_id and device_id. Note that this example is illustrative: we could simply use Spark’s native explode() function and get the same result with better performance. For a more realistic use of this approach, read the blog post referenced above, which describes how to use mapInPandas to process uncommon file types.

 

 

from collections.abc import Iterator


def renormalize(itr: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in itr:
        # Unpack the list of values from the trip_id column into their own rows
        pdf = pdf.explode('trip_id')
        yield pdf

# sensor_reading and sqrt_reading hold averages, so declare them as double rather than long
expected_schema = 'device_id int, trip_id int, sensor_reading double, sqrt_reading double'
df = df.mapInPandas(renormalize, schema=expected_schema)
df.display()
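
The iterator contract can also be exercised without a cluster by passing the function an iterator of small Pandas DataFrames, each standing in for one batch. This is a sketch that assumes only Pandas is installed; the two batches and their values are made up for illustration.

```python
from collections.abc import Iterator

import pandas as pd


def renormalize(itr: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in itr:
        # Each list in trip_id becomes one output row per element
        yield pdf.explode("trip_id")


# Two made-up batches; Spark decides the real batch boundaries
batches = [
    pd.DataFrame(
        {"device_id": [1], "trip_id": [[10, 11]],
         "sensor_reading": [5.0], "sqrt_reading": [2.0]}
    ),
    pd.DataFrame(
        {"device_id": [2, 3], "trip_id": [[20], [30, 31, 32]],
         "sensor_reading": [6.0, 7.0], "sqrt_reading": [2.5, 2.6]}
    ),
]

# Consume the generator the way Spark would, one yielded DataFrame at a time
out = pd.concat(list(renormalize(iter(batches))), ignore_index=True)
print(len(out))
print(out["device_id"].tolist())
```

Three input rows become six output rows here, which is the one-row-in, many-rows-out pattern that mapInPandas is suited to.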

 

 

Conclusion

In conclusion, we’ve outlined how to use custom Python functions to operate on Pandas objects in a distributed Spark environment. We’ve covered simple examples and linked to detailed resources for further investigation of each approach. You should now understand the use cases for Pandas UDFs, applyInPandas, and mapInPandas, and be able to apply custom Pandas logic in parallel!
