Ingesting Kafka Avro into a Delta STREAMING LIVE TABLE

jm99
New Contributor III

Using Azure Databricks:

I can create a DLT table in Python using:

import dlt
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType

@dlt.table(
    name = "<<landingTable>>",
    path = "<<storage path>>",
    comment = "<<descriptive comment>>"
)
def landingTable():
    # SASL credentials for Confluent Cloud (confluent* variables are
    # defined elsewhere, e.g. via dbutils.secrets)
    jaasConfig = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(confluentApiKey, confluentSecret)

    # Confluent wire format: byte 1 is a magic byte, bytes 2-5 are the
    # schema registry ID (big-endian int), the Avro payload follows
    binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())

    kafkaOptions = {
      "kafka.bootstrap.servers": confluentBootstrapServers,
      "kafka.security.protocol": "SASL_SSL",
      "kafka.sasl.jaas.config": jaasConfig,
      "kafka.ssl.endpoint.identification.algorithm": "https",
      "kafka.sasl.mechanism": "PLAIN",
      "subscribe": confluentTopicName,
      "startingOffsets": "earliest",
      "failOnDataLoss": "false"
    }

    return (
        spark
            .readStream
            .format("kafka")
            .options(**kafkaOptions)
            .load()
            .withColumn('key', fn.col("key").cast(StringType()))
            .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
            .withColumn('avroValue', fn.expr("substring(value, 6, length(value)-5)"))
            .select(
                'topic',
                'partition',
                'offset',
                'timestamp',
                'timestampType',
                'key',
                'valueSchemaId',
                'avroValue'
            )
    )

But I'm not sure how to progress:

  1. Ensure that the DLT table is INCREMENTAL / STREAMING (see the first sketch below)
  2. Deserialize the Avro using one of:
  • the Confluent Schema Registry client
  • avsc files in an Azure storage account
  • a schema hard-coded in a Python UDF (see the second sketch below)
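
On point 1, my understanding is that a table defined over spark.readStream, as above, is already treated as a streaming/incremental table, and downstream definitions stay incremental as long as they read it with dlt.read_stream rather than dlt.read. A minimal sketch, reusing the placeholder name (the view name here is hypothetical):

import dlt

@dlt.view(name = "<<landingView>>")
def landingView():
    # dlt.read_stream keeps this dependency incremental;
    # dlt.read would trigger a full recomputation instead
    return dlt.read_stream("<<landingTable>>")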

My current assumption is to store the raw Avro messages in the Bronze/Landing streaming live table, then use a streaming live view with a Python UDF to perform the deserialization.

Just not sure how to get there.
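
For reference, a minimal sketch of that bronze-then-view approach, assuming fastavro is installed on the cluster and using a hypothetical hard-coded schema (a Schema Registry lookup inside the UDF, or an .avsc file read from storage, could supply the schema instead):

import io
import json

import dlt
import fastavro  # assumption: available on the DLT cluster
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType

# Hypothetical hard-coded schema (the third option above)
VALUE_SCHEMA = fastavro.parse_schema({
    "type": "record",
    "name": "ExampleValue",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"}
    ]
})

@fn.udf(StringType())
def avro_to_json(payload):
    # payload is avroValue: the Avro body with the 5-byte Confluent
    # wire-format header already stripped off by the landing table
    record = fastavro.schemaless_reader(io.BytesIO(payload), VALUE_SCHEMA)
    return json.dumps(record)

@dlt.view(name = "<<deserializedView>>")
def deserializedView():
    return (
        dlt.read_stream("<<landingTable>>")
            .withColumn("valueJson", avro_to_json(fn.col("avroValue")))
    )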

2 REPLIES

lninza
New Contributor II

Hi @John Mathews,

Did you find a way to progress here?

I am stuck at the same point...

jm99
New Contributor III

No, I didn't. In fact, I had to stop using DLT when another issue came up around performing a partial/streaming increment of a large platinum aggregation table. I ended up going back to using:

  • Kafka reader (see Consume Data From Apache Kafka)
  • Streaming dataframes
  • Delta tables: enabling enableChangeDataFeed and processing the "readChangeFeed" stream
  • Using the foreachBatch() method on the stream writer to apply the aggregation and the required SCD1 upsert (see the sketch below)
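
Roughly what that looks like, as a minimal sketch with hypothetical names (a silver_events source with change data feed enabled, a gold_agg target keyed on a key column), not production code:

import pyspark.sql.functions as fn
from pyspark.sql.window import Window
from delta.tables import DeltaTable

def upsert_batch(microBatchDF, batchId):
    # keep the latest post-image per key in this micro-batch (SCD1 semantics);
    # any aggregation would also be applied here before the merge
    w = Window.partitionBy("key").orderBy(fn.col("_commit_version").desc())
    latest = (microBatchDF
        .filter(fn.col("_change_type").isin("insert", "update_postimage"))
        .withColumn("_rn", fn.row_number().over(w))
        .filter("_rn = 1")
        .drop("_rn", "_change_type", "_commit_version", "_commit_timestamp"))

    target = DeltaTable.forName(spark, "gold_agg")
    (target.alias("t")
        .merge(latest.alias("s"), "t.key = s.key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(spark.readStream
    .option("readChangeFeed", "true")  # source must have delta.enableChangeDataFeed = true
    .table("silver_events")
    .writeStream
    .foreachBatch(upsert_batch)
    .option("checkpointLocation", "<<checkpoint path>>")
    .start())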