Community Platform Discussions
Connect with fellow community members to discuss general topics related to the Databricks platform, industry trends, and best practices. Share experiences, ask questions, and foster collaboration within the community.

Convert multiple string fields to int or long during streaming

hukel
Contributor

Source data looks like:

 

{
	"IntegrityLevel": "16384",
	"ParentProcessId": "10972929104936",
	"SourceProcessId": "10972929104936",
	"SHA256Hash": "a26a1ffb81a61281ffa55cb7778cc3fb0ff981704de49f75f51f18b283fba7a2",
	"ImageFileName": "\\Device\\HarddiskVolume3\\Windows\\System32\\conhost.exe",
	"SourceThreadId": "192964614202574",
	"name": "ProcessRollup2V19"
}

 

The current streaming read uses an inferred schema, which renders fields like ParentProcessId as strings.

I have experimented with a manually constructed schema using LongType, but this turns the would-be integers into nulls:

 

import org.apache.spark.sql.types.{LongType, StringType, StructType}

val schema = new StructType()
  .add("ImageFileName", StringType, true)
  .add("ParentProcessId", LongType, true)

 

If this can't be done via the schema, is there an elegant way (avoiding foreachBatch) in readStream.format("delta") or writeStream.format("delta") to convert a dozen or so fields to a different type?

Accepted Solution

hukel
Contributor

Thanks for confirming that the readStream.withColumn() approach is the best available option. Unfortunately, it forces me to maintain a separate notebook for each event type, but it does work. I was hoping to create a single parameterized notebook that takes the event type as its parameter and reads that event type's schema from a file on disk. This notebook is my evolution of the work in https://www.databricks.com/blog/2021/05/20/building-a-cybersecurity-lakehouse-for-crowdstrike-falcon...

spark.readStream
  .format("delta")
  .option("failOnDataLoss", "true")
  .table(rawTableName)
  .filter('event_simpleName === eventName)  // prod filter
  .withColumn("event", from_json('value,rawEventSchema))  // expand the JSON from raw into a new "event" column.
  .withColumn("_time", col("event.timestamp").cast(LongType)/1000)   // Add a Splunk-friendly timestamp
  .select($"_time", $"event.*")  // flatten into a simple row,  remove the event.* prefix
  .withColumn("timestamp", from_unixtime(col("timestamp").cast(LongType)/1000).cast(TimestampNTZType))  // Create a SQL-friendly timestamp
  // Paste event-specific columns here (this code was generated in Excel)
  .withColumn("ContextProcessId",col("ContextProcessId").cast(LongType))
  .withColumn("ContextTimeStamp",from_unixtime(col("ContextTimeStamp").cast(LongType)/1000).cast(TimestampNTZType))
  .withColumn("Protocol",col("Protocol").cast(IntegerType))
  .withColumn("ConnectionFlags",col("ConnectionFlags").cast(LongType))
  .withColumn("LocalPort",col("LocalPort").cast(IntegerType))
  .withColumn("RemotePort",col("RemotePort").cast(IntegerType))
  .withColumn("ConnectionDirection",col("ConnectionDirection").cast(IntegerType))
  .withColumn("IcmpType",col("IcmpType").cast(IntegerType))
  .withColumn("IcmpCode",col("IcmpCode").cast(IntegerType))
  .withColumn("TreeId",col("TreeId").cast(LongType))
  .withColumn("ContextThreadId",col("ContextThreadId").cast(LongType))
  .withColumn("InContext",col("InContext").cast(BooleanType))
  .withColumn("TcpConnectErrorCode",col("TcpConnectErrorCode").cast(IntegerType))
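
For the single parameterized notebook described above, one possible approach is to keep the per-event casts as data rather than as pasted code. The sketch below assumes the target types for an event are stored as a DDL string, for example in a small text file per event type. The `castDdl` value and the `applyCasts` helper are hypothetical names, not part of the original notebook, and the example uses a local SparkSession with a small batch DataFrame so that it is self-contained. It only covers plain casts; columns that need an expression, such as the millisecond-to-timestamp conversion, would still need their own handling.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DataType, StructType}

object CastFromDdl {
  // Hypothetical helper: cast every column named in `ddl` to its listed type
  // and leave all other columns untouched. It is only a projection, so it can
  // be applied to batch and streaming DataFrames alike.
  // Note: column names are matched case-sensitively here.
  def applyCasts(df: DataFrame, ddl: String): DataFrame = {
    val targets: Map[String, DataType] =
      StructType.fromDDL(ddl).fields.map(f => f.name -> f.dataType).toMap
    val projected: Array[Column] = df.columns.map { name =>
      targets.get(name) match {
        case Some(dt) => col(name).cast(dt).as(name) // keep the original name
        case None     => col(name)
      }
    }
    df.select(projected: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("cast-from-ddl")
      .getOrCreate()
    import spark.implicits._

    // Assumed content of a per-event schema file (hypothetical).
    val castDdl = "ContextProcessId BIGINT, Protocol INT, LocalPort INT"

    // Stand-in for the flattened event rows, with every field still a string.
    val raw = Seq(("10972929104936", "6", "443", "conhost.exe"))
      .toDF("ContextProcessId", "Protocol", "LocalPort", "ImageFileName")

    applyCasts(raw, castDdl).printSchema()
    spark.stop()
  }
}
```

In the notebook, the DDL string could be read from the file chosen by the event-type parameter and `applyCasts` called once after the `select($"_time", $"event.*")` step, in place of the block of generated `withColumn` lines.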

 


2 REPLIES

Kaniz_Fatma
Community Manager

Hi @hukel, Certainly! Let’s address the data type conversion issue for your streaming read.

 

Inferred Schema and Data Types: When Spark infers a schema from JSON, it bases each column's type on how the values appear in the data. Values wrapped in quotes, such as "10972929104936", are JSON strings, so fields like ParentProcessId are inferred as strings even though they look numeric.

 

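Manually Specified Schema: You’ve already experimented with a manually constructed schema using LongType for ParentProcessId, and this resulted in null values. The most likely cause is that the values are quoted in the source JSON: Spark's JSON parser does not convert a quoted string into a LongType field and yields null instead. Declaring a numeric type in the schema is therefore not enough on its own.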

 

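Correcting Data Types: To convert a string column to an integer (or another numeric type), parse the field as a string first and then call cast on the column, for example col("ParentProcessId").cast(LongType).

Avoiding foreachBatch: You do not need foreachBatch for this. The casts are ordinary column expressions, so you can apply them with withColumn or select on the DataFrame returned by readStream, before it is passed to writeStream.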
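
As a concrete illustration of the cast approach, the sketch below declares the numeric-looking fields as StringType in the JSON schema, so that from_json keeps the quoted values, and casts them in a second step. It is a minimal example under some assumptions: it uses a small batch DataFrame with a `value` column in place of the real stream, a shortened sample record, and a local SparkSession so that it is self-contained (in a notebook the existing `spark` session would be used). The `parseAndCast` helper is a hypothetical name. The same column expressions can be applied to a DataFrame returned by readStream.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}

object ParseThenCast {
  // Parse quoted numbers as strings first; casting happens afterwards.
  val rawSchema: StructType = new StructType()
    .add("IntegrityLevel", StringType, nullable = true)
    .add("ParentProcessId", StringType, nullable = true)
    .add("ImageFileName", StringType, nullable = true)

  // Hypothetical helper: expects a string column named "value" holding JSON.
  def parseAndCast(raw: DataFrame): DataFrame =
    raw
      .withColumn("event", from_json(col("value"), rawSchema))
      .select("event.*")
      .withColumn("IntegrityLevel", col("IntegrityLevel").cast(IntegerType))
      .withColumn("ParentProcessId", col("ParentProcessId").cast(LongType))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("parse-then-cast")
      .getOrCreate()
    import spark.implicits._

    // Shortened version of the sample record from the question.
    val json =
      """{"IntegrityLevel":"16384","ParentProcessId":"10972929104936","ImageFileName":"conhost.exe"}"""

    val typed = parseAndCast(Seq(json).toDF("value"))
    typed.printSchema()
    typed.show(truncate = false)
    spark.stop()
  }
}
```

Note that cast returns null for a string that is not a valid number of the target type, so it may be worth checking a sample of the output for unexpected nulls after changing types.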

 


 
