I am using script for CDC Merge in spark streaming. I wish to pass column values in selectExpr through a parameter as column names for each table would change. When I pass the columns and struct field through a string variable, I am getting error as ==> mismatched input ',' expecting
Below is the piece of code I am trying to parameterize.
var filteredMicroBatchDF=microBatchOutputDF
.selectExpr("col1","col2","struct(offset,KAFKA_TS) as otherCols" )
Reference to script being emulated:
I have tried like below by passing column names in a variable and then reading in the selectExpr from these variables:
val keyCols ="col1","col2"
val structCols ="struct(offset,KAFKA_TS) as otherCols"
var filteredMicroBatchDF=microBatchOutputDF.selectExpr(keyCols,structCols ).groupBy(keyCols).agg(max("otherCols").as("latest")).selectExpr(keyCols,"latest.*")
When I run the script it gives me error as
mismatched input ',' expecting <<EOF>>