Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
josh_melton
New Contributor III

In this short tutorial, we’ll implement an approach that makes certain applyInPandas operations run many times faster. First, let’s generate some dummy data for this example using Spark: a function that returns a dataframe with a specified number of rows, devices, and trips. In this case, we’ll create five million rows covering 100 devices that take 1,000 trips each, with a random “sensor_reading” column to process. If we paste this into Databricks, it should take just a moment to run:

 

from pyspark.sql.functions import rand
import pandas as pd

def generate_initial_df(num_rows, num_devices, num_trips):
    # Assign each row a random device, trip, and sensor reading
    return (
        spark.range(num_rows)
        .withColumn('device_id', (rand() * num_devices).cast('int'))
        .withColumn('trip_id', (rand() * num_trips).cast('int'))
        .withColumn('sensor_reading', (rand() * 1000))
        .drop('id')
    )

df = generate_initial_df(5000000, 100, 1000)
df.display()

 

Typically, to apply Pandas operations to groups within a Spark dataframe, we’d use applyInPandas as shown below. This is helpful when you need to process data by some specific key(s), such as our groups of devices and trips: we could run custom aggregations on certain groups, or normalize the readings per device and trip as in the example below:

 

def normalize_device_and_trip(pdf: pd.DataFrame) -> pd.DataFrame:
    reading = pdf['sensor_reading']
    pdf['normalized_reading'] = (reading - reading.mean()) / reading.std()
    return pdf

expected_schema = 'device_id int, trip_id int, sensor_reading double, normalized_reading double'
df.groupBy('device_id', 'trip_id').applyInPandas(normalize_device_and_trip, expected_schema).display()
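Since applyInPandas hands each group to the function as an ordinary Pandas dataframe, the function can be sanity-checked locally before involving Spark at all. A minimal sketch (the four sensor values are made up for illustration, and the standard z-score form of normalization is assumed):

```python
import pandas as pd

# Hypothetical mini-group standing in for one (device_id, trip_id) group
pdf = pd.DataFrame({
    'device_id': [0, 0, 0, 0],
    'trip_id': [0, 0, 0, 0],
    'sensor_reading': [10.0, 20.0, 30.0, 40.0],
})

def normalize_device_and_trip(pdf: pd.DataFrame) -> pd.DataFrame:
    reading = pdf['sensor_reading']
    # z-score: subtract the group mean, divide by the group standard deviation
    pdf['normalized_reading'] = (reading - reading.mean()) / reading.std()
    return pdf

result = normalize_device_and_trip(pdf.copy())
print(result['normalized_reading'].round(2).tolist())  # [-1.16, -0.39, 0.39, 1.16]
```

If the local output looks right, the same function can be passed to applyInPandas unchanged.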

 

Unfortunately, this may take longer than expected given the small volume of data (a little over a minute, depending on your cluster). In the background, Spark uses PyArrow to serialize each group into a Pandas dataframe and runs the computation you defined on the groups in parallel across the cluster. That’s fine when each group holds a lot of data. In this case, however, we know our groups are small: just fifty rows per (device, trip) pair on average, but fifty thousand per device_id:

 

print(df.count() / df.select('device_id').distinct().count()) # 50,000
print(df.count() / df.select('device_id', 'trip_id').distinct().count()) # ~50
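Averages can hide skew, and the memory question that follows depends on the largest group, not the average one. As a sketch of that check, here is a local Pandas analogue of the same dummy data (row counts scaled down, and numpy’s random generator used as a stand-in for Spark’s rand()):

```python
import numpy as np
import pandas as pd

# Local analogue of the dummy data: 1,000,000 rows, 100 devices, 200 trips,
# so ~50 rows per (device_id, trip_id) group on average
rng = np.random.default_rng(42)
num_rows = 1_000_000
pdf = pd.DataFrame({
    'device_id': rng.integers(0, 100, num_rows),
    'trip_id': rng.integers(0, 200, num_rows),
})

sizes = pdf.groupby(['device_id', 'trip_id']).size()
print(round(sizes.mean()))  # ~50 rows per group on average
print(sizes.max())          # the largest group is what has to fit in memory
```

The same `groupBy(...).count()` idea works directly on the Spark dataframe if you’d rather inspect the real data.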

 

applyInPandas exists for when our groupings are too large to fit in memory for traditional Pandas to process: it distributes the groups of data across the cluster. That distribution isn’t free, though, and it isn’t necessary when a grouping produces only a handful of rows. We can get the best of both in-memory and distributed processing by combining Spark’s custom aggregator, applyInPandas, with the traditional Pandas custom aggregator. In this example we’ll process each device_id group in parallel (distributed and serialized), then aggregate within each trip_id (not distributed or serialized further):

 

def normalize_trip(pdf: pd.DataFrame) -> pd.DataFrame:
    reading = pdf['sensor_reading']
    pdf['normalized_reading'] = (reading - reading.mean()) / reading.std()
    return pdf

def normalize_device(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf.groupby('trip_id', group_keys=False).apply(normalize_trip)

expected_schema = 'device_id int, trip_id int, sensor_reading double, normalized_reading double'
df.groupBy('device_id').applyInPandas(normalize_device, expected_schema).display()
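It’s worth convincing ourselves that the two-level grouping computes the same thing as grouping by both keys at once. A quick local check on a hand-made Pandas frame (the values are made up for illustration, and the z-score form of normalization is assumed):

```python
import pandas as pd

pdf = pd.DataFrame({
    'device_id': [0, 0, 0, 0],
    'trip_id': [0, 0, 1, 1],
    'sensor_reading': [10.0, 20.0, 5.0, 15.0],
})

def normalize_trip(g: pd.DataFrame) -> pd.DataFrame:
    reading = g['sensor_reading']
    return g.assign(normalized_reading=(reading - reading.mean()) / reading.std())

# Grouping by both keys at once...
flat = pdf.groupby(['device_id', 'trip_id'], group_keys=False).apply(normalize_trip)
# ...matches grouping by device first, then by trip within each device
nested = pdf.groupby('device_id', group_keys=False).apply(
    lambda g: g.groupby('trip_id', group_keys=False).apply(normalize_trip)
)
print(flat['normalized_reading'].tolist() == nested['normalized_reading'].tolist())  # True
```

Because each trip’s statistics depend only on that trip’s rows, splitting the grouping into device-then-trip changes where the work happens, not the result.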

 

This approach should run much more quickly (roughly three times faster, depending on the cluster). Note that applyInPandas still distributes the processing of our larger group across the cluster, while the plain Pandas apply handles the smaller group in memory. If a trip_id group suddenly grew too large to fit in memory, we’d risk out-of-memory errors; if the entire dataset suddenly shrank, we’d be paying the cost of invoking Arrow more often than necessary. Feel free to generate different sizes of data to see which groupings work better or worse. The balance can be tough to strike, but if you know your data well and it won’t change dramatically over time, combining grouping approaches like this can significantly improve the performance and cost of your data processing.