Hi there, I am trying to build a Delta Live Tables pipeline that ingests gzip-compressed tar archives as they're uploaded to S3. Each archive contains two files in a proprietary format, and one of them is needed to determine how to parse the other. Once the file contents have been parsed, an array of dicts is produced and written to the table.
I've managed to get this working using a UDF (it uses BytesIO, tarfile, and a custom parsing library that wraps a host binary). I'm not sure whether I should be performing such heavy computation in a UDF, especially reaching out and executing shell commands from within it. If there is a more appropriate approach, please let me know (current code below).
Once the archive contents have been parsed and converted into rows, they are written to the Delta Live Table. From there, multiple downstream tables each read and filter the data. When new matching rows appear, they need to be forwarded to a Lambda function for further processing by another piece of proprietary tech that converts unstructured data into structured data.
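For context, the downstream tables are all simple read-and-filter views over the parsed rows, roughly like the sketch below (the table name foo_rows_to_forward and the filter condition are just placeholders):

import dlt
from pyspark.sql.functions import col

@dlt.table
def foo_rows_to_forward():
    # placeholder downstream table: read the parsed rows and keep only
    # the ones that need to be forwarded for further processing
    return dlt.read_stream("my_table").filter(col("foo") == "needs_processing")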
I am not quite sure how to achieve the forwarding part. My first thought is to use `foreachBatch` to write into SQS or Kinesis infrastructure that I will have to provision myself. From there, I'll configure Lambda functions to process the data and write the results back into another sink that Databricks can read from.
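Concretely, I'm picturing something like this, presumably running as a separate structured streaming job that reads the published table, since the batch handler needs its own writeStream (the stream name, checkpoint path, and batch handler are all hypothetical, and assume boto3 is available on the cluster):

import boto3

def forward_batch(batch_df, batch_id):
    # hypothetical micro-batch handler: push each matching row to a Kinesis
    # stream that I'd provision myself; Lambdas would consume from that stream
    kinesis = boto3.client("kinesis")
    for row_json in batch_df.toJSON().collect():
        kinesis.put_record(
            StreamName="my-forwarding-stream",  # placeholder stream name
            Data=row_json,
            PartitionKey="0",
        )

(
    spark.readStream.table("foo_rows_to_forward")
    .writeStream
    .foreachBatch(forward_batch)
    .option("checkpointLocation", "s3://.../checkpoints/forwarding")  # placeholder path
    .start()
)

The Lambda output would then land in another bucket that a second Auto Loader stream (similar to the my_table code further down) reads back into the pipeline.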
On top of all this, I wonder how to manage versioning of ETL jobs, such that if I deploy new code for the ETL, Databricks knows which downstream assets are out of date and may need regenerating. But I might save that for a separate topic once I have the pipeline running.
Thanks in advance!
Code for decompressing and parsing the archives
(note: most of this works when I run it directly in a notebook, but I haven't yet tried adding the notebook to a pipeline)
import io
import tarfile

import dlt
from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

# each archive yields a variable number of rows
row_schema = ArrayType(StructType([
    StructField("foo", StringType()),
    StructField("bar", IntegerType()),
]))

@udf(returnType=row_schema)
def extract_rows(content):
    # decompress archive
    archive = tarfile.open(fileobj=io.BytesIO(content))
    # parse archive contents ...
    # extract variable number of rows from parsed content ...

@dlt.table
def my_table():
    files = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "binaryFile")
        .load("s3://...")
    )
    # one array of structs per archive -> one output row per struct
    return files.select(
        extract_rows(files.content).alias("rows")
    ).select(explode("rows")).select("col.*")
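For reference, the elided parsing step inside extract_rows boils down to something like this; the binary path, its flags, and the assumption that it emits JSON on stdout are all stand-ins for the proprietary tooling:

import io
import json
import subprocess
import tarfile

def parse_archive(archive_bytes):
    # stand-in for the custom parsing library: pull out the two member files,
    # then use the first one to decide how to invoke the host binary on the second
    with tarfile.open(fileobj=io.BytesIO(archive_bytes)) as archive:
        members = archive.getmembers()
        manifest = archive.extractfile(members[0]).read()
        payload = archive.extractfile(members[1]).read()
    result = subprocess.run(
        ["/usr/local/bin/proprietary-parser", "--mode", manifest.decode().strip()],  # placeholder binary and flags
        input=payload,
        capture_output=True,
        check=True,
    )
    # assume the binary prints an array of objects matching row_schema
    return json.loads(result.stdout)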