
Convert multiple string fields to int or long during streaming

hukel
Contributor

Source data looks like:

 

{
	"IntegrityLevel": "16384",
	"ParentProcessId": "10972929104936",
	"SourceProcessId": "10972929104936",
	"SHA256Hash": "a26a1ffb81a61281ffa55cb7778cc3fb0ff981704de49f75f51f18b283fba7a2",
	"ImageFileName": "\\Device\\HarddiskVolume3\\Windows\\System32\\conhost.exe",
	"SourceThreadId": "192964614202574",
	"name": "ProcessRollup2V19",
}

 

The current streaming read uses an inferred schema, which renders fields like ParentProcessId as strings.

I have experimented with a manually constructed schema using LongType, but this turns the would-be integer values into nulls:

 

import org.apache.spark.sql.types._

val schema = new StructType()
  .add("ImageFileName", StringType, true)
  .add("ParentProcessId", LongType, true)

 

If this can't be done via the schema, is there an elegant way (avoiding foreachBatch) in readStream.format("delta") or writeStream.format("delta") to convert a dozen or so fields to a different type?
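
For reference, here is a minimal sketch of the cast-after-parse pattern that the accepted answer below builds on: the quoted values don't survive a direct LongType parse (they come back null, as observed above), so the fields are left as strings in the parse schema and cast after parsing. The table and column names in this sketch are hypothetical.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Parse schema keeps the numeric-looking fields as strings...
val rawEventSchema = new StructType()
  .add("ImageFileName", StringType, true)
  .add("ParentProcessId", StringType, true)

spark.readStream
  .format("delta")
  .table("raw_events")  // hypothetical raw table holding a JSON string column named "value"
  .withColumn("event", from_json(col("value"), rawEventSchema))
  // ...and the numeric conversion happens after the parse, where the cast succeeds.
  .withColumn("ParentProcessId", col("event.ParentProcessId").cast(LongType))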

1 ACCEPTED SOLUTION


hukel
Contributor

Thanks for confirming that the readStream.withColumn() approach is the best available option. Unfortunately, this forces me to maintain a separate notebook for each event type, but it does work. I was hoping to create just one parameterized notebook that could read an on-disk schema file for each event type (the event type being the parameter). This notebook is my evolution of the work in https://www.databricks.com/blog/2021/05/20/building-a-cybersecurity-lakehouse-for-crowdstrike-falcon...

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

spark.readStream
  .format("delta")
  .option("failOnDataLoss", "true")
  .table(rawTableName)
  .filter('event_simpleName === eventName)  // prod filter
  .withColumn("event", from_json('value, rawEventSchema))  // expand the JSON from raw into a new "event" column
  .withColumn("_time", col("event.timestamp").cast(LongType) / 1000)  // add a Splunk-friendly timestamp
  .select($"_time", $"event.*")  // flatten into a simple row, remove the event.* prefix
  .withColumn("timestamp", from_unixtime(col("timestamp").cast(LongType) / 1000).cast(TimestampNTZType))  // create a SQL-friendly timestamp
  // Paste event-specific columns here (this block was generated in Excel)
  .withColumn("ContextProcessId", col("ContextProcessId").cast(LongType))
  .withColumn("ContextTimeStamp", from_unixtime(col("ContextTimeStamp").cast(LongType) / 1000).cast(TimestampNTZType))
  .withColumn("Protocol", col("Protocol").cast(IntegerType))
  .withColumn("ConnectionFlags", col("ConnectionFlags").cast(LongType))
  .withColumn("LocalPort", col("LocalPort").cast(IntegerType))
  .withColumn("RemotePort", col("RemotePort").cast(IntegerType))
  .withColumn("ConnectionDirection", col("ConnectionDirection").cast(IntegerType))
  .withColumn("IcmpType", col("IcmpType").cast(IntegerType))
  .withColumn("IcmpCode", col("IcmpCode").cast(IntegerType))
  .withColumn("TreeId", col("TreeId").cast(LongType))
  .withColumn("ContextThreadId", col("ContextThreadId").cast(LongType))
  .withColumn("InContext", col("InContext").cast(BooleanType))
  .withColumn("TcpConnectErrorCode", col("TcpConnectErrorCode").cast(IntegerType))

 

