Schema definition help in a Scala notebook in Databricks

Ruby8376
Valued Contributor

I am building a schema for an incoming Avro file (JSON message) and creating a final DataFrame from it. The schema looks correct against the sample JSON message provided, but I am getting null values in all the fields. Can somebody look at this code and tell me if I am doing anything wrong?

This is the JSON message:

{
  "schemaVersion": 4,
  "timeStamp": "2021-10-05T08:39:03.201+05:30",
  "messageState": "new",
  "eventId": 28901,
  "eventTimeStamp": "2021-10-05T08:39:03.174+05:30",
  "machineDetail": {
    "serialNumber": "ERS00075",
    "name": "TRK15"
  },
  "serviceMeterHours": {
    "value": 754,
    "unit": "hr"
  },
  "eventDetail": {
    "name": "TEST-Machine Maintenance Event Activate 2554",
    "description": "TEST-Jacket Water to Engine Oil Temp Low Warning",
    "typeId": "1",
    "typeDescription": "Low",
    "severity": "1",
    "severityDescription": "Maintenance"
  },
  "failureModeDetail": {
    "id": 21,
    "description": "Data erratic, intermittent or incorrect."
  },
  "durationSeconds": 18894.0,
  "tolerance": {
    "trigger": {
      "value": 635.56,
      "reason": "High",
      "unit": "t"
    },
    "worst": {
      "value": 433.94,
      "reason": "High",
      "unit": "t"
    }
  },
  "sourceDetails": {
    "id": 279,
    "description": "Alarm Module #1"
  },
  "positionDetails": {
    "global": {
      "lat": 33.200424,
      "lon": 435.99,
      "elv": 69.0
    }
  }
}
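
As a quick diagnostic (a sketch, not part of the original post): once buildSchema() below is defined, the schema can be tested against this exact sample as a one-row DataFrame. from_json silently yields null for any field it cannot match (and a fully null struct for malformed input), so a structural mismatch shows up as nulls rather than an error. Note that this sample has no top-level "data" wrapper, so it is worth confirming that the real Body payload actually carries one.

import org.apache.spark.sql.functions.from_json
import spark.implicits._  // a Databricks notebook exposes `spark` as the active session

// Trimmed copy of the sample message; paste the full message to test every field.
val sampleBody = """{"schemaVersion": 4, "eventId": 28901}"""

val parsed = Seq(sampleBody).toDF("Body")
  .select(from_json($"Body", buildSchema()).as("Body"))

// Nulls here mean the schema and the payload structure disagree.
parsed.select($"Body.data.*").show(truncate = false)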


My code:

import org.apache.spark.sql.types._

// Schema mirrors the sample message above, nested under a top-level "data" field.
def buildSchema(): StructType = {
  new StructType()
    .add("data", new StructType()
      .add("schemaVersion", IntegerType)
      .add("timeStamp", StringType)
      .add("messageState", StringType)
      .add("eventId", LongType)
      .add("eventTimeStamp", StringType)
      .add("machineDetail", new StructType()
        .add("serialNumber", StringType)
        .add("name", StringType))
      .add("serviceMeterHours", new StructType()
        .add("value", IntegerType)
        .add("unit", StringType))
      .add("eventDetail", new StructType()
        .add("name", StringType)
        .add("description", StringType)
        .add("typeId", StringType)
        .add("typeDescription", StringType)
        .add("severity", StringType)
        .add("severityDescription", StringType))
      .add("failureModeDetail", new StructType()
        .add("id", IntegerType)
        .add("description", StringType))
      .add("durationSeconds", DoubleType)
      .add("tolerance", new StructType()
        .add("trigger", new StructType()
          .add("value", DoubleType)
          .add("reason", StringType)
          .add("unit", StringType))
        .add("worst", new StructType()
          .add("value", DoubleType)
          .add("reason", StringType)
          .add("unit", StringType)))
      .add("sourceDetails", new StructType()
        .add("id", IntegerType)
        .add("description", StringType))
      .add("positionDetails", new StructType()
        .add("global", new StructType()
          .add("lat", DoubleType)
          .add("lon", DoubleType)
          .add("elv", DoubleType))))
}
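
A cross-check worth doing (a sketch with a placeholder path, not from the original post): save the sample message to a file, let Spark infer a schema from it, and compare the two tree strings; any naming or nesting mismatch stands out immediately.

// Placeholder path: wherever the sample JSON message is saved.
val samplePath = "/tmp/sample_event.json"
val inferred = spark.read.option("multiLine", "true").json(samplePath).schema

// Visually diff the inferred structure against the hand-built one.
println(inferred.treeString)
println(buildSchema().treeString)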
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json, input_file_name, substring}
import spark.implicits._  // enables the $"..." column syntax

val mySchema = buildSchema()

def healthdataeventStream(stream: DataFrame, schema: StructType): DataFrame = {
  val hdestream = stream
    .withColumn("Partition", substring(input_file_name(), -6, 1))
    .withColumn("Body", from_json(col("Body").cast("string"), schema))
    .withColumn("Data", col("Body.data"))

  val flattenedData = hdestream.select(
    $"Data.schemaVersion",
    $"Data.timeStamp",
    $"Data.messageState",
    $"Data.eventId",
    $"Data.eventTimeStamp",
    $"Data.machineDetail.serialNumber",
    $"Data.machineDetail.name",
    $"Data.serviceMeterHours.value",
    $"Data.serviceMeterHours.unit",
    $"Data.eventDetail.name",
    $"Data.eventDetail.description",
    $"Data.eventDetail.typeId",
    $"Data.eventDetail.typeDescription",
    $"Data.eventDetail.severity",
    $"Data.eventDetail.severityDescription",
    $"Data.failureModeDetail.id",
    $"Data.failureModeDetail.description",
    $"Data.durationSeconds",
    $"Data.tolerance.trigger.value".as("tolerance_trigger_value"),
    $"Data.tolerance.trigger.reason".as("tolerance_trigger_reason"),
    $"Data.tolerance.trigger.unit".as("tolerance_trigger_unit"),
    $"Data.tolerance.worst.value".as("tolerance_worst_value"),
    $"Data.tolerance.worst.reason".as("tolerance_worst_reason"),
    $"Data.tolerance.worst.unit".as("tolerance_worst_unit"),
    $"Data.sourceDetails.id",
    $"Data.sourceDetails.description",
    $"Data.positionDetails.global.lat",
    $"Data.positionDetails.global.lon",
    $"Data.positionDetails.global.elv"
  )

  flattenedData
}
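
For completeness, a hypothetical wiring of the function. The source format, path, and capture schema below are assumptions (Event Hubs Capture Avro files carry a binary Body field), not details from the original post.

import org.apache.spark.sql.types.{BinaryType, StructType}

// Streaming file sources require an explicit schema; only Body is needed here.
val captureSchema = new StructType().add("Body", BinaryType)

val raw = spark.readStream
  .schema(captureSchema)
  .format("avro")
  .load("/mnt/capture/events/")  // placeholder path

val flattened = healthdataeventStream(raw, mySchema)
display(flattened)  // Databricks notebook display for quick inspection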

