
Convert multiple string fields to int or long during streaming

hukel
Contributor

Source data looks like:

 

{
	"IntegrityLevel": "16384",
	"ParentProcessId": "10972929104936",
	"SourceProcessId": "10972929104936",
	"SHA256Hash": "a26a1ffb81a61281ffa55cb7778cc3fb0ff981704de49f75f51f18b283fba7a2",
	"ImageFileName": "\\Device\\HarddiskVolume3\\Windows\\System32\\conhost.exe",
	"SourceThreadId": "192964614202574",
	"name": "ProcessRollup2V19"
}

 

The current streaming read uses an inferred schema, which renders fields like ParentProcessId as strings.

I have experimented with a manually constructed schema using LongType, but this turns the would-be integer values into nulls:

 

import org.apache.spark.sql.types._

val schema = new StructType()
  .add("ImageFileName", StringType, true)
  .add("ParentProcessId", LongType, true)

 

If this can't be done via the schema itself, is there an elegant way (avoiding foreachBatch) in readStream.format("delta") or writeStream.format("delta") to convert a dozen or so fields to a different type?
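For what it's worth, the kind of thing I'm hoping exists is sketched below (hypothetical names: parsedDf stands for the parsed streaming DataFrame, and the casts map is just a placeholder): drive the casts from a map of target types in a single select, rather than foreachBatch.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Hypothetical: `parsedDf` is the streaming DataFrame after from_json/flattening,
// and `casts` lists only the columns whose type should change.
val casts: Map[String, DataType] = Map(
  "ParentProcessId" -> LongType,
  "SourceProcessId" -> LongType,
  "IntegrityLevel"  -> IntegerType
)

// Rewrites only the listed columns; everything else passes through unchanged.
val castedDf = parsedDf.select(parsedDf.columns.map { c =>
  casts.get(c).map(t => col(c).cast(t).as(c)).getOrElse(col(c))
}: _*)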

1 ACCEPTED SOLUTION


hukel
Contributor

Thanks for confirming that the readStream.withColumn() approach is the best available option. Unfortunately, this forces me to maintain a separate notebook for each event type, but it does work. I was hoping to create a single parameterized notebook that could read an on-disk schema file for each event type (with the event type as the parameter). This notebook is my evolution of the work in https://www.databricks.com/blog/2021/05/20/building-a-cybersecurity-lakehouse-for-crowdstrike-falcon...

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

spark.readStream
  .format("delta")
  .option("failOnDataLoss", "true")
  .table(rawTableName)
  .filter('event_simpleName === eventName)  // prod filter
  .withColumn("event", from_json('value, rawEventSchema))  // expand the JSON from raw into a new "event" column
  .withColumn("_time", col("event.timestamp").cast(LongType) / 1000)  // add a Splunk-friendly timestamp
  .select($"_time", $"event.*")  // flatten into a simple row, remove the event.* prefix
  .withColumn("timestamp", from_unixtime(col("timestamp").cast(LongType) / 1000).cast(TimestampNTZType))  // create a SQL-friendly timestamp
  // Paste event-specific columns here; generated this code in Excel
  .withColumn("ContextProcessId", col("ContextProcessId").cast(LongType))
  .withColumn("ContextTimeStamp", from_unixtime(col("ContextTimeStamp").cast(LongType) / 1000).cast(TimestampNTZType))
  .withColumn("Protocol", col("Protocol").cast(IntegerType))
  .withColumn("ConnectionFlags", col("ConnectionFlags").cast(LongType))
  .withColumn("LocalPort", col("LocalPort").cast(IntegerType))
  .withColumn("RemotePort", col("RemotePort").cast(IntegerType))
  .withColumn("ConnectionDirection", col("ConnectionDirection").cast(IntegerType))
  .withColumn("IcmpType", col("IcmpType").cast(IntegerType))
  .withColumn("IcmpCode", col("IcmpCode").cast(IntegerType))
  .withColumn("TreeId", col("TreeId").cast(LongType))
  .withColumn("ContextThreadId", col("ContextThreadId").cast(LongType))
  .withColumn("InContext", col("InContext").cast(BooleanType))
  .withColumn("TcpConnectErrorCode", col("TcpConnectErrorCode").cast(IntegerType))

 
