Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

What libraries could be used for unit testing of the Spark code?

alexott
Valued Contributor II

We need to add unit tests for the code we're writing in Scala and Python, but we can't use calls like `assertEqual` to compare the contents of DataFrames. Are there any special libraries for that?

1 REPLY

alexott
Valued Contributor II

There are several libraries for Scala and Python that help with writing unit tests for Spark code.

For Scala you can use the following:

  • Built-in Spark test suite - designed to test all parts of Spark; supports the RDD, DataFrame/Dataset, and Streaming APIs
  • spark-testing-base - supports both Scala & Python; covers the RDD, DataFrame/Dataset, and Streaming APIs
  • spark-fast-tests - supports both Spark 2 & 3; a very simple, easy-to-use API that combines well with ScalaTest

For Python you can also use the following (in addition to spark-testing-base):

  • chispa - a Python port of spark-fast-tests
  • pytest-spark - simplifies Spark session creation and integrates natively with pytest

Code examples for all of these libraries can be found in one place.

Here is a small example using chispa:

from chispa import assert_column_equality
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
 
 
spark = SparkSession.builder.master("local").appName("chispa").getOrCreate()
 
 
def remove_non_word_characters(col):
    # Strip everything except word characters and whitespace
    return F.regexp_replace(col, r"[^\w\s]+", "")
 
 
def test_removes_non_word_characters_short():
    # Each row pairs an input name with the expected cleaned value
    data = [("jo&&se", "jose"), ("**li**", "li"), ("#::luisa", "luisa"), (None, None)]
    df = spark.createDataFrame(data, ["name", "expected_name"]).withColumn(
        "clean_name", remove_non_word_characters(F.col("name"))
    )
    assert_column_equality(df, "clean_name", "expected_name")
 
 
def test_remove_non_word_characters_nice_error():
    # Digits are word characters, so "matt7" cleans to "matt7", not "matt":
    # this test fails on purpose to demonstrate chispa's readable error output
    data = [("matt7", "matt"), ("bill&", "bill"), ("isabela*", "isabela"), (None, None)]
    df = spark.createDataFrame(data, ["name", "expected_name"]).withColumn(
        "clean_name", remove_non_word_characters(F.col("name"))
    )
    assert_column_equality(df, "clean_name", "expected_name")
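Conceptually, `assert_column_equality` collects the two columns and compares them row by row, reporting every mismatching row rather than just the first. The idea can be sketched in plain Python (a hypothetical helper for illustration, not chispa's actual implementation):

```python
def assert_columns_match(rows, actual_col, expected_col):
    # Collect (row index, actual, expected) for every mismatching row
    mismatches = [
        (i, row[actual_col], row[expected_col])
        for i, row in enumerate(rows)
        if row[actual_col] != row[expected_col]
    ]
    assert not mismatches, f"columns differ: {mismatches}"

# Rows as they might look after df.collect() in the passing test above
rows = [
    {"clean_name": "jose", "expected_name": "jose"},
    {"clean_name": "li", "expected_name": "li"},
    {"clean_name": None, "expected_name": None},
]
assert_columns_match(rows, "clean_name", "expected_name")
```

Reporting all mismatches at once is what makes DataFrame assertion libraries like chispa and spark-fast-tests far more pleasant to debug than a bare `assertEqual` on collected rows.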
