I am building a schema for an incoming Avro file (JSON message) and creating a final DataFrame from it. The schema looks correct against the sample JSON message provided, but I am getting null values in all the fields. Can somebody look at this code and tell me if I am doing anything wrong?
import org.apache.spark.sql.types._

// Schema mirrors the nested structure of the sample JSON message.
def buildSchema(): StructType = {
  new StructType()
    .add("data", new StructType()
      .add("schemaVersion", IntegerType)
      .add("timeStamp", StringType)
      .add("messageState", StringType)
      .add("eventId", LongType)
      .add("eventTimeStamp", StringType)
      .add("machineDetail", new StructType()
        .add("serialNumber", StringType)
        .add("name", StringType)
      )
      .add("serviceMeterHours", new StructType()
        .add("value", IntegerType)
        .add("unit", StringType)
      )
      .add("eventDetail", new StructType()
        .add("name", StringType)
        .add("typeId", StringType)
        .add("typeDescription", StringType)
        .add("severity", StringType)
        .add("description", StringType)
        .add("severityDescription", StringType)
      )
      .add("failureModeDetail", new StructType()
        .add("id", IntegerType)
        .add("description", StringType)
      )
      .add("durationSeconds", DoubleType)
      .add("tolerance", new StructType()
        .add("trigger", new StructType()
          .add("value", DoubleType)
          .add("reason", StringType)
          .add("unit", StringType)
        )
        .add("worst", new StructType()
          .add("value", DoubleType)
          .add("reason", StringType)
          .add("unit", StringType)
        )
      )
      .add("sourceDetails", new StructType()
        .add("id", IntegerType)
        .add("description", StringType)
      )
      .add("positionDetails", new StructType()
        .add("global", new StructType()
          .add("lat", DoubleType)
          .add("lon", DoubleType)
          .add("elv", DoubleType)
        )
      )
    )
}
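
As a quick structural check, printing the built schema should mirror the nesting of the sample message:

buildSchema().printTreeString()
// root
//  |-- data: struct (nullable = true)
//  |    |-- schemaVersion: integer (nullable = true)
//  |    |-- timeStamp: string (nullable = true)
//  ...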
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json, input_file_name, substring}
import spark.implicits._ // enables the $"..." column syntax used below
val mySchema = buildSchema()
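
To isolate whether the problem is the schema or the payload, a minimal sanity check is to parse the sample message directly (sampleJson below is a placeholder; paste the actual sample there). from_json returns null for the whole struct when the input does not parse as JSON matching the schema, which would explain nulls in every field:

// Placeholder standing in for the real sample message
val sampleJson = """{"data":{"schemaVersion":1,"timeStamp":"2021-01-01T00:00:00Z"}}"""

val check = Seq(sampleJson).toDF("Body")
  .withColumn("Parsed", from_json(col("Body"), mySchema))

check.select("Parsed.data.schemaVersion", "Parsed.data.timeStamp").show(false)
// If these come back null even for a known-good sample, the schema and the
// message shape disagree, or the real Body is not a plain JSON string.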
def healthdataeventStream(stream: DataFrame, schema: StructType): DataFrame = {
  val hdestream = stream
    .withColumn("Partition", substring(input_file_name(), -6, 1))
    // Body arrives as binary, so cast it to string before parsing.
    // If the cast does not yield valid JSON (e.g. the payload is still
    // Avro-encoded), from_json returns null and every field comes out null.
    .withColumn("Body", from_json(col("Body").cast("string"), schema))
    .withColumn("Data", col("Body.data"))
  val flattenedData = hdestream.select(
    $"Data.schemaVersion",
    $"Data.timeStamp",
    $"Data.messageState",
    $"Data.eventId",
    $"Data.eventTimeStamp",
    $"Data.machineDetail.serialNumber",
    $"Data.machineDetail.name",
    $"Data.serviceMeterHours.value",
    $"Data.serviceMeterHours.unit",
    $"Data.eventDetail.name",
    $"Data.eventDetail.description",
    $"Data.eventDetail.typeId",
    $"Data.eventDetail.typeDescription",
    $"Data.eventDetail.severity",
    $"Data.eventDetail.severityDescription",
    $"Data.failureModeDetail.id",
    $"Data.failureModeDetail.description",
    $"Data.durationSeconds",
    $"Data.tolerance.trigger.value".as("tolerance_trigger_value"),
    $"Data.tolerance.trigger.reason".as("tolerance_trigger_reason"),
    $"Data.tolerance.trigger.unit".as("tolerance_trigger_unit"),
    $"Data.tolerance.worst.value".as("tolerance_worst_value"),
    $"Data.tolerance.worst.reason".as("tolerance_worst_reason"),
    $"Data.tolerance.worst.unit".as("tolerance_worst_unit"),
    $"Data.sourceDetails.id",
    $"Data.sourceDetails.description",
    $"Data.positionDetails.global.lat",
    $"Data.positionDetails.global.lon",
    $"Data.positionDetails.global.elv"
  )

  flattenedData
}
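
For reference, a minimal way to exercise the function; the source below is a hypothetical file-based placeholder, not my actual ingestion setup (note that input_file_name() only returns a value for file-based sources):

// Hypothetical source: read raw text files and treat each line as the Body.
val rawStream = spark.readStream
  .format("text")
  .load("/mnt/incoming/events/") // placeholder path
  .withColumnRenamed("value", "Body")

val flattened = healthdataeventStream(rawStream, mySchema)

flattened.writeStream
  .format("console")
  .option("truncate", "false")
  .start()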