Working with PySpark DataFrames and machine learning / statistical model libraries

javeed
New Contributor

Hi Team, 

I am working with a large volume of data (50GB), and I decompose the time series data using statsmodels.

That said, the major challenge I am facing is the compatibility of PySpark DataFrames with machine learning algorithms. Although pyspark.ml is available, many of the models I need are not.

My question is on a broader level: how can we handle this compatibility issue?

Converting PySpark DataFrames to pandas has not proven to be very efficient.

Louis_Frolio
Databricks Employee

Greetings @javeed, you’re right to call out the friction between PySpark DataFrames and many Python ML libraries like statsmodels: most Python ML stacks expect pandas, while Spark is distributed-first. Here’s how to bridge that gap efficiently for 50GB time series workloads on Databricks.

 

What works at scale on Databricks

  • Pandas API on Spark lets you write pandas-style code that executes on Spark, avoiding single-node pandas OOM errors and scaling out across a cluster while preserving familiar idioms. Import it with import pyspark.pandas as ps and use it when you can keep your logic “pandas-like.”
  • Pandas UDFs and pandas function APIs (applyInPandas, mapInPandas, cogroup.applyInPandas) let you run arbitrary Python/pandas code (including statsmodels) against Spark partitions or groups with Arrow-optimized transfer. This is the main interoperability pattern: you group by your series key(s), materialize each group as a pandas DataFrame inside the UDF, and run statsmodels there. It’s proven in production and documented with examples and constraints.
  • Statsmodels with Spark via grouped Pandas UDFs: Databricks’ examples show OLS per group with statsmodels in a grouped Pandas UDF, and SARIMAX time-series forecasting scaled across thousands of SKUs using groupBy + applyInPandas. The same pattern applies to decomposition (trend/seasonal/residual) or ARIMA/ETS per series.

Recommended design pattern for time series at scale

Use “many small models” on grouped data with pandas function APIs:
  • Partition by series key and parallelize per series: df.repartition(n_tasks, key...).groupBy(key...).applyInPandas(fn, schema) so each series (or group of series) is processed independently by your statsmodels code. Temporarily disabling AQE helps keep one Spark task per group when maximizing parallelism in training steps, because AQE can otherwise coalesce small shuffle partitions.
  • Control per-group memory: All rows in a group are loaded to a worker before your function runs; keep groups bounded and watch skew. If a few series are huge, sub-sample, window, or split them to avoid OOM within the worker.
  • Tune Arrow batch size for pandas UDFs: Lower spark.sql.execution.arrow.maxRecordsPerBatch (default 10,000) when many columns (wide features) drive high JVM memory during Arrow conversion.
  • Mind timestamps: Spark stores timestamps internally as UTC and hands them to pandas as tz-naive values in the session time zone; inside the UDF, prefer pandas-native time series operations for best performance and correctness with time zones.
  • Log your runs: Use MLflow autologging to capture parameters/metrics/artifacts automatically; it supports statsmodels, scikit-learn, Spark MLlib, and others in Databricks Runtime ML.
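The memory, Arrow, and timestamp points above can be sketched as a few small helpers. This is illustrative only: the row cap, the batch size of 5,000, and the helper names are hypothetical, and pinning the session time zone to UTC is an assumption made so that the tz-naive timestamps arriving in pandas can be treated as UTC.

```python
import pandas as pd

# Hypothetical per-series row cap; size it to your worker memory.
MAX_ROWS_PER_SERIES = 50_000


def configure_session(spark, max_records_per_batch: int = 5_000) -> None:
    """Session settings for pandas UDF workloads (values are illustrative)."""
    # Fewer rows per Arrow batch lowers peak memory for wide rows (Spark's default is 10,000).
    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", str(max_records_per_batch))
    # Timestamps reach pandas as tz-naive values in the session time zone;
    # pinning it to UTC means those values are UTC inside the UDF.
    spark.conf.set("spark.sql.session.timeZone", "UTC")


def largest_series(df, key: str = "series_id", top: int = 10):
    """Spark DataFrame of the `top` biggest groups, to spot skew before applyInPandas."""
    return df.groupBy(key).count().orderBy("count", ascending=False).limit(top)


def prepare_series(pdf: pd.DataFrame, max_rows: int = MAX_ROWS_PER_SERIES) -> pd.DataFrame:
    """Inside the UDF: sort one series, keep its most recent `max_rows` points,
    and index it by a UTC DatetimeIndex for pandas-native time series operations."""
    pdf = pdf.sort_values("ts").tail(max_rows)
    return pdf.assign(ts=pd.to_datetime(pdf["ts"], utc=True)).set_index("ts")
```

Truncating to the most recent window is only one of the options mentioned above; splitting or sub-sampling oversized series are equally valid, depending on what the model needs.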

When PySpark ML lacks the model you need

  • Keep the data distributed, but run Python ML per group via pandas UDFs (statsmodels, sktime components that are series-local, etc.). This avoids a bulk conversion of the whole table to pandas, and lets you scale horizontally by adding workers.
  • Use distributed-capable alternatives where available: For tree/GBM, prefer XGBoost’s Spark estimators (xgboost.spark) to train models in parallel across the cluster instead of single-node xgboost Python APIs.
  • If single-node training is unavoidable, restrict it to sampled subsets or per-series batches that fit in memory, and parallelize across many series with pandas UDFs. Adding cluster capacity helps with large row counts, while adding columns often hurts more, so plan sizing accordingly.
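For the single-node fallback, a bounded random sample can be pulled to the driver instead of the whole table. A minimal sketch, assuming a 1,000,000-row budget and a fixed seed (both hypothetical defaults); note that DataFrame.sample is approximate, so the returned row count only lands near the budget.

```python
def sample_fraction(total_rows: int, max_rows: int) -> float:
    """Fraction to pass to DataFrame.sample so that roughly max_rows rows survive."""
    if total_rows <= max_rows:
        return 1.0
    return max_rows / total_rows


def sample_to_pandas(df, max_rows: int = 1_000_000, seed: int = 42):
    """Bring a bounded random sample of a Spark DataFrame to the driver as pandas."""
    frac = sample_fraction(df.count(), max_rows)
    # sample() is approximate: the result may be slightly above or below max_rows.
    return df.sample(withReplacement=False, fraction=frac, seed=seed).toPandas()
```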

Minimal pattern: statsmodels per series with applyInPandas

 
```python
import pandas as pd
from statsmodels.tsa.seasonal import seasonal_decompose
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

# Input: Spark DataFrame `df` with columns [series_id, ts, value]
# Output schema of the per-series decomposition
schema = StructType([
    StructField("series_id", StringType(), False),
    StructField("ts", TimestampType(), False),
    StructField("trend", DoubleType(), True),
    StructField("seasonal", DoubleType(), True),
    StructField("resid", DoubleType(), True),
])

def decompose(pdf: pd.DataFrame) -> pd.DataFrame:
    # Sort by time and reset the index so all output columns align row by row
    pdf = pdf.sort_values("ts").reset_index(drop=True)
    # Set the period for your cadence, e.g., period=24 for hourly data with daily seasonality.
    # seasonal_decompose needs at least two full periods (2 * period rows) per series.
    result = seasonal_decompose(pdf["value"], period=24, model="additive", extrapolate_trend="freq")
    return pd.DataFrame({
        "series_id": pdf["series_id"],
        "ts": pdf["ts"],
        "trend": result.trend,
        "seasonal": result.seasonal,
        "resid": result.resid,
    })

# Maximize parallelism by ensuring one task per series (optional)
n_tasks = df.select("series_id").distinct().count()
spark.conf.set("spark.databricks.optimizer.adaptive.enabled", "false")
spark.conf.set("spark.sql.adaptive.enabled", "false")  # open-source Spark AQE switch
decomp = (
    df.repartition(n_tasks, "series_id")
    .groupBy("series_id")
    .applyInPandas(decompose, schema)
)
```
 

Practical tips

  • Prefer Pandas API on Spark for exploration and feature engineering when it fits your workflow; it provides pandas-equivalent APIs on Spark and avoids a full pandas conversion.
  • Use pandas UDFs for the “bridge” to libraries that don’t support Spark DataFrames (statsmodels, SHAP, etc.), and tune partitions to use all cores across nodes efficiently.
  • Don’t collect 50GB to the driver; only move per-group batches as pandas via Arrow inside UDFs, and keep outputs as Spark DataFrames/Delta tables.
  • Track experiments with MLflow and, where possible, use Databricks Runtime ML for pre-installed ML libraries and optimized MLlib algorithms.
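To illustrate the Pandas API on Spark tip, here is a sketch of a summary function written only with groupby/agg idioms that pandas and pandas-on-Spark share, so it should accept either kind of DataFrame. The function name and the series_id/value columns are hypothetical; it is checked locally with plain pandas here.

```python
import pandas as pd


def series_summary(frame):
    """Per-series mean, sample std, and count for a pandas or pandas-on-Spark DataFrame."""
    stats = frame.groupby("series_id").agg({"value": ["mean", "std", "count"]})
    stats.columns = ["mean", "std", "count"]  # flatten the (column, statistic) MultiIndex
    return stats.sort_index()


# Local check with plain pandas before pointing it at the full table.
local = pd.DataFrame({"series_id": ["a", "a", "b", "b", "b"], "value": [1.0, 3.0, 2.0, 2.0, 2.0]})
print(series_summary(local))
```

On Spark 3.2 or later, the same call on the distributed data would look like series_summary(df.pandas_api()), which should keep the computation on the cluster until you explicitly convert the (small) result with to_pandas().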
 
Hope this helps, Louis.