3 weeks ago
Hi,
We are planning to rewrite our application (which was originally written in R) in Python. We chose Polars because it seems to be faster than pandas. We have functions written in R which we plan to convert to Python.
However, in a recent Databricks training I attended, I learned that when using plain Python functions, Spark does not distribute the work to the worker nodes; it runs on the driver. I understand that is the case with pandas, so I would like to know whether functions written with Polars have the same disadvantage.
What are the alternatives to this?
Our data is time series, that is, the order of the rows is important, so when we tried converting the code to SparkR the results were not as expected.
Is there a recommended approach for this type of data?
Warm Regards
Manjusha
3 weeks ago - last edited 3 weeks ago
Polars and pandas don’t run on the worker nodes, so you won’t get the benefits of Databricks/Spark parallelism. If your data is small enough to fit on a single driver node, you can continue to use them. If you don’t want to do any refactoring, you might choose a larger driver node (a single-node cluster).
If the data is large and you want to benefit from Databricks/Spark parallelism, consider using the pyspark pandas API (pandas API on Spark) instead of plain pandas or Polars. Check whether the methods you use in your current codebase have equivalents in pandas on Spark. Here is the documentation: https://docs.databricks.com/aws/en/pandas/pandas-on-spark
3 weeks ago
@Manjusha , Great foundation from @pradeep_singh. Let me build on that with some specifics, especially around the time series and row ordering challenge.
Where Polars actually runs in Databricks
The statement you heard in training is about this boundary: once you call spark_df.toPandas() (or collect(), or create a Polars DataFrame directly), all subsequent work happens on the driver process, not the workers. Polars, like pandas, has no integration with Spark's execution engine. It doesn't matter how fast Polars is on a single node. Spark can't distribute it.
Spark distributes Python code only when that code is expressed as Spark transformations: built-in functions, SQL expressions, UDFs, pandas UDFs, applyInPandas, etc. Anything outside of Spark DataFrames/RDDs stays on the driver.
So: Polars on Databricks = single-node (driver) compute, same as pandas.
Why SparkR gave unexpected results
In a distributed system there is no inherent global row order. Spark splits your data across worker nodes, and unless you explicitly define the ordering, you'll get nondeterministic results. That's almost certainly what happened with your SparkR port. The fix is to encode your time series assumptions explicitly rather than relying on implicit row order.
What to try first: window functions
Before reaching for anything else, see how far you can get with native PySpark window functions. These are fully distributed, performant, and map directly to most time series patterns from R:
from pyspark.sql import functions as F, Window

# One window per series: partition by the entity, order by time
w = Window.partitionBy("entity_id").orderBy("timestamp")

df_with_lag = (
    df
    .withColumn("prev_value", F.lag("value", 1).over(w))  # previous row's value within each series
    .withColumn("diff", F.col("value") - F.col("prev_value"))
)
Common mappings: lag, lead, row_number, dense_rank, cumulative sums, and rolling windows via rangeBetween on a timestamp or sequence column. Partition by whatever constitutes a "series" in your domain (device, account, sensor, etc.) and order by the time column.
Second option: pandas API on Spark
If your transformations are closer to pandas/R idioms and hard to express in pure window functions, pandas API on Spark gives you a pandas-like syntax with distributed execution under the hood. It supports groupby + aggregation, sort_values, and rolling operations. Worth checking whether the specific methods you rely on have equivalents. Documentation: https://docs.databricks.com/aws/en/pandas/pandas-on-spark
Where something isn't available in pandas API on Spark, you can fall back to native PySpark window functions for that piece.
Fallback for complex per-group logic: applyInPandas
If your per-entity logic is complicated enough that window functions and pandas API on Spark can't express it cleanly, groupBy().applyInPandas is the hybrid pattern. Spark partitions data by your group key, ships each group to a worker, and runs your Python function in parallel:
import pandas as pd

def process_series(pdf: pd.DataFrame) -> pd.DataFrame:
    # Each call receives one complete group as a pandas DataFrame
    pdf = pdf.sort_values("timestamp")
    # your time series logic here
    return pdf

schema = ...  # Spark schema of the returned DataFrame

result = (
    df
    .groupBy("entity_id")
    .applyInPandas(process_series, schema=schema)
)
You get parallelism across groups without losing ordering within each group. You can even convert to Polars inside the function (pandas to Polars, run your logic, convert back to pandas), though there's conversion overhead per group.
Documentation: https://docs.databricks.com/en/pandas/pandas-function-apis.html
If your data is one single time series with no natural grouping
That's the harder case. If the entire dataset is one ordered sequence with no way to partition into independent groups, distributed processing doesn't help because the work is inherently serial. In that situation, a larger single-node driver running Polars or pandas is the pragmatic answer.
Putting it together
The decision tree:
1. Start with native PySpark window functions (lag, lead, rolling via rangeBetween); they are fully distributed.
2. If your transformations are closer to pandas/R idioms, try pandas API on Spark.
3. For complex per-group logic, use groupBy().applyInPandas to run arbitrary Python per group, in parallel.
4. If the data is one single series with no natural grouping, run Polars or pandas on a larger single-node driver.
That gives you correct time series behavior (because ordering and partitioning are explicit), Spark parallelism where possible, and a Python-first environment.
Hope this helps, Louis.
3 weeks ago
Thank you @Louis_Frolio and @pradeep_singh for the detailed explanations. I will discuss your inputs with the team and get back in case we have more questions.
3 weeks ago
@Manjusha , if you are happy with my response please click on "Accept as Solution." It will help others trust the guidance.