
Convert multiple string fields to int or long during streaming

hukel
Contributor

Source data looks like:

 

{
	"IntegrityLevel": "16384",
	"ParentProcessId": "10972929104936",
	"SourceProcessId": "10972929104936",
	"SHA256Hash": "a26a1ffb81a61281ffa55cb7778cc3fb0ff981704de49f75f51f18b283fba7a2",
	"ImageFileName": "\\Device\\HarddiskVolume3\\Windows\\System32\\conhost.exe",
	"SourceThreadId": "192964614202574",
	"name": "ProcessRollup2V19"
}

 

The current streaming read uses an inferred schema, which renders fields like ParentProcessId as strings.

I have experimented with a manually constructed schema using LongType, but this turns the would-be integers into nulls:

 

import org.apache.spark.sql.types.{StructType, StringType, LongType}

val schema = new StructType()
  .add("ImageFileName", StringType, true)
  .add("ParentProcessId", LongType, true)

 

If this can't be done via the schema, is there an elegant way (avoiding foreachBatch) in readStream.format("delta") or writeStream.format("delta") to convert a dozen or so fields to a different type?

1 ACCEPTED SOLUTION


hukel
Contributor

Thanks for confirming that the readStream.withColumn() approach is the best available option. Unfortunately, this will force me to maintain a separate notebook for each of the event types, but it does work. I was hoping to create just one parameterized notebook that could read from an on-disk schema file for each event type (which would be the parameter); a rough sketch of that idea follows the query below. This notebook is my evolution of the work in https://www.databricks.com/blog/2021/05/20/building-a-cybersecurity-lakehouse-for-crowdstrike-falcon...

// Needed for col/from_json/from_unixtime, the type names, and the ' / $ column syntax.
// In a Databricks notebook, spark (and rawTableName, eventName, rawEventSchema) are already defined.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

spark.readStream
  .format("delta")
  .option("failOnDataLoss", "true")
  .table(rawTableName)
  .filter('event_simpleName === eventName)  // prod filter
  .withColumn("event", from_json('value, rawEventSchema))  // expand the JSON from raw into a new "event" column
  .withColumn("_time", col("event.timestamp").cast(LongType) / 1000)  // add a Splunk-friendly timestamp
  .select($"_time", $"event.*")  // flatten into a simple row, remove the event.* prefix
  .withColumn("timestamp", from_unixtime(col("timestamp").cast(LongType) / 1000).cast(TimestampNTZType))  // create a SQL-friendly timestamp
  // Event-specific columns pasted here (this code was generated in Excel)
  .withColumn("ContextProcessId", col("ContextProcessId").cast(LongType))
  .withColumn("ContextTimeStamp", from_unixtime(col("ContextTimeStamp").cast(LongType) / 1000).cast(TimestampNTZType))
  .withColumn("Protocol", col("Protocol").cast(IntegerType))
  .withColumn("ConnectionFlags", col("ConnectionFlags").cast(LongType))
  .withColumn("LocalPort", col("LocalPort").cast(IntegerType))
  .withColumn("RemotePort", col("RemotePort").cast(IntegerType))
  .withColumn("ConnectionDirection", col("ConnectionDirection").cast(IntegerType))
  .withColumn("IcmpType", col("IcmpType").cast(IntegerType))
  .withColumn("IcmpCode", col("IcmpCode").cast(IntegerType))
  .withColumn("TreeId", col("TreeId").cast(LongType))
  .withColumn("ContextThreadId", col("ContextThreadId").cast(LongType))
  .withColumn("InContext", col("InContext").cast(BooleanType))
  .withColumn("TcpConnectErrorCode", col("TcpConnectErrorCode").cast(IntegerType))

 


2 REPLIES

Kaniz
Community Manager

Hi @hukel, certainly! Let's address the data type conversion issue in your streaming read.

 

Inferred Schema and Data Types: When Spark infers a schema, it attempts to determine each column's data type automatically from the data. It does not always get this right, especially for numeric values that arrive as strings. In your case, the ParentProcessId field is being inferred as a string.

 

Manually Specified Schema: You've already experimented with a manually constructed schema using LongType for ParentProcessId, but that approach produced null values. This is most likely because the values arrive as quoted strings in the JSON (e.g. "10972929104936"), so a LongType field in the parse schema cannot be converted directly and ends up null; the conversion has to happen after the value has been read as a string.

 

Correcting Data Types: To convert a string column to an integer (or other numeric types), you can use the cast function.

Avoiding foreachBatch: If you want to avoid using foreachBatch, you can directly apply the type conversion within your streaming read or write operations. 
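As a minimal, illustrative sketch only (not from the original post): the casts can be chained directly onto the streaming DataFrame before it is handed to writeStream. The names below are placeholders; rawStream stands in for whatever spark.readStream.format("delta")... returns, and the field names are taken from the posted sample.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.LongType

// rawStream: hypothetical streaming DataFrame returned by spark.readStream.format("delta")...
val typedStream = rawStream
  .withColumn("ParentProcessId", col("ParentProcessId").cast(LongType))
  .withColumn("SourceProcessId", col("SourceProcessId").cast(LongType))
  .withColumn("SourceThreadId", col("SourceThreadId").cast(LongType))

// typedStream can then be written with writeStream.format("delta") as usual.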

 
