cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

What libraries could be used for unit testing of the Spark code?

alexott
Valued Contributor II
Valued Contributor II

We need to add unit test cases for our code that we're writing using the Scala in Python. But we can't use the calls like `assertEqual` for comparing the content of DataFrames. Are any special libraries for that?

1 REPLY 1

alexott
Valued Contributor II
Valued Contributor II

There are several libraries for Scala and Python that help with writing unit tests for Spark code.

For Scala you can use following:

  • Built-in Spark test suite - it's designed to test all parts of Spark. It supports RDD, Dataframe/Dataset, Streaming APIs
  • spark-testing-base - supports both Scala & Python. Supports RDD, Dataframe/Dataset, and Streaming APIs.
  • spark-fast-tests - supports both Spark 2 & 3, very simple, easy to use API. Easy to combine with Scalatest

For Python you can use also following (in addition to the spark-testing-base)

  • chispa - Python port of spark-fast-tests
  • pytest-spark - simplifies Spark session creation, natively integrated with pytest

Code examples for all of this libraries could be found in one place.

Here is small example of using Chispa:

from chispa import *
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
 
 
spark = SparkSession.builder.master("local").appName("chispa").getOrCreate()
 
 
def remove_non_word_characters(col):
    return F.regexp_replace(col, "[^\\w\\s]+", "")
 
 
def test_removes_non_word_characters_short():
    data = [("jo&&se", "jose"), ("**li**", "li"), ("#::luisa", "luisa"), (None, None)]
    df = spark.createDataFrame(data, ["name", "expected_name"]).withColumn(
        "clean_name", remove_non_word_characters(F.col("name"))
    )
    assert_column_equality(df, "clean_name", "expected_name")
 
 
def test_remove_non_word_characters_nice_error():
    data = [("matt7", "matt"), ("bill&", "bill"), ("isabela*", "isabela"), (None, None)]
    df = spark.createDataFrame(data, ["name", "expected_name"]).withColumn(
        "clean_name", remove_non_word_characters(F.col("name"))
    )
    assert_column_equality(df, "clean_name", "expected_name")

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group