How to pass column names in selectExpr through one or more string parameters in Spark using Scala?
10-26-2019 08:28 PM
I am using a script for a CDC merge in Spark streaming. I want to pass the column names used in selectExpr through a parameter, because the column names change for each table. When I pass the key columns and the struct field through a string variable, I get the error: mismatched input ',' expecting <<EOF>>
Below is the piece of code I am trying to parameterize.
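import org.apache.spark.sql.functions.max
// Keep one row per key: pack the other columns into a struct, take the max struct per key
// (structs compare field by field, so the highest offset wins), then unpack it again
val filteredMicroBatchDF = microBatchOutputDF
  .selectExpr("col1", "col2", "struct(offset,KAFKA_TS) as otherCols")
  .groupBy("col1", "col2").agg(max("otherCols").as("latest"))
  .selectExpr("col1", "col2", "latest.*")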
The reference script I am adapting:
https://docs.databricks.com/_static/notebooks/merge-in-cdc.html
I tried passing the column names in variables and then reading them in selectExpr from those variables, like this:
val keyCols = "col1","col2"
val structCols = "struct(offset,KAFKA_TS) as otherCols"
var filteredMicroBatchDF = microBatchOutputDF
  .selectExpr(keyCols, structCols)
  .groupBy(keyCols)
  .agg(max("otherCols").as("latest"))
  .selectExpr(keyCols, "latest.*")
When I run the script, I get this error:
org.apache.spark.sql.streaming.StreamingQueryException:mismatched input ',' expecting <<EOF>>
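The message itself hints at the cause: selectExpr takes a variable number of String arguments (exprs: String*) and parses each one as a single SQL expression. If a single string such as "col1,col2" reaches it, that string is one malformed expression and the parser stops at the comma. A minimal sketch to reproduce this locally, assuming a local SparkSession; the object name SelectExprParseDemo and the helper failsToParse are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.parser.ParseException

object SelectExprParseDemo {
  // Returns true if selectExpr rejects the given strings with a SQL parse error.
  def failsToParse(spark: SparkSession, exprs: String*): Boolean = {
    import spark.implicits._
    val df = Seq(("a", "b")).toDF("col1", "col2")
    try {
      df.selectExpr(exprs: _*) // each String is parsed as ONE SQL expression
      false
    } catch {
      case _: ParseException => true
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("selectExpr-demo").getOrCreate()
    // One string containing a comma: the parser hits ',' where it expects end of input
    println(failsToParse(spark, "col1,col2"))
    // Two separate strings: each one is a valid expression
    println(failsToParse(spark, "col1", "col2"))
    spark.stop()
  }
}
```

Inside a streaming query the same parse error surfaces wrapped in a StreamingQueryException, which matches the error above.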
10-28-2019 10:40 PM
Hi @Swapan Swapandeep Marwaha,
Can you pass them as a Seq, as in the code below?
val keyCols = Seq("col1", "col2")
val structCols = Seq("struct(offset,KAFKA_TS) as otherCols")
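To use Seq values like these with selectExpr and groupBy, they have to be expanded into varargs with : _*, because both methods take a variable number of arguments rather than a Seq. A minimal sketch, assuming the struct expression is aliased otherCols as in the original code; the object name LatestPerKey, the helper latestPerKey, and the sample rows are illustrative only:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max}

object LatestPerKey {
  // Keeps one row per key: the one whose struct (built by structCols) is largest.
  // Assumes structCols aliases its struct as "otherCols".
  def latestPerKey(df: DataFrame, keyCols: Seq[String], structCols: Seq[String]): DataFrame =
    df.selectExpr(keyCols ++ structCols: _*)   // Seq expanded into selectExpr(exprs: String*)
      .groupBy(keyCols.map(col): _*)            // groupBy(cols: Column*)
      .agg(max("otherCols").as("latest"))
      .selectExpr(keyCols :+ "latest.*": _*)    // key columns plus the unpacked struct fields

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("latest-per-key").getOrCreate()
    import spark.implicits._
    // Hypothetical sample micro-batch
    val microBatchOutputDF = Seq(
      ("a", "x", 1L, 100L),
      ("a", "x", 2L, 90L), // higher offset, so this row is kept for key (a, x)
      ("b", "y", 5L, 50L)
    ).toDF("col1", "col2", "offset", "KAFKA_TS")

    val keyCols = Seq("col1", "col2")
    val structCols = Seq("struct(offset,KAFKA_TS) as otherCols")
    latestPerKey(microBatchOutputDF, keyCols, structCols).show()
    spark.stop()
  }
}
```

Here groupBy receives Column objects via keyCols.map(col); groupBy(keyCols.head, keyCols.tail: _*) would be an equivalent String-based alternative.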
10-29-2019 06:11 PM
Hi @shyamspr,
Yes, I tried this and it works. However, I want to fill the Seq with column names read from a widget or a parameter file, and when I do that I get the error.
I have updated the same question on Stack Overflow with the code I tried and the error I get. I would appreciate it if you could take a look and suggest any ideas you have to resolve this.
Thank You!
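For the widget or parameter-file case, one thing worth checking is whether the value arrives as a single comma-separated string. If it does, it has to be split into a Seq before being expanded with : _*, and the struct(...) expression cannot simply be split on commas, because its own commas would be split as well. A minimal sketch, assuming the key columns and the struct field names arrive as two separate comma-separated strings; the helper names parseCols and structExpr and the default alias otherCols are hypothetical:

```scala
object ColumnParams {
  // Splits a comma-separated parameter such as "col1, col2" into a list of names,
  // trimming whitespace and dropping empty entries.
  def parseCols(param: String): Seq[String] =
    param.split(",").map(_.trim).filter(_.nonEmpty).toList

  // Builds the struct expression from plain field names, so the parameter itself
  // never has to contain the commas inside struct(...).
  def structExpr(fields: Seq[String], alias: String = "otherCols"): String =
    fields.mkString("struct(", ", ", s") as $alias")

  def main(args: Array[String]): Unit = {
    // Hypothetical parameter values; in a Databricks notebook they might come from
    // dbutils.widgets.get(...) or from a line in a parameter file.
    val keyColsParam = "col1, col2"
    val structFieldsParam = "offset,KAFKA_TS"

    val keyCols = parseCols(keyColsParam)
    val structCols = Seq(structExpr(parseCols(structFieldsParam)))

    println(keyCols)    // List(col1, col2)
    println(structCols) // List(struct(offset, KAFKA_TS) as otherCols)
  }
}
```

The resulting keyCols and structCols can then be used exactly like the hard-coded Seq values, for example microBatchOutputDF.selectExpr(keyCols ++ structCols: _*).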