[delta live table] exception: getPrimaryKeys not implemented for debezium

smedegaard
New Contributor III

I've defined a streaming Delta Live Table in a notebook using Python.

  • running on "preview" channel
  • delta cache accelerated (Standard_D4ads_v5) compute

It fails with
org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = xxx, runId = yyy] terminated with exception: getPrimaryKeys not implemented for debezium SQLSTATE: XXKST

  • running on "current" channel
  • delta cache accelerated (Standard_D4ads_v5) compute

It fails with

scala.ScalaReflectionException: object com.databricks.cdc.spark.DebeziumJDBCMicroBatchProvider not found.

The notebook:

import dlt
import pyspark.sql.functions as F

@dlt.table(
    name="silver_hour_values",
    comment="...",
    table_properties={"quality": "silver"},
    partition_cols=["event_year", "event_month", "event_day_of_month"],
)
@dlt.expect_or_drop("valid_date", F.col("event_datetime").isNotNull())
@dlt.expect_or_drop("valid_report_index", F.col("report_index").isNotNull())
@dlt.expect_or_drop("valid_event_datetime", F.col("event_datetime").isNotNull())
@dlt.expect_or_drop("valid_event_value", F.col("event_value").isNotNull())
def get_hour_values():
    return (
        spark.readStream
        .table("mycatalog.myschema.hourvalues")
        .withColumnRenamed("ReportIx", "report_index")
        .withColumnRenamed("DateTime", "event_datetime")
        .withColumnRenamed("Value", "event_value")
        .withColumnRenamed("Quality", "quality")
        .select("report_index", "event_datetime", "event_value", "quality")
        .withColumn("ingestion_datetime", F.current_timestamp())
        .withColumn("event_year", F.year(F.col("event_datetime")))
        .withColumn("event_month", F.month(F.col("event_datetime")))
        .withColumn("event_day_of_month", F.dayofmonth(F.col("event_datetime")))
    )

2 REPLIES

Kaniz
Community Manager

Hi @smedegaard

  • You're encountering a StreamingQueryException with the message: "getPrimaryKeys not implemented for debezium SQLSTATE: XXKST."
  • This error suggests that the getPrimaryKeys operation is not supported for the Debezium connector in your current setup.
  • To resolve this, consider checking the configuration of your Debezium connector and ensure that it supports the necessary operations for your use case.
  • You're encountering a ScalaReflectionException with the message: "object com.databricks.cdc.spark.DebeziumJDBCMicroBatchProvider not found."
  • This error indicates that the specified class (com.databricks.cdc.spark.DebeziumJDBCMicroBatchProvider) is not available in your classpath.
  • To address this issue:
    • Verify that the required library or package containing the missing class is correctly included in your environment.
    • Check your Spark configuration to ensure that the necessary dependencies are available.
    • If the class is part of an external library, make sure it's added to your Spark cluster's library dependencies.
  • If you need further assistance, feel free to ask! 😊
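As a concrete illustration of the suggestions above: both errors come from the streaming (spark.readStream) code path, so one possible workaround to try, if the source table is exposed through a CDC/federated connector that does not support streaming reads, is to define the dataset as a batch-computed DLT table using spark.read instead. This is a sketch under that assumption, not a confirmed fix; the table name mycatalog.myschema.hourvalues is taken from the question, and the dataset name silver_hour_values_batch is hypothetical. It only runs inside a Delta Live Tables pipeline on Databricks.

```python
import dlt
import pyspark.sql.functions as F

# Hypothetical batch variant of the failing table. spark.read.table (instead of
# spark.readStream.table) recomputes the result on each pipeline update, which
# avoids the streaming Debezium code path that raises the exceptions above.
@dlt.table(
    name="silver_hour_values_batch",  # hypothetical name, for illustration
    comment="Batch-computed variant that avoids the streaming CDC read path",
    table_properties={"quality": "silver"},
)
def get_hour_values_batch():
    return (
        spark.read.table("mycatalog.myschema.hourvalues")
        .withColumnRenamed("ReportIx", "report_index")
        .withColumnRenamed("DateTime", "event_datetime")
        .withColumnRenamed("Value", "event_value")
        .withColumnRenamed("Quality", "quality")
        .select("report_index", "event_datetime", "event_value", "quality")
        .withColumn("ingestion_datetime", F.current_timestamp())
    )
```

The trade-off is that the table is fully recomputed on each update rather than processed incrementally, so this is only practical if the source table is of manageable size.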

smedegaard
New Contributor III

@Kaniz Thanks for the reply, but it doesn't help me much. Do you have more specific advice to help me resolve my problem?
