Using DLT, I have two streaming sources coming in through Auto Loader. Each file in Source1 contains a single row of data, while Source2 files contain thousands of rows. The two sources share a common key column that joins them together. So far, so good.
I have a function that looks up an item in a Cosmos DB database. This function takes an ItemId as a parameter, and I use its return value as a filter in a later step of my pipeline. If I hard-code an ItemId in this function, the pipeline runs without issues, pulls back the proper data from Cosmos, and filters the DLT table correctly. However, I don't want to hard-code the ItemId; I want to read its value from Source1 after a file lands. Each Source1 file will always contain exactly one row, and ItemId is a column in that row.
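For context, a stripped-down sketch of the working (hard-coded) version looks roughly like this. LookupInCosmosdb is my function; the hard-coded ItemId value, the filter column name, and the assumption that the lookup returns a list of values are placeholders, and this only runs inside a Databricks DLT pipeline:

```python
import dlt
from pyspark.sql import functions as F

@dlt.table
def filtered_source2():
    # Hard-coded ItemId: this version works end to end.
    # "PLACEHOLDER_ITEM_ID" stands in for the real value.
    FilterInfo = LookupInCosmosdb("PLACEHOLDER_ITEM_ID")

    # Stream Source2 and keep only the rows matching the Cosmos lookup.
    # Assumes LookupInCosmosdb returns a list usable with isin();
    # "FilterCol" is a placeholder column name.
    return (
        dlt.read_stream("Source2")
           .filter(F.col("FilterCol").isin(FilterInfo))
    )
```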
I have tried something like this:
ItemId = spark.sql("SELECT ItemId from STREAM(LIVE.Source1)")
FilterInfo = LookupInCosmosdb(ItemId)
... but that doesn't work: spark.sql returns a DataFrame, not a scalar value I can pass to the lookup. I've tried several other variations and none have worked.
How can I:
- Set a variable from a DLT streaming table
- Use that variable in another DLT table as a filter clause
- OR, use a sub-select directly in the WHERE clause when reading Source2, like this:
- SELECT * FROM Source2 WHERE Source2.ItemId IN (SELECT LookupInCosmosdb(Source1.KeyCol) FROM Source1)
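To make the shape of what I'm after concrete, here is the kind of pseudo-code I keep reaching for. I don't know whether anything like this is legal in DLT (the scalar extraction on a streaming table is exactly the part that fails for me), and the column names are placeholders:

```python
import dlt
from pyspark.sql import functions as F

@dlt.table
def filtered_source2():
    # What I'd like: pull the single ItemId out of Source1 as a scalar.
    # first() works on a batch DataFrame, but Source1 is a streaming
    # source, so I can't find a way to express this in DLT.
    item_id = dlt.read("Source1").first()["ItemId"]

    # Then drive the Cosmos lookup from that value instead of a
    # hard-coded one (assumes the lookup returns a list for isin()).
    FilterInfo = LookupInCosmosdb(item_id)

    return (
        dlt.read_stream("Source2")
           .filter(F.col("ItemId").isin(FilterInfo))
    )
```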