Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Ingesting Kafka Avro into a Delta STREAMING LIVE TABLE

jm99
New Contributor III

Using Azure Databricks:

I can create a DLT table in Python using:

import dlt
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType

@dlt.table(
    name = "<<landingTable>>",
    path = "<<storage path>>",
    comment = "<< descriptive comment >>"
)
def landingTable():
    # SASL/PLAIN JAAS config for Confluent Cloud; the API key and secret are defined elsewhere
    jaasConfig = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(confluentApiKey, confluentSecret)

    # Decode a 4-byte big-endian integer (the Confluent schema ID) to a string
    binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())

    kafkaOptions = {
      "kafka.bootstrap.servers": confluentBootstrapServers,
      "kafka.security.protocol": "SASL_SSL",
      "kafka.sasl.jaas.config": jaasConfig,
      "kafka.ssl.endpoint.identification.algorithm": "https",
      "kafka.sasl.mechanism": "PLAIN",
      "subscribe": confluentTopicName,
      "startingOffsets": "earliest",
      "failOnDataLoss": "false"
    }

    return (
        spark
            .readStream
            .format("kafka")
            .options(**kafkaOptions)
            .load()
            .withColumn('key', fn.col("key").cast(StringType()))
            # Confluent wire format: 1 magic byte, 4-byte schema ID, then the Avro payload
            .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
            .withColumn('avroValue', fn.expr("substring(value, 6, length(value)-5)"))
            .select(
                'topic',
                'partition',
                'offset',
                'timestamp',
                'timestampType',
                'key',
                'valueSchemaId',
                'avroValue'
            )
    )

But I'm not sure how to progress:

  1. Ensure that the DLT table is INCREMENTAL / STREAMING
  2. Deserialize the Avro from one of:
     • the Confluent Schema Registry client
     • .avsc files in an Azure storage account
     • a schema hard-coded in a Python UDF

My current assumption is to store the raw Avro messages in the Bronze/Landing streaming live table, then use a streaming live view with a Python UDF to perform the deserialization.

Just not sure how to get there.
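
For anyone landing on this thread, here is a minimal sketch of one way this could work, assuming a single, stable value schema per topic. Because the landing table is defined with spark.readStream, it is already a streaming table, and reading it downstream with dlt.read_stream keeps the pipeline incremental. The schema is fetched once from the Confluent Schema Registry (schemaRegistryUrl, schemaRegistryApiKey and schemaRegistrySecret are placeholder names, not from the original post) and passed to Spark's from_avro, which works here because the landing table already strips the 5-byte Confluent header off the payload:

import dlt
import pyspark.sql.functions as fn
from pyspark.sql.avro.functions import from_avro
from confluent_kafka.schema_registry import SchemaRegistryClient

# Placeholder URL/credentials; assumes confluent-kafka is installed on the cluster
schema_registry_client = SchemaRegistryClient({
    "url": schemaRegistryUrl,
    "basic.auth.user.info": "{}:{}".format(schemaRegistryApiKey, schemaRegistrySecret)
})

# Fetch the latest value schema for the topic once, at pipeline start
value_schema = (schema_registry_client
                .get_latest_version(confluentTopicName + "-value")
                .schema.schema_str)

@dlt.view(
    name = "<<silverView>>",
    comment = "Deserialized view over the raw landing table"
)
def silverView():
    return (
        dlt.read_stream("<<landingTable>>")  # streaming read keeps it incremental
            .withColumn("parsedValue", from_avro(fn.col("avroValue"), value_schema))
            .select(
                "topic",
                "partition",
                "offset",
                "timestamp",
                "key",
                "parsedValue.*"
            )
    )

If the topic's schema evolves, a fixed schema string won't be enough; deserialization would then need to happen per schema ID, e.g. in foreachBatch or in a UDF that consults the registry.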

2 REPLIES

lninza
New Contributor II

Hi @John Mathews

Did you find a way to progress here?

I am stuck at the same point...

jm99
New Contributor III

No, I didn't. In fact I had to stop using DLT when another issue came up around performing a partial/streaming increment of a large platinum aggregation table. I ended up going back to using:

  • a Kafka reader (see Consume Data From Apache Kafka)
  • streaming dataframes
  • a Delta table with delta.enableChangeDataFeed enabled, processed via readChangeFeed
  • the foreachBatch() method on the stream writer to apply the aggregation and the required SCD1 upsert (see the sketch below)
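
For completeness, a minimal sketch of that last pattern, with hypothetical table and column names (silver_table, platinum_agg, businessKey). It assumes the source Delta table was created with delta.enableChangeDataFeed = true, and it elides the aggregation step, which in practice would run on the micro-batch before the merge:

from delta.tables import DeltaTable
from pyspark.sql.window import Window
import pyspark.sql.functions as fn

def upsert_to_platinum(microBatchDf, batchId):
    # Keep only the newest post-image per key in this micro-batch (SCD1 semantics)
    latest = (microBatchDf
        .filter(fn.col("_change_type").isin("insert", "update_postimage"))
        .withColumn("rn", fn.row_number().over(
            Window.partitionBy("businessKey")
                  .orderBy(fn.col("_commit_version").desc())))
        .filter("rn = 1")
        .drop("rn", "_change_type", "_commit_version", "_commit_timestamp"))

    # SCD1 upsert: overwrite matched rows, insert new ones
    (DeltaTable.forName(spark, "platinum_agg")
        .alias("t")
        .merge(latest.alias("s"), "t.businessKey = s.businessKey")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .table("silver_table")
    .writeStream
    .foreachBatch(upsert_to_platinum)
    .option("checkpointLocation", "<<checkpoint path>>")
    .start())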
