<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Working with pyspark dataframe with machine learning libraries / statistical model libraries in Machine Learning</title>
    <link>https://community.databricks.com/t5/machine-learning/working-with-pyspark-dataframe-with-machine-learning-libraries/m-p/136522#M4381</link>
    <description>&lt;P&gt;Greetings&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/131266"&gt;@javeed&lt;/a&gt;&amp;nbsp;,&amp;nbsp; &amp;nbsp;You’re right to call out the friction between a PySpark DataFrame and many Python ML libraries like statsmodels; most Python ML stacks expect pandas, while Spark is distributed-first. Here’s how to bridge that gap efficiently for 50GB time series workloads on Databricks.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;H3 class="paragraph"&gt;What works at scale on Databricks&lt;/H3&gt;
&lt;UL&gt;
&lt;LI class="paragraph"&gt;&lt;STRONG&gt;Pandas API on Spark&lt;/STRONG&gt; lets you write pandas-style code that executes on Spark, avoiding single-node pandas OOM and scaling out across a cluster while preserving familiar idioms. Import as &lt;CODE&gt;pyspark.pandas as ps&lt;/CODE&gt; and use it when you can keep your logic “pandas-like.”&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Pandas UDFs and pandas function APIs (applyInPandas, mapInPandas, cogroup.applyInPandas)&lt;/STRONG&gt; let you run arbitrary Python/pandas code (including statsmodels) against Spark partitions or groups with Arrow-optimized transfer. This is the main interoperability pattern: you group by your series key(s), materialize each group as a pandas DataFrame inside the UDF, and run statsmodels there. It’s proven in production and documented with examples and constraints.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Statsmodels with Spark via grouped Pandas UDFs&lt;/STRONG&gt;: Databricks’ examples show OLS per group with statsmodels in a grouped Pandas UDF, and SARIMAX time-series forecasting scaled across thousands of SKUs using groupBy + applyInPandas. The same pattern applies to decomposition (trend/seasonal/residual) or ARIMA/ETS per series.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
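&lt;DIV class="paragraph"&gt;Because &lt;CODE&gt;applyInPandas&lt;/CODE&gt; hands your function one plain pandas DataFrame per group, you can prototype and unit-test that function with an ordinary pandas &lt;CODE&gt;groupby&lt;/CODE&gt; before running it on Spark. A minimal sketch with illustrative column names and a toy per-series model:&lt;/DIV&gt;

```python
import numpy as np
import pandas as pd

# Toy data standing in for two series in a Spark table
df = pd.DataFrame({
    "series_id": ["a"] * 4 + ["b"] * 4,
    "t": [0, 1, 2, 3] * 2,
    "value": [1.0, 2.0, 3.0, 4.0, 10.0, 20.0, 30.0, 40.0],
})

def fit_slope(pdf: pd.DataFrame) -> pd.DataFrame:
    # Same contract as an applyInPandas function: pandas DataFrame in, pandas DataFrame out
    slope, intercept = np.polyfit(pdf["t"], pdf["value"], deg=1)
    return pd.DataFrame({"series_id": [pdf["series_id"].iloc[0]],
                         "slope": [slope], "intercept": [intercept]})

# Locally this emulates df.groupBy("series_id").applyInPandas(fit_slope, schema)
result = pd.concat([fit_slope(g) for _, g in df.groupby("series_id")], ignore_index=True)
```

&lt;DIV class="paragraph"&gt;The same function can then be passed unchanged to &lt;CODE&gt;applyInPandas&lt;/CODE&gt; with a matching Spark output schema.&lt;/DIV&gt;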
&lt;H3 class="paragraph"&gt;Recommended design pattern for time series at scale&lt;/H3&gt;
&lt;DIV class="paragraph"&gt;Use “many small models” on grouped data with pandas function APIs:&lt;/DIV&gt;
&lt;UL&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Partition by series key and parallelize per series&lt;/STRONG&gt;: &lt;CODE&gt;df.repartition(n_tasks, key...).groupBy(key...).applyInPandas(fn, schema)&lt;/CODE&gt; so each series (or series group) is processed independently with your statsmodels code. Temporarily disabling AQE helps ensure one Spark task per group when maximizing parallelism in training steps.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Control per-group memory&lt;/STRONG&gt;: All rows in a group are loaded to a worker before your function runs; keep groups bounded and watch skew. If a few series are huge, sub-sample, window, or split them to avoid OOM within the worker.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Tune Arrow batch size for pandas UDFs&lt;/STRONG&gt;: Adjust &lt;CODE&gt;spark.sql.execution.arrow.maxRecordsPerBatch&lt;/CODE&gt; when many columns (wide features) drive high JVM memory during Arrow conversion.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Mind timestamps&lt;/STRONG&gt;: Spark stores timestamps as UTC; prefer pandas-native time series operations inside the UDF for best performance and correctness with time zones.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Log your runs&lt;/STRONG&gt;: Use &lt;STRONG&gt;MLflow autologging&lt;/STRONG&gt; to capture parameters/metrics/artifacts automatically; it supports statsmodels, scikit-learn, Spark MLlib, and others in Databricks Runtime ML.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
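&lt;DIV class="paragraph"&gt;The memory knobs above are ordinary Spark confs. An illustrative configuration cell, assuming a live &lt;CODE&gt;spark&lt;/CODE&gt; session such as a Databricks notebook (the batch size value is an example to tune, not a recommendation):&lt;/DIV&gt;

```python
# Smaller Arrow batches reduce peak JVM memory during pandas conversion (default: 10000)
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "2000")
# Arrow-based transfer is what makes pandas UDFs efficient; enabled by default on Databricks
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
```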
&lt;H3 class="paragraph"&gt;When PySpark ML lacks the model you need&lt;/H3&gt;
&lt;UL&gt;
&lt;LI class="paragraph"&gt;&lt;STRONG&gt;Keep the data distributed, but run Python ML per group via pandas UDFs&lt;/STRONG&gt; (statsmodels, sktime components that are series-local, etc.). This avoids a bulk conversion of the whole table to pandas, and lets you scale horizontally by adding workers.&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Use distributed-capable alternatives where available&lt;/STRONG&gt;: For tree/GBM, prefer XGBoost’s Spark estimators (&lt;CODE&gt;xgboost.spark&lt;/CODE&gt;) to train models in parallel across the cluster instead of single-node &lt;CODE&gt;xgboost&lt;/CODE&gt; Python APIs.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;If single-node training is unavoidable&lt;/STRONG&gt;, constrain to sampled subsets or per-series batch sizes that fit in memory, and parallelize across many series with pandas UDFs; cluster scale helps for large row counts, while adding columns often hurts more—plan sizing accordingly.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
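&lt;DIV class="paragraph"&gt;"Per-series batch sizes that fit in memory" can be enforced before modeling, for example by keeping only the most recent N observations of each series. A pandas sketch of the idea (the cap of 2 rows is purely illustrative; the same logic runs unchanged inside a pandas UDF):&lt;/DIV&gt;

```python
import pandas as pd

# Toy data: two series of unequal length
df = pd.DataFrame({
    "series_id": ["a", "a", "a", "b", "b"],
    "ts": [1, 2, 3, 1, 2],
    "value": [1.0, 2.0, 3.0, 10.0, 20.0],
})

MAX_ROWS = 2  # illustrative per-series cap

# Keep only the latest MAX_ROWS points of each series to bound per-group memory
bounded = df.sort_values("ts").groupby("series_id").tail(MAX_ROWS)
```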
&lt;H3 class="paragraph"&gt;Minimal pattern: statsmodels per series with applyInPandas&lt;/H3&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;```python import pandas as pd from statsmodels.tsa.seasonal import seasonal_decompose from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;# Spark DataFrame: columns [series_id, ts, value] schema = StructType([ StructField("series_id", StringType(), False), StructField("ts", TimestampType(), False), StructField("trend", DoubleType(), True), StructField("seasonal", DoubleType(), True), StructField("resid", DoubleType(), True), ])&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;def decompose(pdf: pd.DataFrame) -&amp;gt; pd.DataFrame: pdf = pdf.sort_values("ts") # set frequency/period appropriate for your cadence, e.g., period=24 for hourly daily seasonality result = seasonal_decompose(pdf["value"], period=24, model="additive", extrapolate_trend="freq") out = pd.DataFrame({ "series_id": pdf["series_id"], "ts": pdf["ts"], "trend": result.trend, "seasonal": result.seasonal, "resid": result.resid }) return out&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;# Maximize parallelism by ensuring one task per series (optional) n_tasks = df.select("series_id").distinct().count() spark.conf.set("spark.databricks.optimizer.adaptive.enabled", "false") decomp = ( df.repartition(n_tasks, "series_id") .groupBy("series_id") .applyInPandas(decompose, schema) ) ```&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;H3 class="paragraph"&gt;Practical tips&lt;/H3&gt;
&lt;UL&gt;
&lt;LI class="paragraph"&gt;&lt;STRONG&gt;Prefer Pandas API on Spark&lt;/STRONG&gt; for exploratory and feature engineering when it fits your workflow; it’s pandas-equivalent on Spark and avoids full pandas conversion.&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Use pandas UDFs for the “bridge”&lt;/STRONG&gt; to libraries that don’t support Spark DataFrames (statsmodels, SHAP, etc.), and tune partitions to use all cores across nodes efficiently.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Don’t collect 50GB to the driver&lt;/STRONG&gt;; only move per-group batches as pandas via Arrow inside UDFs, and keep outputs as Spark DataFrames/Delta tables.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Track experiments with MLflow&lt;/STRONG&gt; and, where possible, use Databricks Runtime ML for pre-installed ML libraries and optimized MLlib algorithms.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;Hope this helps, Louis.&lt;/DIV&gt;</description>
    <pubDate>Wed, 29 Oct 2025 09:15:59 GMT</pubDate>
    <dc:creator>Louis_Frolio</dc:creator>
    <dc:date>2025-10-29T09:15:59Z</dc:date>
    <item>
      <title>Working with pyspark dataframe with machine learning libraries / statistical model libraries</title>
      <link>https://community.databricks.com/t5/machine-learning/working-with-pyspark-dataframe-with-machine-learning-libraries/m-p/109932#M3963</link>
      <description>&lt;P&gt;Hi Team,&amp;nbsp;&lt;BR /&gt;&lt;BR /&gt;I am working with huge volume of data (50GB) and i decompose the time series data using the statsmodel.&lt;BR /&gt;&lt;BR /&gt;Having said that the major challenge i am facing is the compatibility of the pyspark dataframe with the machine learning algorithms. although the pysaprk.ml is available but many of the models are not available.&lt;BR /&gt;&lt;BR /&gt;My question is on a broader level ,How can we handle this compatibility issue ?&amp;nbsp;&lt;BR /&gt;&lt;BR /&gt;Converting pyspark to pandas has not been proven to be very efficient.&lt;/P&gt;</description>
      <pubDate>Wed, 12 Feb 2025 06:10:41 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/working-with-pyspark-dataframe-with-machine-learning-libraries/m-p/109932#M3963</guid>
      <dc:creator>javeed</dc:creator>
      <dc:date>2025-02-12T06:10:41Z</dc:date>
    </item>
    <item>
      <title>Re: Working with pyspark dataframe with machine learning libraries / statistical model libraries</title>
      <link>https://community.databricks.com/t5/machine-learning/working-with-pyspark-dataframe-with-machine-learning-libraries/m-p/136522#M4381</link>
      <description>&lt;P&gt;Greetings&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/131266"&gt;@javeed&lt;/a&gt;&amp;nbsp;,&amp;nbsp; &amp;nbsp;You’re right to call out the friction between a PySpark DataFrame and many Python ML libraries like statsmodels; most Python ML stacks expect pandas, while Spark is distributed-first. Here’s how to bridge that gap efficiently for 50GB time series workloads on Databricks.&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;H3 class="paragraph"&gt;What works at scale on Databricks&lt;/H3&gt;
&lt;UL&gt;
&lt;LI class="paragraph"&gt;&lt;STRONG&gt;Pandas API on Spark&lt;/STRONG&gt; lets you write pandas-style code that executes on Spark, avoiding single-node pandas OOM and scaling out across a cluster while preserving familiar idioms. Import as &lt;CODE&gt;pyspark.pandas as ps&lt;/CODE&gt; and use it when you can keep your logic “pandas-like.”&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Pandas UDFs and pandas function APIs (applyInPandas, mapInPandas, cogroup.applyInPandas)&lt;/STRONG&gt; let you run arbitrary Python/pandas code (including statsmodels) against Spark partitions or groups with Arrow-optimized transfer. This is the main interoperability pattern: you group by your series key(s), materialize each group as a pandas DataFrame inside the UDF, and run statsmodels there. It’s proven in production and documented with examples and constraints.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Statsmodels with Spark via grouped Pandas UDFs&lt;/STRONG&gt;: Databricks’ examples show OLS per group with statsmodels in a grouped Pandas UDF, and SARIMAX time-series forecasting scaled across thousands of SKUs using groupBy + applyInPandas. The same pattern applies to decomposition (trend/seasonal/residual) or ARIMA/ETS per series.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
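&lt;DIV class="paragraph"&gt;Because &lt;CODE&gt;applyInPandas&lt;/CODE&gt; hands your function one plain pandas DataFrame per group, you can prototype and unit-test that function with an ordinary pandas &lt;CODE&gt;groupby&lt;/CODE&gt; before running it on Spark. A minimal sketch with illustrative column names and a toy per-series model:&lt;/DIV&gt;

```python
import numpy as np
import pandas as pd

# Toy data standing in for two series in a Spark table
df = pd.DataFrame({
    "series_id": ["a"] * 4 + ["b"] * 4,
    "t": [0, 1, 2, 3] * 2,
    "value": [1.0, 2.0, 3.0, 4.0, 10.0, 20.0, 30.0, 40.0],
})

def fit_slope(pdf: pd.DataFrame) -> pd.DataFrame:
    # Same contract as an applyInPandas function: pandas DataFrame in, pandas DataFrame out
    slope, intercept = np.polyfit(pdf["t"], pdf["value"], deg=1)
    return pd.DataFrame({"series_id": [pdf["series_id"].iloc[0]],
                         "slope": [slope], "intercept": [intercept]})

# Locally this emulates df.groupBy("series_id").applyInPandas(fit_slope, schema)
result = pd.concat([fit_slope(g) for _, g in df.groupby("series_id")], ignore_index=True)
```

&lt;DIV class="paragraph"&gt;The same function can then be passed unchanged to &lt;CODE&gt;applyInPandas&lt;/CODE&gt; with a matching Spark output schema.&lt;/DIV&gt;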
&lt;H3 class="paragraph"&gt;Recommended design pattern for time series at scale&lt;/H3&gt;
&lt;DIV class="paragraph"&gt;Use “many small models” on grouped data with pandas function APIs:&lt;/DIV&gt;
&lt;UL&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Partition by series key and parallelize per series&lt;/STRONG&gt;: &lt;CODE&gt;df.repartition(n_tasks, key...).groupBy(key...).applyInPandas(fn, schema)&lt;/CODE&gt; so each series (or series group) is processed independently with your statsmodels code. Temporarily disabling AQE helps ensure one Spark task per group when maximizing parallelism in training steps.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Control per-group memory&lt;/STRONG&gt;: All rows in a group are loaded to a worker before your function runs; keep groups bounded and watch skew. If a few series are huge, sub-sample, window, or split them to avoid OOM within the worker.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Tune Arrow batch size for pandas UDFs&lt;/STRONG&gt;: Adjust &lt;CODE&gt;spark.sql.execution.arrow.maxRecordsPerBatch&lt;/CODE&gt; when many columns (wide features) drive high JVM memory during Arrow conversion.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Mind timestamps&lt;/STRONG&gt;: Spark stores timestamps as UTC; prefer pandas-native time series operations inside the UDF for best performance and correctness with time zones.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Log your runs&lt;/STRONG&gt;: Use &lt;STRONG&gt;MLflow autologging&lt;/STRONG&gt; to capture parameters/metrics/artifacts automatically; it supports statsmodels, scikit-learn, Spark MLlib, and others in Databricks Runtime ML.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
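&lt;DIV class="paragraph"&gt;The memory knobs above are ordinary Spark confs. An illustrative configuration cell, assuming a live &lt;CODE&gt;spark&lt;/CODE&gt; session such as a Databricks notebook (the batch size value is an example to tune, not a recommendation):&lt;/DIV&gt;

```python
# Smaller Arrow batches reduce peak JVM memory during pandas conversion (default: 10000)
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "2000")
# Arrow-based transfer is what makes pandas UDFs efficient; enabled by default on Databricks
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
```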
&lt;H3 class="paragraph"&gt;When PySpark ML lacks the model you need&lt;/H3&gt;
&lt;UL&gt;
&lt;LI class="paragraph"&gt;&lt;STRONG&gt;Keep the data distributed, but run Python ML per group via pandas UDFs&lt;/STRONG&gt; (statsmodels, sktime components that are series-local, etc.). This avoids a bulk conversion of the whole table to pandas, and lets you scale horizontally by adding workers.&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Use distributed-capable alternatives where available&lt;/STRONG&gt;: For tree/GBM, prefer XGBoost’s Spark estimators (&lt;CODE&gt;xgboost.spark&lt;/CODE&gt;) to train models in parallel across the cluster instead of single-node &lt;CODE&gt;xgboost&lt;/CODE&gt; Python APIs.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;If single-node training is unavoidable&lt;/STRONG&gt;, constrain to sampled subsets or per-series batch sizes that fit in memory, and parallelize across many series with pandas UDFs; cluster scale helps for large row counts, while adding columns often hurts more—plan sizing accordingly.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
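&lt;DIV class="paragraph"&gt;"Per-series batch sizes that fit in memory" can be enforced before modeling, for example by keeping only the most recent N observations of each series. A pandas sketch of the idea (the cap of 2 rows is purely illustrative; the same logic runs unchanged inside a pandas UDF):&lt;/DIV&gt;

```python
import pandas as pd

# Toy data: two series of unequal length
df = pd.DataFrame({
    "series_id": ["a", "a", "a", "b", "b"],
    "ts": [1, 2, 3, 1, 2],
    "value": [1.0, 2.0, 3.0, 10.0, 20.0],
})

MAX_ROWS = 2  # illustrative per-series cap

# Keep only the latest MAX_ROWS points of each series to bound per-group memory
bounded = df.sort_values("ts").groupby("series_id").tail(MAX_ROWS)
```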
&lt;H3 class="paragraph"&gt;Minimal pattern: statsmodels per series with applyInPandas&lt;/H3&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;```python import pandas as pd from statsmodels.tsa.seasonal import seasonal_decompose from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;# Spark DataFrame: columns [series_id, ts, value] schema = StructType([ StructField("series_id", StringType(), False), StructField("ts", TimestampType(), False), StructField("trend", DoubleType(), True), StructField("seasonal", DoubleType(), True), StructField("resid", DoubleType(), True), ])&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;def decompose(pdf: pd.DataFrame) -&amp;gt; pd.DataFrame: pdf = pdf.sort_values("ts") # set frequency/period appropriate for your cadence, e.g., period=24 for hourly daily seasonality result = seasonal_decompose(pdf["value"], period=24, model="additive", extrapolate_trend="freq") out = pd.DataFrame({ "series_id": pdf["series_id"], "ts": pdf["ts"], "trend": result.trend, "seasonal": result.seasonal, "resid": result.resid }) return out&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;# Maximize parallelism by ensuring one task per series (optional) n_tasks = df.select("series_id").distinct().count() spark.conf.set("spark.databricks.optimizer.adaptive.enabled", "false") decomp = ( df.repartition(n_tasks, "series_id") .groupBy("series_id") .applyInPandas(decompose, schema) ) ```&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;H3 class="paragraph"&gt;Practical tips&lt;/H3&gt;
&lt;UL&gt;
&lt;LI class="paragraph"&gt;&lt;STRONG&gt;Prefer Pandas API on Spark&lt;/STRONG&gt; for exploratory and feature engineering when it fits your workflow; it’s pandas-equivalent on Spark and avoids full pandas conversion.&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Use pandas UDFs for the “bridge”&lt;/STRONG&gt; to libraries that don’t support Spark DataFrames (statsmodels, SHAP, etc.), and tune partitions to use all cores across nodes efficiently.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Don’t collect 50GB to the driver&lt;/STRONG&gt;; only move per-group batches as pandas via Arrow inside UDFs, and keep outputs as Spark DataFrames/Delta tables.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;LI&gt;
&lt;DIV class="paragraph"&gt;&lt;STRONG&gt;Track experiments with MLflow&lt;/STRONG&gt; and, where possible, use Databricks Runtime ML for pre-installed ML libraries and optimized MLlib algorithms.&lt;/DIV&gt;
&lt;/LI&gt;
&lt;/UL&gt;
&lt;DIV class="paragraph"&gt;&amp;nbsp;&lt;/DIV&gt;
&lt;DIV class="paragraph"&gt;Hope this helps, Louis.&lt;/DIV&gt;</description>
      <pubDate>Wed, 29 Oct 2025 09:15:59 GMT</pubDate>
      <guid>https://community.databricks.com/t5/machine-learning/working-with-pyspark-dataframe-with-machine-learning-libraries/m-p/136522#M4381</guid>
      <dc:creator>Louis_Frolio</dc:creator>
      <dc:date>2025-10-29T09:15:59Z</dc:date>
    </item>
  </channel>
</rss>

