cancel
Showing results for 
Search instead for 
Did you mean: 
Warehousing & Analytics
cancel
Showing results for 
Search instead for 
Did you mean: 

In Python, Streaming read by DLT from Hive Table

MetaRossiVinli
Contributor

I am pulling data from Google BigQuery and writing it to a bronze table on an interval. I do this in a separate continuous job because DLT did not like the BigQuery connector calling collect on a dataframe inside of DLT.

In Python, I would like to read that bronze table in to DLT in a streaming fashion and create a silver table with some complex dataframe logic and functions. I can accomplish this with the below SQL, but most of our pipeline is in Python and I'd like to know how to do this.

I am probably missing something rather small. I do NOT want to use the absolute path if possible. I would rather reference the table.

How do I convert the below SQL to Python? Can I use a table reference in Python? Where is this explained in the docs?

CREATE STREAMING LIVE VIEW silver_1  -- create a new STREAMING LIVE view called silver_1
SELECT *
FROM STREAM(dev.bronze_raw)
-- catalog = hive_metastore
-- schema = dev
-- table = bronze_raw
-- path would be something like = dbfs:/user/hive/warehouse/dev.db/bronze_raw

Python please...

import dlt
 
???

1 ACCEPTED SOLUTION

Accepted Solutions

MetaRossiVinli
Contributor

The below code is a solution. I was missing that I could read from a table with `spark.readStream.format("delta").table("...")`. Simple. Just missed it. This is different than `dlt.read_stream()` which appears in the examples a lot.

This is referenced as an example in the docs on CDC: https://docs.databricks.com/delta-live-tables/cdc.html.

import dlt
 
@dlt.table(
    table_properties = {"quality" : "silver"}
)
def silver_1():
    # Read the changes as a stream from the table
    df = spark.readStream.format("delta").table("hive_metastore.dev.bronze_raw")
    
    # Return the entire dataframe with all columns
    return df

Reading from a table like this is not explicitly given as an example in the Python ref: https://docs.databricks.com/delta-live-tables/python-ref.html. I think that making this an example in a section called "Reading from sources" with examples on how to read in various ways would save people some time. I will send some feedback on that.

View solution in original post

1 REPLY 1

MetaRossiVinli
Contributor

The below code is a solution. I was missing that I could read from a table with `spark.readStream.format("delta").table("...")`. Simple. Just missed it. This is different than `dlt.read_stream()` which appears in the examples a lot.

This is referenced as an example in the docs on CDC: https://docs.databricks.com/delta-live-tables/cdc.html.

import dlt
 
@dlt.table(
    table_properties = {"quality" : "silver"}
)
def silver_1():
    # Read the changes as a stream from the table
    df = spark.readStream.format("delta").table("hive_metastore.dev.bronze_raw")
    
    # Return the entire dataframe with all columns
    return df

Reading from a table like this is not explicitly given as an example in the Python ref: https://docs.databricks.com/delta-live-tables/python-ref.html. I think that making this an example in a section called "Reading from sources" with examples on how to read in various ways would save people some time. I will send some feedback on that.

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.