4 weeks ago
I want to create empty streaming tables in DLT with only a schema specified. Is it possible?
I want to do it in DLT Python.
4 weeks ago - last edited 4 weeks ago
It is possible to create empty streaming tables in Delta Live Tables (DLT) with only the schema specified, using the DLT Python API. Here is how you can do it. First, define the schema:
from pyspark.sql.types import StructType, StructField, StringType, LongType

sales_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("customer_name", StringType(), True),
    StructField("number_of_line_items", StringType(), True),
    StructField("order_datetime", StringType(), True),
    StructField("order_number", LongType(), True)
])
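As an aside (worth verifying on your runtime), the schema argument also accepts a SQL DDL string, so the same schema could equivalently be written as:

# Hedged alternative: a DDL-string equivalent of sales_schema; recent
# Databricks runtimes generally accept this form for the schema argument.
sales_schema_ddl = (
    "customer_id STRING, customer_name STRING, number_of_line_items STRING, "
    "order_datetime STRING, order_number LONG"
)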
Then use the @dlt.table decorator to define the streaming table and specify the schema. Here is an example:
import dlt

@dlt.table(
    name="sales",
    comment="Raw data on sales",
    schema=sales_schema
)
def sales():
    return spark.readStream.format("rate").load()  # Placeholder for the actual streaming source
In this example, the sales function defines a streaming table with the specified schema. The spark.readStream.format("rate").load() call is a placeholder for the actual streaming source.
Please refer to: https://docs.databricks.com/en/delta-live-tables/python-ref.html
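When you move past testing, you can swap the placeholder for a real source. Here is a sketch using Auto Loader (cloudFiles); the file format and landing path below are assumptions you would replace with your own:

import dlt

@dlt.table(
    name="sales",
    comment="Raw data on sales",
    schema=sales_schema
)
def sales():
    # Hypothetical Auto Loader source; adjust the format and path to your data
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")  # assumed file format
        .schema(sales_schema)                 # keeps the query schema aligned with the declared one
        .load("/Volumes/main/raw/sales")      # hypothetical landing path
    )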
4 weeks ago
Hi there @Alberto_Umana, awesome! I knew I could try spark.readStream, but I didn't know how to specify the source.
My main motive for creating this as a streaming table is not a streaming source but incremental ingestion, i.e. afterwards the job will either be a batch job or will load with the help of Auto Loader.
So what does ("rate") mean? I didn't get it.
3 weeks ago - last edited 3 weeks ago
Hi there @Alberto_Umana,
I tried this approach but I am getting this error. It is able to create a streaming table, but when a schema is specified it fails.
This is the error I am getting:
com.databricks.pipelines.common.errors.DLTAnalysisException: Table 'bb1123_loans' has a user-specified schema that is incompatible with the schema inferred from its query.
Streaming tables are stateful and remember data that has already been processed. If you want to recompute the table from scratch, please full refresh the table.
Declared schema:
root
 |-- loan_number: string (nullable = false)
 |-- loan_bal: decimal(38,18) (nullable = true)
 |-- cust_number: string (nullable = true)
 |-- cust_nm: string (nullable = true)
 |-- cust_addr: string (nullable = true)
Inferred schema:
root
 |-- timestamp: timestamp (nullable = true)
 |-- value: long (nullable = true)
at com.databricks.pipelines.graph.DataflowGraph.$anonfun$validationFailure$22(DataflowGraph.scala:984)
at java.lang.Thread.run(Thread.java:750)
This is my code:
# Case 3: Empty table with a predefined schema
if not schema:
    raise ValueError("Schema must be provided for empty tables without input_path or DataFrame.")

@dlt.table(name=table_name, schema=schema)
def empty_table():
    log_event(logger, f"Creating empty DLT streaming table: {table_name}.", "INFO")
    return (
        spark.readStream.format("rate").load()
    )
3 weeks ago
I figured out a solution to this error. By just creating an empty streaming table with the rate source, we cannot specify the schema we want: the rate source always emits its own timestamp and value columns, which conflict with the declared schema. We have to build on top of it and map its output to the target schema. This is what I mean:
from pyspark.sql.functions import lit

@dlt.table(name=table_name)
def empty_table():
    log_event(logger, f"Creating empty DLT streaming table: {table_name}.", "INFO")
    # Create a dummy streaming DataFrame with the `rate` source
    streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
    # Map the `rate` source columns to match the schema with default null values
    empty_stream_df = streaming_df.select(
        *[
            lit(None).cast(field.dataType).alias(field.name)
            for field in schema.fields
        ]
    ).where("1=0")  # Ensures the stream stays empty
    return empty_stream_df
If this looks correct, @Alberto_Umana, just send a like and I will accept it as a solution. If you have something better, then please do tell me about it.
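For completeness, a hedged alternative: recent DLT runtimes also expose dlt.create_streaming_table, which declares a streaming table with only a schema and no dummy query. Availability and exact parameters depend on your runtime, so treat this as a sketch:

import dlt

# Sketch, assuming a runtime where dlt.create_streaming_table is available:
# it declares the target streaming table up front, with no placeholder source.
dlt.create_streaming_table(
    name=table_name,
    schema=schema  # the predefined StructType
)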
4 weeks ago
Hi @ashraf1395,
The term "rate" refers to a special source in Apache Spark's Structured Streaming that generates data at a specified rate. This source is primarily used for testing and benchmarking purposes. When you use spark.readStream.format("rate").load(), it creates a streaming DataFrame that continuously generates rows with two columns: timestamp and value. The timestamp column contains the current timestamp, and the value column contains a monotonically increasing number starting from 0.
This source allows you to simulate a stream of data without needing an actual data source, which can be useful for testing the performance and behavior of your streaming application. You can control the rate at which rows are generated by setting options such as rowsPerSecond.
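For example, a minimal sketch of the rate source on its own:

# The built-in "rate" source, generating one row per second
df = (
    spark.readStream.format("rate")
    .option("rowsPerSecond", 1)  # rows generated per second
    .load()
)
df.printSchema()
# root
#  |-- timestamp: timestamp (nullable = true)
#  |-- value: long (nullable = true)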