Ingesting Kafka Avro into a Delta STREAMING LIVE TABLE

New Contributor III

Using Azure Databricks, I can create a DLT table in Python with:

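```python
import dlt
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType

# spark and the confluent* settings (API key, secret, servers, topic) are defined earlier in the notebook
@dlt.table(
    name = "<<landingTable>>",
    path = "<<storage path>>",
    comment = "<< descriptive comment>>"
)
def landingTable():
    # Databricks shades the Kafka client, hence the "kafkashaded." prefix on the login module
    jasConfig = (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule"
        " required username='{}' password='{}';"
    ).format(confluentApiKey, confluentSecret)
    # UDF: interpret the 4 schema-ID bytes as a big-endian integer and return it as a string
    binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())
    kafkaOptions = {
      "kafka.bootstrap.servers": confluentBootstrapServers,
      "kafka.security.protocol": "SASL_SSL",
      "kafka.sasl.jaas.config": jasConfig,
      "kafka.ssl.endpoint.identification.algorithm": "https",
      "kafka.sasl.mechanism": "PLAIN",
      "subscribe": confluentTopicName,
      "startingOffsets": "earliest",
      "failOnDataLoss": "false"
    }
    # Returning a streaming DataFrame (readStream) makes DLT maintain this as a streaming table
    return (
        spark.readStream
            .format("kafka")
            .options(**kafkaOptions)
            .load()
            .withColumn('key', fn.col("key").cast(StringType()))
            # substring() is 1-based: bytes 2-5 = schema ID, byte 6 onward = Avro payload
            .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
            .withColumn('avroValue', fn.expr("substring(value, 6, length(value)-5)"))
    )
```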
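
The two substring expressions in the snippet above rely on Confluent's wire format: a magic byte (0), a 4-byte big-endian schema ID, then the plain Avro payload. A minimal pure-Python sketch of the same split, handy for unit-testing the offsets outside Spark (the function name split_confluent_frame is hypothetical, not part of any library):

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # Confluent-framed messages start with a zero byte


def split_confluent_frame(message: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed Kafka value into (schema_id, avro_payload).

    Python slices are 0-based, so message[1:5] matches Spark's 1-based
    substring(value, 2, 4) and message[5:] matches
    substring(value, 6, length(value)-5).
    """
    if len(message) < 5:
        raise ValueError("message too short for Confluent framing")
    if message[0] != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {message[0]}")
    # Bytes 1-4 (0-based): schema ID as a 4-byte big-endian integer
    (schema_id,) = struct.unpack(">I", message[1:5])
    return schema_id, message[5:]


frame = bytes([0, 0, 0, 0, 42]) + b"\x02\x06"
print(split_confluent_frame(frame))  # (42, b'\x02\x06')
```

The magic-byte check is an extra safeguard chosen for this sketch; the Spark expressions above do not validate it.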

But I'm not sure how to progress:

  1. Ensure that the DLT table is INCREMENTAL / STREAMING
  2. Deserialize the Avro using a schema from one of:
     • the Confluent Schema Registry client
     • .avsc files in an Azure storage account
     • a schema hard-coded in a Python UDF
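
Whichever of these schema sources is chosen, deserialization needs a lookup from the schema ID carried in each message to the Avro writer schema. A minimal sketch, assuming the Schema Registry's REST endpoint GET /schemas/ids/{id} with HTTP Basic auth using the Confluent API key and secret; the registry URL and the names make_registry_fetcher and SchemaCache are hypothetical. The fetch function is pluggable, so the same cache could instead read .avsc files from storage or return a hard-coded schema. (The confluent-kafka package also ships a SchemaRegistryClient that covers the registry case.)

```python
import base64
import json
import urllib.request
from typing import Any, Callable, Dict


def make_registry_fetcher(base_url: str, api_key: str, api_secret: str) -> Callable[[int], str]:
    """Return fetch(schema_id) -> schema JSON string, via GET /schemas/ids/{id}."""
    token = base64.b64encode(f"{api_key}:{api_secret}".encode()).decode()

    def fetch(schema_id: int) -> str:
        request = urllib.request.Request(
            f"{base_url.rstrip('/')}/schemas/ids/{schema_id}",
            headers={"Authorization": f"Basic {token}"},
        )
        with urllib.request.urlopen(request) as response:
            # The response body looks like {"schema": "<Avro schema as a JSON string>"}
            return json.loads(response.read())["schema"]

    return fetch


class SchemaCache:
    """Parse and cache schemas by ID, so each ID is fetched only once."""

    def __init__(self, fetch: Callable[[int], str]):
        self._fetch = fetch
        self._schemas: Dict[int, Dict[str, Any]] = {}

    def get(self, schema_id: int) -> Dict[str, Any]:
        if schema_id not in self._schemas:
            self._schemas[schema_id] = json.loads(self._fetch(schema_id))
        return self._schemas[schema_id]


# Offline demo with a stand-in fetcher instead of a real registry call
calls = []


def fake_fetch(schema_id: int) -> str:
    calls.append(schema_id)
    return '{"type": "record", "name": "Reading", "fields": [{"name": "id", "type": "long"}]}'


cache = SchemaCache(fake_fetch)
cache.get(7)
cache.get(7)
print(calls)  # [7]
```

Inside a Spark UDF, each Python worker process would hold its own cache, so the registry is hit once per distinct ID per worker rather than once per row.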

My current assumption is to store the raw Avro messages in the Bronze/Landing streaming live table, then use a streaming live view with a Python UDF to perform the deserialization.

I'm just not sure how to get there.


New Contributor II

Hi @John Mathews

Did you find a way to progress here?

I am stuck at the same point...

New Contributor III

No, I didn't. In fact, I had to stop using DLT when another issue came up: performing a partial/streaming incremental update of a large platinum aggregation table. I ended up going back to using:

  • Kafka reader (see Consume Data From Apache Kafka)
  • Streaming DataFrames
  • Delta tables: enabling the delta.enableChangeDataFeed table property and processing the change feed with the readChangeFeed option
  • The foreachBatch() method on the stream writer to apply the aggregation and the required SCD Type 1 (SCD1) upsert
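
The fallback pipeline above reads the Delta change feed and applies an SCD Type 1 upsert in foreachBatch. Change-feed rows carry _change_type (insert, update_preimage, update_postimage, delete) and _commit_version columns. In Spark the upsert itself would typically be a Delta MERGE (DeltaTable.merge(...) with whenMatchedDelete, whenMatchedUpdateAll and whenNotMatchedInsertAll) called from the function passed to foreachBatch, which receives (batch_df, batch_id). The pure-Python sketch below only models the per-batch logic, with a dict standing in for the target table; the aggregation step is omitted, and apply_scd1_batch and the key column id are hypothetical. Reducing to one change per key matters because Delta MERGE rejects batches where several source rows match the same target row.

```python
from typing import Any, Dict, Iterable

Row = Dict[str, Any]
CDF_COLUMNS = {"_change_type", "_commit_version", "_commit_timestamp"}


def latest_changes(rows: Iterable[Row], key: str) -> Dict[Any, Row]:
    """Reduce change-feed rows to the last change per key.

    update_preimage rows hold the old values and are skipped; sorted() is
    stable, so rows within one _commit_version keep their input order.
    """
    latest: Dict[Any, Row] = {}
    for row in sorted(rows, key=lambda r: r["_commit_version"]):
        if row["_change_type"] == "update_preimage":
            continue
        latest[row[key]] = row
    return latest


def apply_scd1_batch(target: Dict[Any, Row], batch: Iterable[Row], key: str = "id") -> None:
    """SCD Type 1: overwrite or delete the current row in place, keep no history."""
    for k, row in latest_changes(batch, key).items():
        if row["_change_type"] == "delete":
            target.pop(k, None)
        else:
            # Drop the change-feed metadata columns before "writing" the row
            target[k] = {c: v for c, v in row.items() if c not in CDF_COLUMNS}


target = {1: {"id": 1, "total": 10}}
batch = [
    {"id": 1, "total": 10, "_change_type": "update_preimage", "_commit_version": 5},
    {"id": 1, "total": 15, "_change_type": "update_postimage", "_commit_version": 5},
    {"id": 2, "total": 7, "_change_type": "insert", "_commit_version": 4},
    {"id": 2, "total": 7, "_change_type": "delete", "_commit_version": 6},
    {"id": 3, "total": 1, "_change_type": "insert", "_commit_version": 6},
]
apply_scd1_batch(target, batch)
print(target)  # {1: {'id': 1, 'total': 15}, 3: {'id': 3, 'total': 1}}
```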