Warehousing & Analytics
Engage in discussions on data warehousing, analytics, and BI solutions within the Databricks Community. Share insights, tips, and best practices for leveraging data for informed decision-making.

In Python, Streaming read by DLT from Hive Table

MetaRossiVinli
Contributor

I am pulling data from Google BigQuery and writing it to a bronze table on an interval. I do this in a separate continuous job because DLT did not like the BigQuery connector calling collect() on a DataFrame inside of DLT.

In Python, I would like to read that bronze table into DLT as a stream and create a silver table with some complex DataFrame logic and functions. I can accomplish this with the SQL below, but most of our pipeline is in Python and I'd like to know how to do the same there.

I am probably missing something rather small. I do NOT want to use the absolute path if possible; I would rather reference the table.

How do I convert the SQL below to Python? Can I use a table reference in Python? Where is this explained in the docs?

CREATE STREAMING LIVE VIEW silver_1  -- create a new streaming live view called silver_1
AS SELECT *
FROM STREAM(dev.bronze_raw)
-- catalog = hive_metastore
-- schema  = dev
-- table   = bronze_raw
-- path would be something like dbfs:/user/hive/warehouse/dev.db/bronze_raw

Python please...

import dlt
 
???

1 ACCEPTED SOLUTION

MetaRossiVinli
Contributor

The code below is the solution. I was missing that I could read from a table with `spark.readStream.format("delta").table("...")`. Simple. Just missed it. This is different from `dlt.read_stream()`, which appears in the examples a lot but reads from tables defined within the same pipeline.

This is referenced as an example in the docs on CDC: https://docs.databricks.com/delta-live-tables/cdc.html.

import dlt

@dlt.table(
    table_properties={"quality": "silver"}
)
def silver_1():
    # Read the bronze table as a stream
    df = spark.readStream.format("delta").table("hive_metastore.dev.bronze_raw")

    # Return the entire DataFrame with all columns
    return df

Reading from a table like this is not explicitly given as an example in the Python reference: https://docs.databricks.com/delta-live-tables/python-ref.html. I think adding it to a section called "Reading from sources", with examples of the various ways to read, would save people some time. I will send some feedback on that.
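One more note for anyone comparing this to the original SQL: the SQL creates a streaming live *view*, while the answer above creates a *table*. To match the view semantics, a `@dlt.view` version could be used instead. This is a sketch, not tested outside my setup, and it only runs inside a Delta Live Tables pipeline, where the `dlt` module and the `spark` session are provided by the runtime:

```python
import dlt  # available only inside a Delta Live Tables pipeline

# Rough Python equivalent of:
#   CREATE STREAMING LIVE VIEW silver_1 AS SELECT * FROM STREAM(dev.bronze_raw)
@dlt.view
def silver_1():
    # readStream.table(...) resolves the table by name, so for a Delta table
    # it behaves like .format("delta").table(...)
    return spark.readStream.table("hive_metastore.dev.bronze_raw")
```

Because the view is then defined within the pipeline itself, a downstream table could consume it with `dlt.read_stream("silver_1")`.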


