12-01-2023 12:38 PM
Source data looks like:
{
  "IntegrityLevel": "16384",
  "ParentProcessId": "10972929104936",
  "SourceProcessId": "10972929104936",
  "SHA256Hash": "a26a1ffb81a61281ffa55cb7778cc3fb0ff981704de49f75f51f18b283fba7a2",
  "ImageFileName": "\\Device\\HarddiskVolume3\\Windows\\System32\\conhost.exe",
  "SourceThreadId": "192964614202574",
  "name": "ProcessRollup2V19"
}
My current streaming read uses an inferred schema, which renders fields like ParentProcessId as strings (the values are quoted in the source JSON).
I have experimented with a manually constructed schema using LongType, but that turns the would-be integers into nulls:
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("ImageFileName", StringType, true)
  .add("ParentProcessId", LongType, true)
If this can't be done via the schema, is there an elegant way (avoiding foreachBatch) to convert a dozen or so fields to a different type?
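For concreteness, this is the kind of cast-after-read workaround I have been sketching (a batch read of the sample record above; the path and the choice of LongType columns are just illustrative):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.LongType

// Leave everything as strings on read (matching the quoted JSON values),
// then cast the numeric fields explicitly afterwards.
val sample = spark.read.json("/tmp/sample_events.json") // hypothetical path
  .withColumn("ParentProcessId", col("ParentProcessId").cast(LongType))
  .withColumn("SourceProcessId", col("SourceProcessId").cast(LongType))
  .withColumn("SourceThreadId", col("SourceThreadId").cast(LongType))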
Accepted Solutions
12-03-2023 08:14 AM
Thanks for confirming that the readStream.withColumn() approach is the best available option. Unfortunately, this forces me to maintain a separate notebook for each event type, but it does work. I was hoping to build a single parameterized notebook that could read an on-disk schema file for each event type (the event type being the parameter). This notebook is my evolution of the work in https://www.databricks.com/blog/2021/05/20/building-a-cybersecurity-lakehouse-for-crowdstrike-falcon...
import org.apache.spark.sql.functions.{col, from_json, from_unixtime}
import org.apache.spark.sql.types._
import spark.implicits._

spark.readStream
  .format("delta")
  .option("failOnDataLoss", "true")
  .table(rawTableName)
  .filter('event_simpleName === eventName) // prod filter
  .withColumn("event", from_json('value, rawEventSchema)) // expand the raw JSON into a new "event" column
  .withColumn("_time", col("event.timestamp").cast(LongType) / 1000) // add a Splunk-friendly timestamp
  .select($"_time", $"event.*") // flatten into a simple row, removing the event.* prefix
  .withColumn("timestamp", from_unixtime(col("timestamp").cast(LongType) / 1000).cast(TimestampNTZType)) // create a SQL-friendly timestamp
  // Paste event-specific columns here (this block was generated in Excel)
  .withColumn("ContextProcessId", col("ContextProcessId").cast(LongType))
  .withColumn("ContextTimeStamp", from_unixtime(col("ContextTimeStamp").cast(LongType) / 1000).cast(TimestampNTZType))
  .withColumn("Protocol", col("Protocol").cast(IntegerType))
  .withColumn("ConnectionFlags", col("ConnectionFlags").cast(LongType))
  .withColumn("LocalPort", col("LocalPort").cast(IntegerType))
  .withColumn("RemotePort", col("RemotePort").cast(IntegerType))
  .withColumn("ConnectionDirection", col("ConnectionDirection").cast(IntegerType))
  .withColumn("IcmpType", col("IcmpType").cast(IntegerType))
  .withColumn("IcmpCode", col("IcmpCode").cast(IntegerType))
  .withColumn("TreeId", col("TreeId").cast(LongType))
  .withColumn("ContextThreadId", col("ContextThreadId").cast(LongType))
  .withColumn("InContext", col("InContext").cast(BooleanType))
  .withColumn("TcpConnectErrorCode", col("TcpConnectErrorCode").cast(IntegerType))