cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Machine Learning
Dive into the world of machine learning on the Databricks platform. Explore discussions on algorithms, model training, deployment, and more. Connect with ML enthusiasts and experts.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Working with pyspark dataframe with machine learning libraries / statistical model libraries

javeed
New Contributor

Hi Team, 

I am working with huge volume of data (50GB) and i decompose the time series data using the statsmodel.

Having said that the major challenge i am facing is the compatibility of the pyspark dataframe with the machine learning algorithms. although the pysaprk.ml is available but many of the models are not available.

My question is on a broader level ,How can we handle this compatibility issue ? 

Converting pyspark to pandas has not been proven to be very efficient.

1 REPLY 1

Louis_Frolio
Databricks Employee
Databricks Employee

Greetings @javeed ,   Youโ€™re right to call out the friction between a PySpark DataFrame and many Python ML libraries like statsmodels; most Python ML stacks expect pandas, while Spark is distributed-first. Hereโ€™s how to bridge that gap efficiently for 50GB time series workloads on Databricks.

 

What works at scale on Databricks

  • Pandas API on Spark lets you write pandas-style code that executes on Spark, avoiding single-node pandas OOM and scaling out across a cluster while preserving familiar idioms. Import as pyspark.pandas as ps and use it when you can keep your logic โ€œpandas-like.โ€
  • Pandas UDFs and pandas function APIs (applyInPandas, mapInPandas, cogroup.applyInPandas) let you run arbitrary Python/pandas code (including statsmodels) against Spark partitions or groups with Arrow-optimized transfer. This is the main interoperability pattern: you group by your series key(s), materialize each group as a pandas DataFrame inside the UDF, and run statsmodels there. Itโ€™s proven in production and documented with examples and constraints.
  • Statsmodels with Spark via grouped Pandas UDFs: Databricksโ€™ examples show OLS per group with statsmodels in a grouped Pandas UDF, and SARIMAX time-series forecasting scaled across thousands of SKUs using groupBy + applyInPandas. The same pattern applies to decomposition (trend/seasonal/residual) or ARIMA/ETS per series.

Recommended design pattern for time series at scale

Use โ€œmany small modelsโ€ on grouped data with pandas function APIs:
  • Partition by series key and parallelize per series: df.repartition(n_tasks, key...).groupBy(key...).applyInPandas(fn, schema) so each series (or series group) is processed independently with your statsmodels code. Temporarily disabling AQE helps ensure one Spark task per group when maximizing parallelism in training steps.
  • Control per-group memory: All rows in a group are loaded to a worker before your function runs; keep groups bounded and watch skew. If a few series are huge, sub-sample, window, or split them to avoid OOM within the worker.
  • Tune Arrow batch size for pandas UDFs: Adjust spark.sql.execution.arrow.maxRecordsPerBatch when many columns (wide features) drive high JVM memory during Arrow conversion.
  • Mind timestamps: Spark stores timestamps as UTC; prefer pandas-native time series operations inside the UDF for best performance and correctness with time zones.
  • Log your runs: Use MLflow autologging to capture parameters/metrics/artifacts automatically; it supports statsmodels, scikit-learn, Spark MLlib, and others in Databricks Runtime ML.

When PySpark ML lacks the model you need

  • Keep the data distributed, but run Python ML per group via pandas UDFs (statsmodels, sktime components that are series-local, etc.). This avoids a bulk conversion of the whole table to pandas, and lets you scale horizontally by adding workers.
  • Use distributed-capable alternatives where available: For tree/GBM, prefer XGBoostโ€™s Spark estimators (xgboost.spark) to train models in parallel across the cluster instead of single-node xgboost Python APIs.
  • If single-node training is unavoidable, constrain to sampled subsets or per-series batch sizes that fit in memory, and parallelize across many series with pandas UDFs; cluster scale helps for large row counts, while adding columns often hurts moreโ€”plan sizing accordingly.
  •  

Minimal pattern: statsmodels per series with applyInPandas

 
```python import pandas as pd from statsmodels.tsa.seasonal import seasonal_decompose from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType
# Spark DataFrame: columns [series_id, ts, value] schema = StructType([ StructField("series_id", StringType(), False), StructField("ts", TimestampType(), False), StructField("trend", DoubleType(), True), StructField("seasonal", DoubleType(), True), StructField("resid", DoubleType(), True), ])
def decompose(pdf: pd.DataFrame) -> pd.DataFrame: pdf = pdf.sort_values("ts") # set frequency/period appropriate for your cadence, e.g., period=24 for hourly daily seasonality result = seasonal_decompose(pdf["value"], period=24, model="additive", extrapolate_trend="freq") out = pd.DataFrame({ "series_id": pdf["series_id"], "ts": pdf["ts"], "trend": result.trend, "seasonal": result.seasonal, "resid": result.resid }) return out
# Maximize parallelism by ensuring one task per series (optional) n_tasks = df.select("series_id").distinct().count() spark.conf.set("spark.databricks.optimizer.adaptive.enabled", "false") decomp = ( df.repartition(n_tasks, "series_id") .groupBy("series_id") .applyInPandas(decompose, schema) ) ```
 

Practical tips

  • Prefer Pandas API on Spark for exploratory and feature engineering when it fits your workflow; itโ€™s pandas-equivalent on Spark and avoids full pandas conversion.
  • Use pandas UDFs for the โ€œbridgeโ€ to libraries that donโ€™t support Spark DataFrames (statsmodels, SHAP, etc.), and tune partitions to use all cores across nodes efficiently.
  • Donโ€™t collect 50GB to the driver; only move per-group batches as pandas via Arrow inside UDFs, and keep outputs as Spark DataFrames/Delta tables.
  • Track experiments with MLflow and, where possible, use Databricks Runtime ML for pre-installed ML libraries and optimized MLlib algorithms.
 
Hope this helps, Louis.