Data Engineering

Filtering records for all values of an array in Spark

prachicsa
New Contributor

I am very new to Spark.

I have a very basic question. I have an array of values:

listofECtokens: Array[String] = Array(EC-17A5206955089011B, EC-17A5206955089011A)

I want to filter an RDD for all of these token values. I tried the following way:

val ECtokens = for (token <- listofECtokens) rddAll.filter(line => line.contains(token))

Output:

ECtokens: Unit = ()

I got an empty Unit even though there are records with these tokens. What am I doing wrong?

3 REPLIES

PohlPosition
New Contributor III

If I understand you correctly, you have a large array of tokens, and you want to filter that large array against a smaller array of tokens.

You should convert these arrays into RDDs and then use the intersection() function to return just the tokens the two lists have in common:

val listofECtokens: Array[String] = Array("EC-17A5206955089011B", "EC-17A5206955089011A")

//Turn this array into an RDD

val listofECtokensRDD = sc.parallelize(listofECtokens)

//Create a bigger RDD of tokens

val biggerListofECtokensRDD = sc.parallelize(Array("EC-17A5206955089011B", "EC-17A5206955089011A", "EC-15B5206955089011A", "EC-12C5206955089011A"))

//Collect just the intersection of tokens between the two RDDs

val commonTokens = biggerListofECtokensRDD.intersection(listofECtokensRDD).collect() // an Array[String], not an RDD

Please note that collect() sends all of the filtered data back to the driver machine. That is acceptable for small examples like this one, but with big data you may run into out-of-memory errors.
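To make the driver-memory caveat concrete, here is a minimal sketch that runs the same intersection but pulls back only a bounded sample with take(n) and a count. It assumes a local SparkSession created purely for illustration (in a notebook or spark-shell, getOrCreate() should simply reuse the existing session), and the names wantedTokens, allTokens, and sample are hypothetical. Keep in mind that intersection compares whole elements, so it only helps when each RDD element is exactly a token rather than a line that contains one.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration only. In a Databricks notebook
// or spark-shell a session already exists and getOrCreate() reuses it.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("token-intersection-sketch")
  .getOrCreate()
val sc = spark.sparkContext

val wantedTokens = sc.parallelize(Seq("EC-17A5206955089011B", "EC-17A5206955089011A"))
val allTokens = sc.parallelize(Seq(
  "EC-17A5206955089011B", "EC-17A5206955089011A",
  "EC-15B5206955089011A", "EC-12C5206955089011A"))

// intersection is a lazy transformation: nothing is sent to the driver yet.
val commonTokens = allTokens.intersection(wantedTokens)

// take(n) is an action that returns at most n elements to the driver,
// so the driver's memory use stays bounded however large the RDD is.
val sample: Array[String] = commonTokens.take(10)

// count() returns a single number instead of the data itself.
val matchCount: Long = commonTokens.count()
```

If the full result is needed and is too large for the driver, writing it out with an action such as saveAsTextFile keeps the data on the cluster instead.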

NanditaDwivedi
New Contributor II

Did you find the solution?

__max
New Contributor III

Actually, the intersection transformation does deduplication. If you don't need it, you can just slightly modify your code:

val filteredRdd = rddAll.filter(line => listofECtokens.exists(token => line.contains(token)))

and then bring the RDD's data back to your driver program by calling an action such as collect or take:

val result = filteredRdd.take(100)

The result is just a regular Array.
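As for why the original attempt printed Unit: a Scala for loop without yield is rewritten into a foreach call, which throws away the value of each filter and returns Unit. The sketch below shows that behaviour next to a single filter whose predicate checks every token. It uses a plain Seq as a stand-in for rddAll so it runs without a cluster. The sample lines and the names lines, containsAnyToken, and matching are made up for illustration.

```scala
val listofECtokens: Array[String] = Array("EC-17A5206955089011B", "EC-17A5206955089011A")

// Hypothetical sample data standing in for rddAll.
val lines: Seq[String] = Seq(
  "id=1 token=EC-17A5206955089011B status=ok",
  "id=2 token=EC-15B5206955089011A status=ok",
  "id=3 token=EC-17A5206955089011A status=failed")

// A for loop without yield desugars to foreach, so each filtered collection
// is discarded and the whole expression has type Unit.
val discarded: Unit = for (token <- listofECtokens) lines.filter(line => line.contains(token))

// Predicate that is true when a line contains at least one of the tokens.
val containsAnyToken: String => Boolean =
  line => listofECtokens.exists(token => line.contains(token))

// One pass over the data, keeping lines that match any token.
val matching: Seq[String] = lines.filter(containsAnyToken)
```

Because RDD.filter also takes a function from element to Boolean, the same predicate should work unchanged as rddAll.filter(containsAnyToken), assuming rddAll is an RDD[String].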
