Can you use autoloader with a fixed width file?

david_torres
New Contributor II

I have a collection of fixed width files that I would like to ingest monthly with autoloader, but I can't seem to find an example.

I can read the files into DataFrames using a Python function that maps the index and length of each field with no issues, but I am not sure whether these DataFrames can then be used directly within a readStream() call.

Would it be better to convert to a parquet file before reading these in?

3 REPLIES

etsyal1e2r3
Honored Contributor

Autoloader is meant to produce a streaming dataframe/table, or you can write directly into a table with the once=True option (a single batch process). So really you want to autoload each source into its own table and then join them together in later logic if that's required. Autoloader can be set to infer the schema or use a rescued data column if the schema ever changes (it sounds like yours remain fixed in your case). So look for examples that use that once=True option and the .saveAsTable("<catalog.schema.table>") option. Let me know if you still have any issues.
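Roughly, that single-batch pattern looks like this (untested sketch; the paths, schema/checkpoint locations, and table name are placeholders you would swap for your own, and availableNow=True is the newer form of the once=True trigger):

# Single-batch Auto Loader sketch: ingest raw fixed width lines into a table.
# All paths and the table name below are placeholders.
raw = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "text")                              # each line lands in a single `value` column
    .option("cloudFiles.schemaLocation", "/tmp/fixed_width/_schema")  # placeholder schema-tracking path
    .load("/path/to/fixed_width_files")                               # placeholder source folder
)

(
    raw.writeStream
    .option("checkpointLocation", "/tmp/fixed_width/_checkpoint")     # placeholder checkpoint path
    .trigger(availableNow=True)                                       # process everything pending once, then stop
    .toTable("catalog.schema.raw_fixed_width")                        # placeholder target table
)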

Anonymous
Not applicable

Hi @David Torres,

Thank you for posting your question in our community! We are happy to assist you.

To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?

This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance! 

david_torres
New Contributor II

I found a way to get what I needed, and it can be applied to any fixed width file. I'll share it for anyone trying to do the same thing.

I accomplished this in a Python notebook and will explain the code:

  • Import the libraries needed and define a schema.
import dlt
import pyspark.sql.functions as F
 
# Schema definition: 1-based start index and length of each fixed-width field
# (Column.substr() uses 1-based positions); x is a placeholder for the last field's width.
schema = {'header_1': {'idx': 1, 'len': 9},
          'header_2': {'idx': 10, 'len': 9},
          'header_3': {'idx': 20, 'len': 9},
          'header_n': {'idx': 30, 'len': x}
          }
  • Create a Delta Live Table function that maps the file(s) into a dataframe and builds the columns from the schema definition.
  • As I am ingesting this data as raw, I also add two columns to the end to track the source file and processing time.
# Create a Delta Live Table:
# use SQL functions to trim each field and add input_file_name() and current_timestamp().
@dlt.table
def tablename():
    df = spark.read.text("folder_containing_data")
    return (
        df.select(*[F.trim(df.value.substr(schema[c]['idx'], schema[c]['len'])).alias(c)
                    for c in schema])
        .withColumn("source_file", F.input_file_name())
        .withColumn("processing_time", F.current_timestamp())
    )
  • Create a pipeline with your notebook and you are done.
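If you want the table to pick up only new files with Autoloader instead of re-reading the whole folder on every run, a rough variation (untested sketch, reusing the same schema dict and the same placeholder folder name) is to swap the batch read for a cloudFiles stream inside the DLT function:

# Auto Loader variation of the same table (untested sketch).
# Reuses the `schema` dict and imports defined above; the folder is still a placeholder.
@dlt.table
def tablename_autoloader():
    df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "text")    # raw lines arrive in a single `value` column
        .load("folder_containing_data")         # placeholder source folder
    )
    return (
        df.select(*[F.trim(df.value.substr(schema[c]['idx'], schema[c]['len'])).alias(c)
                    for c in schema])
        .withColumn("source_file", F.input_file_name())
        .withColumn("processing_time", F.current_timestamp())
    )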