I am using a script for a CDC merge in Spark Structured Streaming. I want to pass the column expressions to selectExpr through parameters, because the column names differ from table to table. When I pass the columns and the struct expression through string variables, I get this error: mismatched input ',' expecting <EOF>
Below is the piece of code I am trying to parameterize.
var filteredMicroBatchDF = microBatchOutputDF
  .selectExpr("col1", "col2", "struct(offset,KAFKA_TS) as otherCols")
  .groupBy("col1", "col2")
  .agg(max("otherCols").as("latest"))
  .selectExpr("col1", "col2", "latest.*")
Reference to script being emulated:
https://docs.databricks.com/_static/notebooks/merge-in-cdc.html
I tried the following, putting the column names in variables and then passing those variables to selectExpr:
val keyCols ="col1","col2"
val structCols ="struct(offset,KAFKA_TS) as otherCols"
var filteredMicroBatchDF = microBatchOutputDF
  .selectExpr(keyCols, structCols)
  .groupBy(keyCols)
  .agg(max("otherCols").as("latest"))
  .selectExpr(keyCols, "latest.*")
When I run the script, it fails with this error:
org.apache.spark.sql.streaming.StreamingQueryException:
mismatched input ',' expecting <EOF>
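For context on the error: selectExpr takes a variable number of String arguments and parses each one as a single SQL expression, so a single string that contains several comma-separated column names would fail at the first ','. A minimal sketch of one way to parameterize the chain is below, assuming the key columns and struct fields are supplied as Seq[String] and expanded with `: _*`. The object name ParameterizedDedup, the helper latestPerKey, and the sample rows are hypothetical and exist only for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max}

object ParameterizedDedup {

  // Hypothetical helper: keeps one row per key, choosing the row whose
  // struct(structCols...) value is the largest.
  def latestPerKey(df: DataFrame, keyCols: Seq[String], structCols: Seq[String]): DataFrame = {
    // Build the struct expression as ONE string; it is a single SQL expression.
    val structExpr = s"struct(${structCols.mkString(",")}) as otherCols"

    df.selectExpr(keyCols :+ structExpr: _*)   // one String per expression
      .groupBy(keyCols.map(col): _*)           // one Column per key
      .agg(max("otherCols").as("latest"))
      .selectExpr(keyCols :+ "latest.*": _*)
  }

  def main(args: Array[String]): Unit = {
    // Local session and made-up rows, only to exercise the helper.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("parameterized-dedup-sketch")
      .getOrCreate()
    import spark.implicits._

    val batch = Seq(
      ("a", "x", 1L, 100L),
      ("a", "x", 2L, 200L),
      ("b", "y", 5L, 150L)
    ).toDF("col1", "col2", "offset", "KAFKA_TS")

    latestPerKey(batch, Seq("col1", "col2"), Seq("offset", "KAFKA_TS")).show()
    spark.stop()
  }
}
```

Inside foreachBatch, the same helper could be called as latestPerKey(microBatchOutputDF, keyCols, structCols), with keyCols and structCols read from per-table configuration.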