
Empty streaming tables in DLT

ashraf1395
Valued Contributor

I want to create empty streaming tables in DLT with only the schema specified. Is that possible?
I want to do this in DLT Python.

5 REPLIES

Alberto_Umana
Databricks Employee

It is possible to create empty streaming tables in Delta Live Tables (DLT) with only the schema specified, using the DLT Python API. Here is how you can do it:

  • Import the DLT module:
    import dlt

  • Define the schema:
    You can specify the schema using a Python StructType or a SQL DDL string. Here is an example using a Python StructType:

from pyspark.sql.types import StructType, StructField, StringType, LongType

sales_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("customer_name", StringType(), True),
    StructField("number_of_line_items", StringType(), True),
    StructField("order_datetime", StringType(), True),
    StructField("order_number", LongType(), True)
])
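
For reference, since the step above also allows a SQL DDL string, a roughly equivalent schema in that form might look like the sketch below (the column types simply mirror the StructType example):

sales_schema_ddl = """
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number BIGINT
"""

This string could then be passed to the schema argument of @dlt.table in place of the StructType.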

 

  • Create the streaming table:
    Use the @dlt.table decorator to define the streaming table and specify the schema. Here is an example:

@dlt.table(
    name="sales",
    comment="Raw data on sales",
    schema=sales_schema
)
def sales():
    return spark.readStream.format("rate").load()  # Placeholder for the actual streaming source


In this example, the sales function defines a streaming table with the specified schema. The spark.readStream.format("rate").load() call is a placeholder for the actual streaming source.

Please refer to: https://docs.databricks.com/en/delta-live-tables/python-ref.html

ashraf1395
Valued Contributor

Hi there @Alberto_Umana, awesome! I knew I could try spark.readStream, but I didn't know how to specify the source.

My main motive for creating this as a streaming table is not a streaming source but incremental ingestion; the eventual job will probably either be a batch job or load data with the help of Auto Loader.

So what does ("rate") mean? I didn't get it.
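
For the incremental-ingestion case described above, swapping the rate placeholder for Auto Loader might look roughly like the following sketch; the table name, file format, and landing path are placeholders, not from the original post:

import dlt

@dlt.table(
    name="sales_incremental",      # placeholder table name
    comment="Incremental ingestion with Auto Loader",
    schema=sales_schema
)
def sales_incremental():
    return (
        spark.readStream.format("cloudFiles")             # Auto Loader source
        .option("cloudFiles.format", "json")              # placeholder file format
        .load("/Volumes/catalog/schema/landing/sales/")   # placeholder landing path
    )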

ashraf1395
Valued Contributor

Hi there @Alberto_Umana,

I tried this approach, but it fails when a schema is specified (it is able to create the streaming table without one). This is the error I am getting:

com.databricks.pipelines.common.errors.DLTAnalysisException: Table 'bb1123_loans' has a user-specified schema that is incompatible with the schema inferred from its query.

Streaming tables are stateful and remember data that has already been
processed. If you want to recompute the table from scratch, please full refresh
the table.

Declared schema:
root
 |-- loan_number: string (nullable = false)
 |-- loan_bal: decimal(38,18) (nullable = true)
 |-- cust_number: string (nullable = true)
 |-- cust_nm: string (nullable = true)
 |-- cust_addr: string (nullable = true)

Inferred schema:
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)

at com.databricks.pipelines.graph.DataflowGraph.$anonfun$validationFailure$22(DataflowGraph.scala:984)
at lang.Thread.run(Thread.java:750)

This is my code:

# Case 3: Empty table with a predefined schema
if not schema:
    raise ValueError("Schema must be provided for empty tables without input_path or DataFrame.")

@dlt.table(name=table_name, schema=schema)
def empty_table():
    log_event(logger, f"Creating empty DLT streaming table: {table_name}.", "INFO")
    return (
        spark.readStream.format("rate").load()
    )

ashraf1395
Valued Contributor

I figured out a solution to this error. When we create an empty streaming table from the rate source alone, we cannot declare the schema we want; we have to build on top of the rate stream and reshape it into the desired schema. This is what I mean:

from pyspark.sql.functions import lit

@dlt.table(name=table_name)
def empty_table():
    log_event(logger, f"Creating empty DLT streaming table: {table_name}.", "INFO")

    # Create a dummy streaming DataFrame with the `rate` source
    streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

    # Replace the rate columns with null literals cast to the target schema's types
    empty_stream_df = streaming_df.select(
        *[
            lit(None).cast(field.dataType).alias(field.name)
            for field in schema.fields
        ]
    ).where("1=0")  # Ensures the stream stays empty

    return empty_stream_df

If this looks correct, @Alberto_Umana, just send a like and I will accept it as a solution. If you have something better, please do tell me about it.
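
For readers who want to try this workaround outside the helper above, a self-contained sketch might look roughly like the following; the table name and columns are borrowed from the error message earlier in the thread, nullability is simplified, and the logging call is omitted:

import dlt
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, DecimalType

table_name = "bb1123_loans"  # illustrative, taken from the error above
schema = StructType([
    StructField("loan_number", StringType(), True),
    StructField("loan_bal", DecimalType(38, 18), True),
    StructField("cust_number", StringType(), True),
    StructField("cust_nm", StringType(), True),
    StructField("cust_addr", StringType(), True),
])

@dlt.table(name=table_name)
def empty_table():
    # Dummy stream from the rate source; its columns are discarded below
    streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

    # Project a null literal cast to each target type, then filter out every row
    return streaming_df.select(
        *[lit(None).cast(field.dataType).alias(field.name) for field in schema.fields]
    ).where("1=0")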

Alberto_Umana
Databricks Employee

Hi @ashraf1395,

The term "rate" refers to a special source in Apache Spark's Structured Streaming that generates data at a specified rate. This source is primarily used for testing and benchmarking purposes. When you use spark.readStream.format("rate").load(), it creates a streaming DataFrame that continuously generates rows with two columns: timestamp and value. The timestamp column contains the current timestamp, and the value column contains a monotonically increasing number starting from 0.

This source allows you to simulate a stream of data without needing an actual data source, which can be useful for testing the performance and behavior of your streaming application. You can control the rate at which rows are generated by setting options such as rowsPerSecond.
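
As a quick illustration of what the rate source produces (a notebook sketch, not part of a DLT pipeline):

# Generates rows continuously with columns `timestamp` and `value`
rate_df = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", 5)   # control the generation rate
    .load()
)
rate_df.printSchema()
# root
#  |-- timestamp: timestamp (nullable = true)
#  |-- value: long (nullable = true)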
