Warehousing & Analytics
Engage in discussions on data warehousing, analytics, and BI solutions within the Databricks Community. Share insights, tips, and best practices for leveraging data for informed decision-making.
In Python, Streaming read by DLT from Hive Table

MetaRossiVinli
Contributor

I am pulling data from Google BigQuery and writing it to a bronze table on an interval. I do this in a separate continuous job because DLT ran into problems when the BigQuery connector called collect() on a DataFrame inside the pipeline.


In Python, I would like to read that bronze table into DLT in a streaming fashion and create a silver table with some complex DataFrame logic and functions. I can accomplish this with the SQL below, but most of our pipeline is in Python and I'd like to know how to do the same thing there.


I am probably missing something rather small. I do NOT want to use the absolute path if possible. I would rather reference the table.


How do I convert the below SQL to Python? Can I use a table reference in Python? Where is this explained in the docs?

CREATE STREAMING LIVE VIEW silver_1  -- create a new STREAMING LIVE view called silver_1
AS SELECT *
FROM STREAM(dev.bronze_raw)
-- catalog = hive_metastore
-- schema = dev
-- table = bronze_raw
-- path would be something like = dbfs:/user/hive/warehouse/dev.db/bronze_raw
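The comments above relate the table reference to its storage path. As a minimal sketch with hypothetical helper names (not a Databricks or Spark API), the default mapping for a managed Hive metastore table can be written out like this. It assumes the default warehouse root `dbfs:/user/hive/warehouse` and a schema created without a custom `LOCATION`; either assumption can change the path, which is why referencing the table by name is the more stable choice.

```python
# Hypothetical helpers for illustration only; they are not part of any
# Databricks or Spark API.

WAREHOUSE_ROOT = "dbfs:/user/hive/warehouse"  # assumed default warehouse dir


def qualified_name(catalog: str, schema: str, table: str) -> str:
    """Three-level name to pass to spark.readStream.table()."""
    return f"{catalog}.{schema}.{table}"


def default_managed_path(schema: str, table: str, root: str = WAREHOUSE_ROOT) -> str:
    """Default storage location of a managed Hive metastore table.

    Only valid if the schema was created without a custom LOCATION.
    Tables in the `default` schema sit directly under the warehouse root;
    tables in any other schema sit under `<schema>.db/`.
    """
    if schema == "default":
        return f"{root}/{table}"
    return f"{root}/{schema}.db/{table}"


if __name__ == "__main__":
    print(qualified_name("hive_metastore", "dev", "bronze_raw"))  # hive_metastore.dev.bronze_raw
    print(default_managed_path("dev", "bronze_raw"))  # dbfs:/user/hive/warehouse/dev.db/bronze_raw
```

Passing the qualified name to `spark.readStream.table()` keeps the pipeline independent of where the metastore actually stores the data.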

Python please...

import dlt
 
???


1 ACCEPTED SOLUTION


MetaRossiVinli
Contributor

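The code below is a solution. I was missing that I could read from a table with `spark.readStream.format("delta").table("...")`. Simple, I just missed it. This is different from `dlt.read_stream()`, which appears in the examples a lot: `dlt.read_stream()` reads a dataset defined in the same pipeline, while `spark.readStream.table()` reads any table registered in the metastore, such as one written by a separate job.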

This pattern appears as an example in the docs on CDC: https://docs.databricks.com/delta-live-tables/cdc.html.

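import dlt

@dlt.table(
    table_properties={"quality": "silver"}
)
def silver_1():
    # Stream the rows appended to the bronze table by the separate BigQuery job.
    # The table is referenced by its catalog.schema.table name, not its storage path;
    # `spark` is predefined in a DLT pipeline notebook.
    df = spark.readStream.format("delta").table("hive_metastore.dev.bronze_raw")

    # Return the entire DataFrame with all columns
    return df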

Reading from a table like this is not given as an explicit example in the Python reference: https://docs.databricks.com/delta-live-tables/python-ref.html. I think a section called "Reading from sources", with examples of the various ways to read data, would save people some time. I will send some feedback on that.
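The SQL in the question defines a streaming live view, while the solution above defines a table. A closer translation is sketched below; it is an assumption, not code from the original thread. It uses `@dlt.view` for `silver_1` and adds a hypothetical downstream table `silver_2` only to show `dlt.read_stream()` reading a dataset defined in the same pipeline. It assumes a DLT pipeline notebook where `spark` is predefined, so it will not run as a standalone script.

```python
import dlt

# Assumes a Delta Live Tables pipeline notebook, where `spark` is predefined.
# A view is not persisted; it exists only inside the pipeline.
@dlt.view(comment="Streaming view over the bronze table written by the BigQuery job")
def silver_1():
    # A metastore table that is NOT defined in this pipeline is read
    # with spark.readStream.table() using its catalog.schema.table name.
    return spark.readStream.table("hive_metastore.dev.bronze_raw")


# Hypothetical downstream table, included only to show the other read style.
@dlt.table(table_properties={"quality": "silver"})
def silver_2():
    # A dataset defined in the SAME pipeline (the view above) is read
    # with dlt.read_stream() by its pipeline name.
    return dlt.read_stream("silver_1")
```

Keeping the complex DataFrame logic in `silver_2` (or further downstream) lets `silver_1` stay a thin, named entry point to the bronze table.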
