Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

[delta live table] exception: getPrimaryKeys not implemented for debezium

smedegaard
New Contributor III

I've defined a streaming Delta Live Table in a notebook using Python.

  • running on "preview" channel
  • delta cache accelerated (Standard_D4ads_v5) compute

It fails with
org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = xxx, runId = yyy] terminated with exception: getPrimaryKeys not implemented for debezium SQLSTATE: XXKST

  • running on "current" channel
  • delta cache accelerated (Standard_D4ads_v5) compute

It fails with

scala.ScalaReflectionException: object com.databricks.cdc.spark.DebeziumJDBCMicroBatchProvider not found.

The notebook

 

import dlt
import pyspark.sql.functions as F

@dlt.table(
  name="sliver_hour_values",
  comment="...",
  table_properties={"quality": "silver"},
  partition_cols=["event_year", "event_month", "event_day_of_month"],
)
@dlt.expect_or_drop("valid_date", F.col("event_datetime").isNotNull())
@dlt.expect_or_drop("valid_report_index", F.col("report_index").isNotNull())
@dlt.expect_or_drop("valid_event_datetime", F.col("event_datetime").isNotNull())
@dlt.expect_or_drop("valid_event_value", F.col("event_value").isNotNull())
def get_hour_values():
  return (
    spark.readStream
    .table("mycatalog.myschema.hourvalues")
    # Rename the source columns to snake_case
    .withColumnRenamed("ReportIx", "report_index")
    .withColumnRenamed("DateTime", "event_datetime")
    .withColumnRenamed("Value", "event_value")
    .withColumnRenamed("Quality", "quality")
    .select(
      "report_index",
      "event_datetime",
      "event_value",
      "quality",
    )
    # Add the ingestion timestamp and the partition columns
    .withColumn("ingestion_datetime", F.current_timestamp())
    .withColumn("event_year", F.year(F.col("event_datetime")))
    .withColumn("event_month", F.month(F.col("event_datetime")))
    .withColumn("event_day_of_month", F.dayofmonth(F.col("event_datetime")))
  )

 

 

2 REPLIES

Kaniz_Fatma
Community Manager

Hi @smedegaard

You are seeing two different errors, one per channel.

  • On the "preview" channel you get a StreamingQueryException with the message: “getPrimaryKeys not implemented for debezium SQLSTATE: XXKST.”
    • This error suggests that the getPrimaryKeys operation is not supported for the Debezium connector in your current setup.
    • To resolve this, check the configuration of your Debezium connector and make sure it supports the operations your use case needs.
  • On the "current" channel you get a ScalaReflectionException with the message: “object com.databricks.cdc.spark.DebeziumJDBCMicroBatchProvider not found.”
    • This error indicates that the specified class (com.databricks.cdc.spark.DebeziumJDBCMicroBatchProvider) is not available on your classpath.
    • Verify that the library or package containing the missing class is correctly included in your environment.
    • Check your Spark configuration to ensure that the necessary dependencies are available.
    • If the class is part of an external library, make sure it is added to your Spark cluster’s library dependencies.
  • If you need further assistance, feel free to ask! 😊
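
One concrete way to start on the configuration checks above is to look at which provider the source table reports. The snippet below is only a sketch under a few assumptions: table_provider is a hypothetical helper name, it is meant to run in a regular notebook (outside the pipeline) where a spark session is available, and it relies on the "Provider" row that DESCRIBE TABLE EXTENDED prints. Whether the table in this thread actually reports a Debezium-backed provider is a guess based on the error text.

```python
def table_provider(spark, table_name):
    """Return the 'Provider' value from DESCRIBE TABLE EXTENDED, or None.

    `spark` is an existing SparkSession. `table_name` is a fully
    qualified table name such as "mycatalog.myschema.hourvalues".
    """
    rows = spark.sql(f"DESCRIBE TABLE EXTENDED {table_name}").collect()
    for row in rows:
        # Metadata rows reuse the col_name / data_type columns,
        # e.g. col_name="Provider", data_type="delta".
        if row["col_name"].strip().lower() == "provider":
            return row["data_type"]
    # No Provider row was reported for this table.
    return None
```

For example, print(table_provider(spark, "mycatalog.myschema.hourvalues")) in a notebook cell. If the value is not "delta", the streaming read in the pipeline is going through a different source implementation, which may be worth mentioning when reporting the error.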

@Kaniz_Fatma Thanks for the reply, but it does not help me much. Do you have more specific advice to help me resolve my problem?
