04-27-2023 04:08 PM
I am pulling data from Google BigQuery and writing it to a bronze table on an interval. I do this in a separate continuous job because DLT did not like the BigQuery connector calling `collect()` on a DataFrame inside the pipeline.
In Python, I would like to read that bronze table into DLT as a stream and create a silver table with some complex DataFrame logic and functions. I can accomplish this with the SQL below, but most of our pipeline is in Python and I'd like to know how to do it there.
I am probably missing something rather small. I do NOT want to use the absolute path if possible. I would rather reference the table.
How do I convert the below SQL to Python? Can I use a table reference in Python? Where is this explained in the docs?
```sql
CREATE STREAMING LIVE VIEW silver_1 -- create a new STREAMING LIVE view called silver_1
AS SELECT *
FROM STREAM(dev.bronze_raw)
-- catalog = hive_metastore
-- schema = dev
-- table = bronze_raw
-- path would be something like dbfs:/user/hive/warehouse/dev.db/bronze_raw
```
Python please...
```python
import dlt
???
```
04-28-2023 03:24 PM
The code below is a solution. What I was missing is that I can read from a table with `spark.readStream.format("delta").table("...")`. Simple; I just missed it. This is different from `dlt.read_stream()`, which appears in the examples a lot and reads datasets defined within the same pipeline.
This pattern appears as an example in the docs on CDC: https://docs.databricks.com/delta-live-tables/cdc.html.
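```python
import dlt

# `spark` (the SparkSession) is predefined in Databricks notebooks, including DLT pipelines
@dlt.table(
    table_properties={"quality": "silver"}
)
def silver_1():
    # Read the changes as a stream from the table, referenced by its catalog.schema.table name
    df = spark.readStream.format("delta").table("hive_metastore.dev.bronze_raw")
    # Return the entire DataFrame with all columns
    return df
```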
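To keep the "complex DataFrame logic and functions" from the question testable outside the pipeline, one option is to put the read and the transformation in plain functions and leave only a thin wrapper in the `@dlt.table` function. This is a minimal sketch, assuming `spark` is the notebook's SparkSession; `BRONZE_TABLE`, `read_bronze_stream`, `build_silver_1`, and `my_silver_logic` are hypothetical names, not part of any Databricks API.

```python
# Fully qualified name (catalog.schema.table) of the bronze table, so no path is needed
BRONZE_TABLE = "hive_metastore.dev.bronze_raw"


def read_bronze_stream(spark, table_name=BRONZE_TABLE):
    """Return a streaming DataFrame over the bronze table, referenced by name."""
    return spark.readStream.format("delta").table(table_name)


def build_silver_1(spark, transform=lambda df: df):
    """Apply a silver-layer transformation (identity by default) to the bronze stream."""
    return transform(read_bronze_stream(spark))


# Inside a DLT notebook, where `dlt` and `spark` are already available, the wrapper
# could look like this (my_silver_logic is a hypothetical DataFrame -> DataFrame function):
#
#   @dlt.table(table_properties={"quality": "silver"})
#   def silver_1():
#       return build_silver_1(spark, transform=my_silver_logic)
```

With the identity default, `build_silver_1(spark)` behaves like the solution above; the transformation function can then be developed and checked separately before it is wired into the pipeline.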
Reading from a table like this is not given as an explicit example in the Python reference: https://docs.databricks.com/delta-live-tables/python-ref.html. I think a section called "Reading from sources", with examples of the different ways to read data, would save people some time. I will send some feedback on that.
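Along the lines of that suggestion, here is a minimal sketch contrasting two ways to open a Delta source as a stream: by metastore name, as in the solution above, or by storage path, which is what the question wanted to avoid. `open_delta_stream` is a hypothetical helper, not a Databricks API; it only wraps the standard PySpark `DataStreamReader.table()` and `DataStreamReader.load()` calls, and `spark` is assumed to be the notebook's SparkSession.

```python
def open_delta_stream(spark, *, table=None, path=None):
    """Open a Delta source as a streaming DataFrame (hypothetical helper).

    Pass exactly one of:
      table: a metastore name such as "hive_metastore.dev.bronze_raw"
      path:  a storage location such as "dbfs:/user/hive/warehouse/dev.db/bronze_raw"
    """
    if (table is None) == (path is None):
        raise ValueError("pass exactly one of table= or path=")
    reader = spark.readStream.format("delta")
    if table is not None:
        # DataStreamReader.table() resolves the name through the metastore (Spark 3.1+)
        return reader.table(table)
    # DataStreamReader.load() reads the files at the given location directly
    return reader.load(path)


# For datasets defined in the same DLT pipeline, neither form is needed:
# dlt.read_stream("silver_1") reads them by their pipeline-level name.
```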