Filtering records for all values of an array in Spark

prachicsa
New Contributor

I am very new to Spark.

I have a very basic question. I have an array of values:

listofECtokens: Array[String] = Array(EC-17A5206955089011B, EC-17A5206955089011A)

I want to filter an RDD for all of these token values. I tried the following way:

val ECtokens = for (token <- listofECtokens) rddAll.filter(line => line.contains(token))

Output:

ECtokens: Unit = ()

I get Unit back even though there are records containing these tokens. What am I doing wrong?

3 REPLIES

PohlPosition
New Contributor III

If I understand you correctly, you have a large array of tokens, and you want to filter that large array against a smaller array of tokens.

You should convert these arrays into RDDs and then use the intersection() transformation to return only the tokens common to both:

val listofECtokens: Array[String] = Array("EC-17A5206955089011B", "EC-17A5206955089011A")

//Turn this array into an RDD

val listofECtokensRDD = sc.parallelize(listofECtokens)

//Create a bigger RDD of tokens

val biggerListofECtokensRDD = sc.parallelize(Array("EC-17A5206955089011B", "EC-17A5206955089011A", "EC-15B5206955089011A", "EC-12C5206955089011A"))

//Collect just the intersection of tokens between the two RDDs

val filteredTokens = biggerListofECtokensRDD.intersection(listofECtokensRDD).collect()

Please note that collect() sends all of the filtered data back to the driver machine. For small examples like this, that is acceptable. But for big data, you may run into out-of-memory errors.
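One thing worth checking before relying on intersection(): it compares whole elements, so it only helps when each record is exactly a token. Here is a minimal sketch you could paste into spark-shell or a notebook. The records are made up for illustration, and the local session is only there so the snippet stands on its own (in spark-shell, getOrCreate() should simply return the existing session).

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; spark-shell and notebooks already provide one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("intersection-semantics")
  .getOrCreate()
val sc = spark.sparkContext

val tokens = sc.parallelize(Seq("EC-17A5206955089011B", "EC-17A5206955089011A"))

// Hypothetical records: one bare token that appears twice,
// plus a longer line that merely contains a token.
val records = sc.parallelize(Seq(
  "EC-17A5206955089011B",
  "EC-17A5206955089011B",
  "checkout EC-17A5206955089011A completed"
))

// intersection() keeps elements that are equal to an element of the other RDD,
// and the result contains no duplicates.
val common = records.intersection(tokens)

// take(n) bounds how much data is pulled back to the driver, unlike collect().
val sample = common.take(10)
sample.foreach(println) // prints EC-17A5206955089011B once

spark.stop()
```

The longer line is dropped because it is not equal to any token, so if your records are full lines that only contain a token, a filter with a substring check is probably what you want instead.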

NanditaDwivedi
New Contributor II

Did you find the solution?

__max
New Contributor III

Actually, the intersection transformation does deduplication. If you don't need it, you can just slightly modify your code:

val filteredRdd = rddAll.filter(line => listofECtokens.exists(token => line.contains(token)))

and bring the RDD's data back to your program by calling an action like collect or take:

val result = filteredRdd.take(100)

The result is just a regular Array.
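As for why the original attempt returned Unit: a Scala for loop without yield is translated to foreach, so the filtered result of each iteration is thrown away. The sketch below shows this with a plain local Seq standing in for rddAll (the lines in it are made up), so it runs without a cluster. As far as I know, RDD.filter accepts the same kind of predicate.

```scala
val listofECtokens = Array("EC-17A5206955089011B", "EC-17A5206955089011A")

// Stand-in for rddAll: a local collection of hypothetical lines.
val allLines = Seq(
  "order EC-17A5206955089011B shipped",
  "order EC-15B5206955089011A pending",
  "order EC-17A5206955089011A shipped"
)

// Without yield, each filtered result is discarded and the expression has type Unit.
val discarded: Unit =
  for (token <- listofECtokens) allLines.filter(line => line.contains(token))

// With yield, you get one filtered collection per token.
val perToken =
  for (token <- listofECtokens) yield allLines.filter(line => line.contains(token))

// A single pass that keeps a line if it contains any of the tokens.
val matching =
  allLines.filter(line => listofECtokens.exists(token => line.contains(token)))

matching.foreach(println)
```

The single-pass version is usually the one to prefer on an RDD, since the yield version would build a separate filtered dataset for every token.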
