Greetings @javeed, you're right to call out the friction between a PySpark DataFrame and many Python ML libraries like statsmodels; most Python ML stacks expect pandas, while Spark is distributed-first. Here's how to bridge that gap efficiently for 50GB time series workloads on Databricks.
What works at scale on Databricks
- Pandas API on Spark lets you write pandas-style code that executes on Spark, avoiding single-node pandas OOM and scaling out across a cluster while preserving familiar idioms. Import pyspark.pandas as ps and use it when you can keep your logic "pandas-like" (a short sketch follows this list).
- Pandas UDFs and pandas function APIs (applyInPandas, mapInPandas, cogroup.applyInPandas) let you run arbitrary Python/pandas code (including statsmodels) against Spark partitions or groups with Arrow-optimized transfer. This is the main interoperability pattern: you group by your series key(s), materialize each group as a pandas DataFrame inside the UDF, and run statsmodels there. It's proven in production and documented with examples and constraints.
- Statsmodels with Spark via grouped Pandas UDFs: Databricks' examples show OLS per group with statsmodels in a grouped Pandas UDF, and SARIMAX time-series forecasting scaled across thousands of SKUs using groupBy + applyInPandas. The same pattern applies to decomposition (trend/seasonal/residual) or ARIMA/ETS per series.
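To make the Pandas API on Spark point concrete, here is a minimal sketch; the table and column names are placeholders for your own data, and the conversion helpers assume Spark 3.2+:

```python
import pyspark.pandas as ps

# Read a table straight into a pandas-on-Spark DataFrame (placeholder table name)
psdf = ps.read_table("main.demo.sensor_readings")

# Familiar pandas-style operations, executed by Spark under the hood
psdf["value_filled"] = psdf["value"].fillna(0.0)
per_series_mean = psdf.groupby("series_id")["value_filled"].mean()

# Convert between representations when needed
sdf = psdf.to_spark()        # pandas-on-Spark -> PySpark DataFrame
psdf2 = sdf.pandas_api()     # PySpark DataFrame -> pandas-on-Spark (Spark 3.2+)
```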
Recommended design pattern for time series at scale
Use "many small models" on grouped data with pandas function APIs:
- Partition by series key and parallelize per series: df.repartition(n_tasks, key...).groupBy(key...).applyInPandas(fn, schema), so each series (or series group) is processed independently with your statsmodels code. Temporarily disabling AQE helps ensure one Spark task per group when maximizing parallelism during training steps.
- Control per-group memory: all rows in a group are loaded onto a worker before your function runs; keep groups bounded and watch for skew. If a few series are huge, sub-sample, window, or split them to avoid OOM within the worker.
- Tune Arrow batch size for pandas UDFs: adjust spark.sql.execution.arrow.maxRecordsPerBatch when many columns (wide features) drive high JVM memory during Arrow conversion (see the config sketch after this list).
- Mind timestamps: Spark stores timestamps as UTC; prefer pandas-native time series operations inside the UDF for best performance and correctness with time zones.
- Log your runs: use MLflow autologging to capture parameters/metrics/artifacts automatically; it supports statsmodels, scikit-learn, Spark MLlib, and others in Databricks Runtime ML (a minimal example follows this list).
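For the Arrow and AQE tuning above, a minimal config sketch; the batch size shown is an illustrative starting point, not a recommendation:

```python
# Smaller Arrow batches reduce peak JVM memory during Arrow conversion for wide rows
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "2000")  # default is 10000

# Optionally disable AQE while training so each group keeps its own task,
# then re-enable it for downstream queries
spark.conf.set("spark.databricks.optimizer.adaptive.enabled", "false")
# ... run the applyInPandas training/decomposition step ...
spark.conf.set("spark.databricks.optimizer.adaptive.enabled", "true")
```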
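And for run tracking, a minimal MLflow sketch on the driver; the run name and logged values are placeholders:

```python
import mlflow

# Autologging captures parameters/metrics/artifacts for supported libraries
# (statsmodels, scikit-learn, Spark MLlib, ...)
mlflow.autolog()

# Or log explicitly around a training/decomposition batch (placeholder values)
with mlflow.start_run(run_name="seasonal_decompose_batch"):
    mlflow.log_param("period", 24)
    mlflow.log_metric("n_series", 1000)
```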
When PySpark ML lacks the model you need
- Keep the data distributed, but run Python ML per group via pandas UDFs (statsmodels, sktime components that are series-local, etc.). This avoids a bulk conversion of the whole table to pandas, and lets you scale horizontally by adding workers.
- Use distributed-capable alternatives where available: for tree/GBM models, prefer XGBoost's Spark estimators (xgboost.spark) to train models in parallel across the cluster instead of the single-node xgboost Python APIs (see the sketch after this list).
- If single-node training is unavoidable, constrain it to sampled subsets or per-series batch sizes that fit in memory, and parallelize across many series with pandas UDFs; cluster scale helps with large row counts, while adding columns tends to hurt more, so plan sizing accordingly.
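For the distributed XGBoost route, a minimal sketch with xgboost.spark (requires an xgboost build with Spark support, as in Databricks Runtime ML; column names and parameter values are placeholders):

```python
from xgboost.spark import SparkXGBRegressor

# Distributed training across the cluster; feature columns are passed by name,
# so no VectorAssembler is required (placeholder column names)
xgb = SparkXGBRegressor(
    features_col=["lag_1", "lag_7", "rolling_mean_24"],
    label_col="value",
    num_workers=4,   # number of Spark tasks used for training
    max_depth=6,
)

model = xgb.fit(train_df)           # train_df is a Spark DataFrame
preds = model.transform(test_df)    # returns a Spark DataFrame with predictions
```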
Minimal pattern: statsmodels per series with applyInPandas
```python
import pandas as pd
from statsmodels.tsa.seasonal import seasonal_decompose
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

# Spark DataFrame: columns [series_id, ts, value]
schema = StructType([
    StructField("series_id", StringType(), False),
    StructField("ts", TimestampType(), False),
    StructField("trend", DoubleType(), True),
    StructField("seasonal", DoubleType(), True),
    StructField("resid", DoubleType(), True),
])

def decompose(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf = pdf.sort_values("ts")
    # Set the period for your cadence, e.g., period=24 for hourly data with daily seasonality
    result = seasonal_decompose(
        pdf["value"], period=24, model="additive", extrapolate_trend="freq"
    )
    out = pd.DataFrame({
        "series_id": pdf["series_id"],
        "ts": pdf["ts"],
        "trend": result.trend,
        "seasonal": result.seasonal,
        "resid": result.resid,
    })
    return out

# Maximize parallelism by ensuring one task per series (optional)
n_tasks = df.select("series_id").distinct().count()
spark.conf.set("spark.databricks.optimizer.adaptive.enabled", "false")
decomp = (
    df.repartition(n_tasks, "series_id")
      .groupBy("series_id")
      .applyInPandas(decompose, schema)
)
```
Practical tips
- Prefer Pandas API on Spark for exploratory work and feature engineering when it fits your workflow; it runs pandas-equivalent operations on Spark and avoids a full pandas conversion.
- Use pandas UDFs for the "bridge" to libraries that don't support Spark DataFrames (statsmodels, SHAP, etc.), and tune partitions to use all cores across nodes efficiently.
- Don't collect 50GB to the driver; only move per-group batches as pandas via Arrow inside UDFs, and keep outputs as Spark DataFrames/Delta tables (a short write example follows this list).
- Track experiments with MLflow and, where possible, use Databricks Runtime ML for pre-installed ML libraries and optimized MLlib algorithms.
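As referenced in the collection tip above, a short sketch of keeping results distributed by writing straight to Delta; the table name is a placeholder, and decomp is the output of the applyInPandas pattern shown earlier:

```python
# Write the per-series results directly to a Delta table,
# never collecting them to the driver
(
    decomp.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("main.demo.series_decomposition")
)
```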
Hope this helps, Louis.