There are several libraries for Scala and Python that help with writing unit tests for Spark code.
For Scala you can use the following:
- Built-in Spark test suite - designed to test all parts of Spark itself; it covers the RDD, DataFrame/Dataset, and Streaming APIs
- spark-testing-base - supports both Scala and Python, and covers the RDD, DataFrame/Dataset, and Streaming APIs
- spark-fast-tests - supports both Spark 2 and 3, with a very simple, easy-to-use API that combines well with ScalaTest
For Python you can also use the following (in addition to spark-testing-base):
- chispa - a Python port of spark-fast-tests
- pytest-spark - simplifies Spark session creation and integrates natively with pytest (see the sketch right after this list)
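To illustrate pytest-spark: the plugin provides a `spark_session` fixture that is injected into tests, so no SparkSession boilerplate is needed. Here is a minimal sketch; the test name and the sample data are invented for illustration:

```python
import pyspark.sql.functions as F


# pytest-spark supplies the `spark_session` fixture automatically
# once the plugin is installed; the test just declares it as an argument.
def test_uppercases_letters(spark_session):
    # Hypothetical sample data, just to exercise the fixture.
    df = spark_session.createDataFrame([("a",), ("b",)], ["letter"])
    result = df.withColumn("upper", F.upper(F.col("letter"))).collect()
    assert [row["upper"] for row in result] == ["A", "B"]
```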
Code examples for all of these libraries can be found in one place.
Here is a small example of using chispa:
```python
from chispa import assert_column_equality
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("chispa").getOrCreate()


def remove_non_word_characters(col):
    # Strip everything except word characters and whitespace.
    return F.regexp_replace(col, "[^\\w\\s]+", "")


def test_removes_non_word_characters_short():
    data = [("jo&&se", "jose"), ("**li**", "li"), ("#::luisa", "luisa"), (None, None)]
    df = spark.createDataFrame(data, ["name", "expected_name"]).withColumn(
        "clean_name", remove_non_word_characters(F.col("name"))
    )
    # Compares the two columns row by row.
    assert_column_equality(df, "clean_name", "expected_name")


def test_remove_non_word_characters_nice_error():
    # "matt7" keeps its digit (7 is a word character), so this test
    # fails and demonstrates chispa's descriptive error output.
    data = [("matt7", "matt"), ("bill&", "bill"), ("isabela*", "isabela"), (None, None)]
    df = spark.createDataFrame(data, ["name", "expected_name"]).withColumn(
        "clean_name", remove_non_word_characters(F.col("name"))
    )
    assert_column_equality(df, "clean_name", "expected_name")
```
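Run both tests with pytest. The first one passes, while the second fails on purpose: the digit in "matt7" is a word character, so it is not stripped, and chispa prints a readable, colorized comparison of the mismatched rows, which is the "nice error" the test name refers to.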