Data Engineering
What libraries could be used for unit testing of the Spark code?

alexott
Valued Contributor II

We need to add unit test cases for the code that we're writing in Scala and Python. But we can't use calls like `assertEqual` to compare the content of DataFrames. Are there any special libraries for that?

1 REPLY 1

alexott
Valued Contributor II

There are several libraries for Scala and Python that help with writing unit tests for Spark code.

For Scala you can use the following:

  • Built-in Spark test suite - designed to test all parts of Spark itself. It supports the RDD, DataFrame/Dataset, and Streaming APIs.
  • spark-testing-base - supports both Scala and Python, and the RDD, DataFrame/Dataset, and Streaming APIs.
  • spark-fast-tests - supports both Spark 2 and 3, with a very simple, easy-to-use API. Easy to combine with ScalaTest.

For Python you can also use the following (in addition to spark-testing-base):

  • chispa - a Python port of spark-fast-tests
  • pytest-spark - simplifies Spark session creation and integrates natively with pytest

Code examples for all of these libraries can be found in one place.

Here is a small example of using chispa:

from chispa import assert_column_equality
import pyspark.sql.functions as F
from pyspark.sql import SparkSession


spark = SparkSession.builder.master("local").appName("chispa").getOrCreate()


def remove_non_word_characters(col):
    # Strip everything that is not a word character or whitespace
    return F.regexp_replace(col, r"[^\w\s]+", "")


def test_removes_non_word_characters_short():
    data = [("jo&&se", "jose"), ("**li**", "li"), ("#::luisa", "luisa"), (None, None)]
    df = spark.createDataFrame(data, ["name", "expected_name"]).withColumn(
        "clean_name", remove_non_word_characters(F.col("name"))
    )
    # Compares the two columns row by row and reports any mismatching rows
    assert_column_equality(df, "clean_name", "expected_name")


def test_remove_non_word_characters_nice_error():
    # "matt7" keeps the digit (digits are word characters), so this test
    # fails on purpose to show chispa's readable error output
    data = [("matt7", "matt"), ("bill&", "bill"), ("isabela*", "isabela"), (None, None)]
    df = spark.createDataFrame(data, ["name", "expected_name"]).withColumn(
        "clean_name", remove_non_word_characters(F.col("name"))
    )
    assert_column_equality(df, "clean_name", "expected_name")