alexott
Databricks Employee

There are several libraries for Scala and Python that help with writing unit tests for Spark code.

For Scala, you can use the following:

  • Built-in Spark test suite - designed to test all parts of Spark. Supports the RDD, DataFrame/Dataset, and Streaming APIs.
  • spark-testing-base - supports both Scala & Python. Supports the RDD, DataFrame/Dataset, and Streaming APIs.
  • spark-fast-tests - supports both Spark 2 & 3, with a very simple, easy-to-use API. Easy to combine with ScalaTest.

For Python, you can also use the following (in addition to spark-testing-base):

  • chispa - Python port of spark-fast-tests
  • pytest-spark - simplifies Spark session creation, natively integrated with pytest

Code examples for all of these libraries can be found in one place.

Here is a small example of using chispa:

from chispa.column_comparer import assert_column_equality
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
 
 
spark = SparkSession.builder.master("local").appName("chispa").getOrCreate()
 
 
def remove_non_word_characters(col):
    # Strip every character that is neither a word character (\w) nor whitespace (\s).
    return F.regexp_replace(col, "[^\\w\\s]+", "")
 
 
def test_removes_non_word_characters_short():
    data = [("jo&&se", "jose"), ("**li**", "li"), ("#::luisa", "luisa"), (None, None)]
    df = spark.createDataFrame(data, ["name", "expected_name"]).withColumn(
        "clean_name", remove_non_word_characters(F.col("name"))
    )
    assert_column_equality(df, "clean_name", "expected_name")
 
 
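# This test fails on the first row: digits count as word characters, so "matt7"
# is left unchanged. It shows the descriptive mismatch report that chispa prints.
def test_remove_non_word_characters_nice_error():
    data = [("matt7", "matt"), ("bill&", "bill"), ("isabela*", "isabela"), (None, None)]
    df = spark.createDataFrame(data, ["name", "expected_name"]).withColumn(
        "clean_name", remove_non_word_characters(F.col("name"))
    )
    assert_column_equality(df, "clean_name", "expected_name")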
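
The second test above does not pass as written, and judging by its name that appears to be intentional: "matt7" contains only word characters, so the regex removes nothing and the result differs from the expected "matt". If you want to check that without starting Spark, here is a rough sketch in plain Python. The helper name remove_non_word_characters_py is made up for this illustration, and it assumes that for ASCII input Python's re module handles this pattern the same way as the Java regex engine behind regexp_replace.

```python
import re


def remove_non_word_characters_py(value):
    # Hypothetical plain-Python stand-in for the Spark column expression.
    # Nulls pass through unchanged, mirroring the (None, None) rows above.
    if value is None:
        return None
    return re.sub(r"[^\w\s]+", "", value)


# Same rows as in test_remove_non_word_characters_nice_error.
rows = [("matt7", "matt"), ("bill&", "bill"), ("isabela*", "isabela"), (None, None)]

# Collect (input, actual, expected) for every row where actual != expected.
mismatches = [
    (name, remove_non_word_characters_py(name), expected)
    for name, expected in rows
    if remove_non_word_characters_py(name) != expected
]

print(mismatches)  # [('matt7', 'matt7', 'matt')]
```

Only the "matt7" row is reported, which is the row that assert_column_equality would flag in the Spark version of the test.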