I found a way to get what I needed, and the approach works for any fixed-width file. Sharing it here for anyone trying to do the same thing.
I accomplished this in a Python notebook and will explain the code:
- Import the needed libraries and define a schema that maps each column name to its start position and length.
import dlt
import pyspark.sql.functions as F
# Schema definition: idx is the 1-based start position, len is the field width
schema = {'header_1': {'idx': 1, 'len': 9},
          'header_2': {'idx': 10, 'len': 9},
          'header_3': {'idx': 20, 'len': 9},
          'header_n': {'idx': 30, 'len': x}  # x = width of the last field
          }
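One thing to note: the idx values are 1-based, because Spark's Column.substr(startPos, length) starts counting at 1. If you want to sanity-check a layout before touching Spark, plain Python slicing works (a minimal sketch using a hypothetical two-field layout, not the schema above):

# Quick layout check: idx is 1-based (matching Column.substr), so subtract 1 for Python slicing
layout = {'header_1': {'idx': 1, 'len': 9},    # hypothetical example fields
          'header_2': {'idx': 10, 'len': 9}}

sample = "123456789ABCDEFGH "                   # one 18-character record

for name, spec in layout.items():
    start = spec['idx'] - 1
    print(name, '=', repr(sample[start:start + spec['len']].strip()))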
- Create a Delta Live Table function that maps the file(s) into a DataFrame and builds the columns from the schema definition.
- Since I am ingesting this data as a raw layer, I also add two columns at the end to track the source file and the processing time.
'''Create Delta Live Table
Use SQL functions to trim each field and capture input_file_name() and current_timestamp()
'''
@dlt.table
def tablename():
    # Each fixed-width record is read as a single string column named "value"
    df = spark.read.text("folder_containing_data")
    return (df.select(*[F.trim(df.value.substr(schema[c]['idx'], schema[c]['len'])).alias(c)
                        for c in schema])
              .withColumn("source_file", F.input_file_name())
              .withColumn("processing_time", F.current_timestamp()))
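Functions decorated with @dlt.table only materialize when the pipeline runs, so it can help to preview the parsing interactively first. The same select works on a plain DataFrame (a minimal sketch, assuming the folder path above and a schema with the placeholder length filled in):

# Interactive preview of the parsing logic outside of the DLT pipeline
raw = spark.read.text("folder_containing_data")
parsed = raw.select(*[F.trim(raw.value.substr(spec['idx'], spec['len'])).alias(name)
                      for name, spec in schema.items()])
parsed.show(5, truncate=False)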
- Create a Delta Live Tables pipeline with your notebook as the source, run it, and you are done.