Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

from pyspark.ml.stat import KolmogorovSmirnovTest is not working on Serverless compute.

parthesh24
New Contributor II

 

Hi everyone,

I am trying to run a Kolmogorov–Smirnov (KS) test on a Spark DataFrame column in Databricks using the built-in pyspark.ml.stat.KolmogorovSmirnovTest. The goal is to apply the KS test directly on Spark DataFrame data without converting it into Pandas or NumPy.

Here’s the snippet I’m using:

from pyspark.ml.stat import KolmogorovSmirnovTest

result = KolmogorovSmirnovTest.test(df, "value", "norm", 0.0, 1.0).collect()[0]
print(result.statistic, result.pValue)

And the error I get is:

 

 
AssertionError:
File <command-8698289323550994>, line 26
     23 else:
     24     return df.sparkSession.createDataFrame([], "column S...

File /databricks/python/lib/python3.12/site-packages/pyspark/ml/stat.py:249, in KolmogorovSmirnovTest.test(dataset, sampleCol, distName, *params)
    246     from pyspark.core.context import SparkContext
    248     sc = SparkContext._active_spark_context
--> 249     assert sc is not None
    251     javaTestObj = getattr(_jvm(), "org.apache.spark.ml.stat.KolmogorovSmirnovTest")
    252     dataset = _py2java(sc, dataset)



It seems like the KolmogorovSmirnovTest module isn’t supported in Serverless compute, or it behaves differently compared to standard clusters.

Has anyone faced this issue? Is KS test currently unsupported in Databricks Serverless, or is there a workaround?

Thanks in advance!

1 ACCEPTED SOLUTION


szymon_dybczak
Esteemed Contributor III

If we're talking about this KolmogorovSmirnovTest from this particular module -> pyspark.ml.stat - then no. The reason is explained in the answer above.

When you look at the source code, you can clearly see sparkContext being used - so if you want to use this test you have to switch from serverless to classic compute:

class KolmogorovSmirnovTest:
    """
    Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a continuous
    distribution.

    By comparing the largest difference between the empirical cumulative
    distribution of the sample data and the theoretical distribution we can provide a test for
    the null hypothesis that the sample data comes from that theoretical distribution.

    .. versionadded:: 2.4.0

    """

    @staticmethod
    def test(dataset: DataFrame, sampleCol: str, distName: str, *params: float) -> DataFrame:
        """
        Conduct a one-sample, two-sided Kolmogorov-Smirnov test for probability distribution
        equality. Currently supports the normal distribution, taking as parameters the mean and
        standard deviation.

        .. versionadded:: 2.4.0

        Parameters
        ----------
        dataset : :py:class:`pyspark.sql.DataFrame`
            a Dataset or a DataFrame containing the sample of data to test.
        sampleCol : str
            Name of sample column in dataset, of any numerical type.
        distName : str
            a `string` name for a theoretical distribution, currently only support "norm".
        params : float
            a list of `float` values specifying the parameters to be used for the theoretical
            distribution. For "norm" distribution, the parameters includes mean and variance.

        Returns
        -------
        A DataFrame that contains the Kolmogorov-Smirnov test result for the input sampled data.
        This DataFrame will contain a single Row with the following fields:

        - `pValue: Double`
        - `statistic: Double`

        Examples
        --------
        >>> from pyspark.ml.stat import KolmogorovSmirnovTest
        >>> dataset = [[-1.0], [0.0], [1.0]]
        >>> dataset = spark.createDataFrame(dataset, ['sample'])
        >>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 0.0, 1.0).first()
        >>> round(ksResult.pValue, 3)
        1.0
        >>> round(ksResult.statistic, 3)
        0.175
        >>> dataset = [[2.0], [3.0], [4.0]]
        >>> dataset = spark.createDataFrame(dataset, ['sample'])
        >>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 3.0, 1.0).first()
        >>> round(ksResult.pValue, 3)
        1.0
        >>> round(ksResult.statistic, 3)
        0.175
        """
        if is_remote():
            return invoke_helper_relation(
                "kolmogorovSmirnovTest",
                dataset,
                sampleCol,
                distName,
                ([float(p) for p in params], ArrayType(DoubleType())),
            )

        else:
            from pyspark.core.context import SparkContext

            sc = SparkContext._active_spark_context
            assert sc is not None

            javaTestObj = getattr(_jvm(), "org.apache.spark.ml.stat.KolmogorovSmirnovTest")
            dataset = _py2java(sc, dataset)
            params = [float(param) for param in params]  # type: ignore[assignment]
            return _java2py(
                sc,
                javaTestObj.test(
                    dataset,
                    sampleCol,
                    distName,
                    _jvm().PythonUtils.toSeq(params),
                ),
            )

 

So you can try to search for a different implementation of KolmogorovSmirnovTest that doesn't use sparkContext. Try searching GitHub and maybe you'll find something that works for you. Maybe something like below:

KS-2Samp-PySparkSQL/ks_2samp_sparksql.py at master · Davi-Schumacher/KS-2Samp-PySparkSQL · GitHub
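For the one-sample case against a normal distribution, a DataFrame-only version is also fairly small to write yourself. Below is a rough sketch, not something from pyspark itself - the names `ks_test_norm`, `norm_cdf`, and `ks_pvalue` are mine, and the p-value uses the asymptotic Kolmogorov-distribution approximation, so it can differ slightly from pyspark's result. Since nothing here touches SparkContext, it should run on serverless:

```python
import math

def norm_cdf(x, mu=0.0, sigma=1.0):
    # CDF of N(mu, sigma^2) via the error function (math.erf is stdlib).
    return 0.5 * (1.0 + math.erf((x - mu) / (sigma * math.sqrt(2.0))))

def ks_pvalue(d, n, terms=100):
    # Asymptotic two-sided KS p-value: 2 * sum_{k>=1} (-1)^(k-1) exp(-2 k^2 n d^2).
    if d <= 0.0:
        return 1.0
    t2 = n * d * d
    s = sum((-1.0) ** (k - 1) * math.exp(-2.0 * k * k * t2)
            for k in range(1, terms + 1))
    return max(0.0, min(1.0, 2.0 * s))

def ks_test_norm(df, col, mu=0.0, sigma=1.0):
    # One-sample KS test against N(mu, sigma^2) built only from DataFrame ops
    # and a Python UDF -- no SparkContext involved. Note that Window.orderBy
    # without partitioning funnels all rows through a single partition, so
    # this is only reasonable for modest sample sizes.
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    n = df.count()
    ranked = df.select(
        F.col(col).alias("x"),
        F.row_number().over(Window.orderBy(col)).alias("i"),
    )
    cdf = F.udf(lambda x: norm_cdf(x, mu, sigma), "double")
    d = ranked.select(
        F.greatest(
            F.col("i") / n - cdf("x"),        # ECDF above the theoretical CDF
            cdf("x") - (F.col("i") - 1) / n,  # theoretical CDF above the ECDF
        ).alias("gap")
    ).agg(F.max("gap")).first()[0]
    return d, ks_pvalue(d, n)
```

With the docstring example above (samples [-1.0, 0.0, 1.0] against norm with mean 0.0 and stddev 1.0), `ks_test_norm(df, "sample")` should give the same statistic of about 0.175.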

And keep in mind that serverless has other limitations as well:

Serverless compute limitations - Azure Databricks | Microsoft Learn


3 REPLIES

szymon_dybczak
Esteemed Contributor III

 

Hi @parthesh24 ,

It looks like the KolmogorovSmirnovTest module is trying to access the SparkContext under the hood, which is not supported in serverless.


 

You can check it yourself by trying to use sparkContext in serverless 🙂
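To make the failure mode concrete, the guard that fires can be reproduced without a cluster. This is just a toy stand-in (`FakeSparkContext` is made up, not a pyspark class) showing why the `assert sc is not None` at stat.py:249 raises when no JVM-backed context exists:

```python
# Toy reproduction of the guard in pyspark/ml/stat.py. On serverless there is
# no JVM-backed SparkContext, so _active_spark_context is None and the
# `assert sc is not None` check raises AssertionError.
class FakeSparkContext:
    _active_spark_context = None  # what serverless effectively looks like here

def jvm_backed_ks_test():
    sc = FakeSparkContext._active_spark_context
    assert sc is not None  # fails, exactly like the traceback in the question
    # ... on classic compute this would continue into _py2java / JVM calls ...

try:
    jvm_backed_ks_test()
except AssertionError:
    print("AssertionError: no active SparkContext (serverless behavior)")
```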

So, is there any way I can perform KolmogorovSmirnovTest in serverless compute?

