I've defined a streaming Delta Live Tables table in a notebook using Python.
- running on the "preview" channel
- delta cache accelerated (Standard_D4ads_v5) compute
It fails with:
org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = xxx, runId = yyy] terminated with exception: getPrimaryKeys not implemented for debezium SQLSTATE: XXKST
- running on the "current" channel
- delta cache accelerated (Standard_D4ads_v5) compute
It fails with:
scala.ScalaReflectionException: object com.databricks.cdc.spark.DebeziumJDBCMicroBatchProvider not found.
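The Debezium reference in both stack traces makes me suspect mycatalog.myschema.hourvalues is fed by a CDC ingestion pipeline. For context, this is how I'd inspect the table's metadata to confirm that (just a sketch, using the table name from the notebook below):

# Inspect how the source table is managed; the Debezium reference in the
# stack traces suggests a CDC-fed table
spark.sql("DESCRIBE EXTENDED mycatalog.myschema.hourvalues").show(truncate=False)
spark.sql("SHOW TBLPROPERTIES mycatalog.myschema.hourvalues").show(truncate=False)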
The notebook:
import dlt
import pyspark.sql.functions as F


@dlt.table(
    name="silver_hour_values",
    comment="...",
    table_properties={"quality": "silver"},
    partition_cols=["event_year", "event_month", "event_day_of_month"],
)
@dlt.expect_or_drop("valid_report_index", F.col("report_index").isNotNull())
@dlt.expect_or_drop("valid_event_datetime", F.col("event_datetime").isNotNull())
@dlt.expect_or_drop("valid_event_value", F.col("event_value").isNotNull())
def get_hour_values():
    return (
        spark
        .readStream
        .table("mycatalog.myschema.hourvalues")
        # Normalize the source column names to snake_case
        .withColumnRenamed("ReportIx", "report_index")
        .withColumnRenamed("DateTime", "event_datetime")
        .withColumnRenamed("Value", "event_value")
        .withColumnRenamed("Quality", "quality")
        .select(
            "report_index",
            "event_datetime",
            "event_value",
            "quality"
        )
        .withColumn("ingestion_datetime", F.current_timestamp())
        # Derive the partition columns from the event timestamp
        .withColumn("event_year", F.year(F.col("event_datetime")))
        .withColumn("event_month", F.month(F.col("event_datetime")))
        .withColumn("event_day_of_month", F.dayofmonth(F.col("event_datetime")))
    )
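One workaround I'm wondering about (an untested sketch): if the source really is CDC-fed, its Delta log will contain commits that rewrite existing rows, and the Delta streaming source can be told to skip those via the skipChangeCommits reader option:

# Untested sketch: skipChangeCommits makes the Delta streaming source ignore
# commits that rewrite existing rows (updates/deletes), as a CDC feed produces
df = (
    spark.readStream
    .option("skipChangeCommits", "true")
    .table("mycatalog.myschema.hourvalues")
)

Is that the right direction, or is streaming from a CDC-managed table simply not supported from a DLT pipeline?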