Help integrating streaming pipeline with AWS services S3 and Lambda

matty_f
New Contributor II

Hi there, I am trying to build a Delta Live Tables pipeline that ingests gzip-compressed archives as they're uploaded to S3. Each archive contains two files in a proprietary format, and one is needed to determine how to parse the other. Once the file contents have been parsed, an array of dicts is produced and written to the table.

I've managed to get this working using a UDF (it uses BytesIO, tarfile, and a custom parsing library that wraps a host binary). I'm not sure whether I should be performing such heavy computation in a UDF, especially shelling out to a host binary inside it. If there is a more appropriate solution, please let me know (current code below).

Once the archive contents have been parsed and converted into rows, they are written to the Delta Live Table. From there, multiple downstream tables read and filter the data. When new matching rows appear, the data needs to be forwarded to a Lambda function for further processing by another piece of proprietary tech that converts unstructured data to structured data.
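
For context, the downstream tables look roughly like the sketch below (the table name and filter condition are just placeholders, not the real logic):

import dlt
from pyspark.sql.functions import col

@dlt.table
def filtered_table():
  # read the parsed rows from the upstream table and keep only the subset
  # this downstream table cares about (placeholder condition)
  return dlt.read_stream("my_table").where(col("bar") > 0)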

I am not quite sure how to achieve this. My first thought is to use `foreachBatch` to write into SQS or Kinesis infrastructure that I will have to provision myself. From there, I'll configure Lambda functions to process the data, and then write the results back into another sink that Databricks can read from.
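
Roughly what I have in mind for the forwarding step (the queue URL, the `filtered_stream` variable, and the checkpoint path are placeholders, and I'm assuming boto3 is available on the cluster):

import boto3

QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"  # placeholder

def forward_to_sqs(batch_df, batch_id):
  # foreachBatch runs this on the driver for each micro-batch; collecting is
  # only reasonable if the filtered batches are small
  sqs = boto3.client("sqs")
  for message in batch_df.toJSON().collect():
    sqs.send_message(QueueUrl=QUEUE_URL, MessageBody=message)

(filtered_stream.writeStream           # placeholder for one of the filtered streams
  .foreachBatch(forward_to_sqs)
  .option("checkpointLocation", "s3://.../checkpoints/forward")
  .start())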

On top of all this, I wonder how to manage versioning of ETL jobs, so that if I deploy new code for the ETL, Databricks knows which downstream assets are out of date and may need regeneration. I might save that for a separate topic once I have the pipeline running.

Thanks in advance!

Code for decompressing and parsing the archives

(note: most of this is working when I do a custom run in a Notebook, but I haven't yet tried adding the Notebook to a Pipeline)

import io
import tarfile

import dlt
from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

row_schema = ArrayType(StructType([
    StructField("foo", StringType()),
    StructField("bar", IntegerType()),
]))

@udf(returnType=row_schema)
def extract_rows(content):
  # decompress the gzip-compressed tar archive from the raw file bytes
  archive = tarfile.open(fileobj=io.BytesIO(content))
  # parse archive contents ...
  # extract variable number of rows from parsed content ...
  # return a list of dicts matching row_schema

@dlt.table
def my_table():
  files = (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "binaryFile")
      .load("s3://...")
  )
  # explode() names the resulting struct column "col", hence the "col.*" projection
  return files.select(
    extract_rows(files.content).alias("rows")
  ).select(explode("rows")).select("col.*")
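
For the record, the elided part of extract_rows does roughly the following (the member names, binary path, and output handling below are placeholders standing in for the proprietary format and parsing library):

import io
import subprocess
import tarfile

def _parse_archive(content):
  # illustrative sketch only: member names and the binary path are placeholders
  with tarfile.open(fileobj=io.BytesIO(content), mode="r:gz") as archive:
    manifest = archive.extractfile("manifest.json").read()   # placeholder name
    payload = archive.extractfile("payload.bin").read()      # placeholder name
  # the custom parsing library wraps a host binary, conceptually:
  result = subprocess.run(
    ["/opt/vendor/parse"],      # placeholder path to the host binary
    input=payload,
    capture_output=True,
    check=True,
  )
  # build a list of dicts matching row_schema from the binary's output
  return [{"foo": result.stdout.decode(), "bar": len(manifest)}]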
