<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Ingesting Kafka Avro into an Delta STREAMING  LIVE TABLE in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/ingesting-kafka-avro-into-an-delta-streaming-live-table/m-p/17987#M11879</link>
    <description>&lt;P&gt;Using Azure Databricks:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I can create a DLT table in python using&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import dlt
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType
&amp;nbsp;
@dlt.table(
    name = "&amp;lt;&amp;lt;landingTable&amp;gt;&amp;gt;",
    path = "&amp;lt;&amp;lt;storage path&amp;gt;&amp;gt;",
    comment = "&amp;lt;&amp;lt; descriptive comment&amp;gt;&amp;gt;"
)
def landingTable():
    jasConfig = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(confluentApiKey, confluentSecret)
    
    binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())
    
    kafkaOptions = {
      "kafka.bootstrap.servers": confluentBootstrapServers,
      "kafka.security.protocol": "SASL_SSL",
      "kafka.sasl.jaas.config": jasConfig,
      "kafka.ssl.endpoint.identification.algorithm": "https",
      "kafka.sasl.mechanism": "PLAIN",
      "subscribe": confluentTopicName,
      "startingOffsets": "earliest",
      "failOnDataLoss": "false"
    }
    
    return (
        spark
            .readStream
            .format("kafka")
            .options(**kafkaOptions)
            .load()
            .withColumn('key', fn.col("key").cast(StringType()))
            .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
            .withColumn('avroValue', fn.expr("substring(value, 6, length(value)-5)"))
            .select(
                'topic',
                'partition',
                'offset',
                'timestamp',
                'timestampType',
                'key',
                'valueSchemaId',
                'avroValue'
            )&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;But I am not sure how to progress:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;Ensure that the DLT table is INCREMENTAL / STREAMING&lt;/LI&gt;&lt;LI&gt;Deserialize the AVRO from:&lt;/LI&gt;&lt;/OL&gt;&lt;UL&gt;&lt;LI&gt;Confluent Schema Registry Client&lt;/LI&gt;&lt;LI&gt;avsc files in an Azure storage account&lt;/LI&gt;&lt;LI&gt;hard-coded in a Python UDF&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;My current assumption is to store the raw Avro messages in the Bronze/Landing streaming live table, then use a streaming live view, with a Python UDF, to perform the deserialization.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I am just not sure how to get there.&lt;/P&gt;</description>
    <pubDate>Fri, 10 Jun 2022 09:06:03 GMT</pubDate>
    <dc:creator>jm99</dc:creator>
    <dc:date>2022-06-10T09:06:03Z</dc:date>
    <item>
      <title>Ingesting Kafka Avro into an Delta STREAMING  LIVE TABLE</title>
      <link>https://community.databricks.com/t5/data-engineering/ingesting-kafka-avro-into-an-delta-streaming-live-table/m-p/17987#M11879</link>
      <description>&lt;P&gt;Using Azure Databricks:&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I can create a DLT table in python using&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import dlt
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType
&amp;nbsp;
@dlt.table(
    name = "&amp;lt;&amp;lt;landingTable&amp;gt;&amp;gt;",
    path = "&amp;lt;&amp;lt;storage path&amp;gt;&amp;gt;",
    comment = "&amp;lt;&amp;lt; descriptive comment&amp;gt;&amp;gt;"
)
def landingTable():
    jasConfig = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(confluentApiKey, confluentSecret)
    
    binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())
    
    kafkaOptions = {
      "kafka.bootstrap.servers": confluentBootstrapServers,
      "kafka.security.protocol": "SASL_SSL",
      "kafka.sasl.jaas.config": jasConfig,
      "kafka.ssl.endpoint.identification.algorithm": "https",
      "kafka.sasl.mechanism": "PLAIN",
      "subscribe": confluentTopicName,
      "startingOffsets": "earliest",
      "failOnDataLoss": "false"
    }
    
    return (
        spark
            .readStream
            .format("kafka")
            .options(**kafkaOptions)
            .load()
            .withColumn('key', fn.col("key").cast(StringType()))
            .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
            .withColumn('avroValue', fn.expr("substring(value, 6, length(value)-5)"))
            .select(
                'topic',
                'partition',
                'offset',
                'timestamp',
                'timestampType',
                'key',
                'valueSchemaId',
                'avroValue'
            )&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;But I am not sure how to progress:&lt;/P&gt;&lt;OL&gt;&lt;LI&gt;Ensure that the DLT table is INCREMENTAL / STREAMING&lt;/LI&gt;&lt;LI&gt;Deserialize the AVRO from:&lt;/LI&gt;&lt;/OL&gt;&lt;UL&gt;&lt;LI&gt;Confluent Schema Registry Client&lt;/LI&gt;&lt;LI&gt;avsc files in an Azure storage account&lt;/LI&gt;&lt;LI&gt;hard-coded in a Python UDF&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;My current assumption is to store the raw Avro messages in the Bronze/Landing streaming live table, then use a streaming live view, with a Python UDF, to perform the deserialization.&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;I am just not sure how to get there.&lt;/P&gt;</description>
      <pubDate>Fri, 10 Jun 2022 09:06:03 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/ingesting-kafka-avro-into-an-delta-streaming-live-table/m-p/17987#M11879</guid>
      <dc:creator>jm99</dc:creator>
      <dc:date>2022-06-10T09:06:03Z</dc:date>
    </item>
    <item>
      <title>Re: Ingesting Kafka Avro into an Delta STREAMING  LIVE TABLE</title>
      <link>https://community.databricks.com/t5/data-engineering/ingesting-kafka-avro-into-an-delta-streaming-live-table/m-p/17988#M11880</link>
      <description>&lt;P&gt;Hi @John Mathews&amp;nbsp;&lt;/P&gt;&lt;P&gt;Did you find a way to progress here?&lt;/P&gt;&lt;P&gt;I am stuck at the same point...&lt;/P&gt;</description>
      <pubDate>Tue, 25 Oct 2022 12:18:02 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/ingesting-kafka-avro-into-an-delta-streaming-live-table/m-p/17988#M11880</guid>
      <dc:creator>lninza</dc:creator>
      <dc:date>2022-10-25T12:18:02Z</dc:date>
    </item>
    <item>
      <title>Re: Ingesting Kafka Avro into an Delta STREAMING  LIVE TABLE</title>
      <link>https://community.databricks.com/t5/data-engineering/ingesting-kafka-avro-into-an-delta-streaming-live-table/m-p/17989#M11881</link>
      <description>&lt;P&gt;No, I didn't. In fact, I had to stop using DLT when another issue came up around performing a partial/streaming increment of a large platinum aggregation table. I ended up going back to using:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;Kafka reader (see &lt;A href="https://www.confluent.io/blog/consume-avro-data-from-kafka-topics-and-secured-schema-registry-with-databricks-confluent-cloud-on-azure" alt="https://www.confluent.io/blog/consume-avro-data-from-kafka-topics-and-secured-schema-registry-with-databricks-confluent-cloud-on-azure" target="_blank"&gt;Consume Data From Apache Kafka&lt;/A&gt;)&lt;/LI&gt;&lt;LI&gt;Streaming dataframes&lt;/LI&gt;&lt;LI&gt;Delta table: enabling enableChangeDataFeed and processing the "readChangeFeed"&lt;/LI&gt;&lt;LI&gt;Using the foreachBatch() method on the stream writer to apply the aggregation and the required SCD1 upsert&lt;/LI&gt;&lt;/UL&gt;</description>
      <pubDate>Tue, 25 Oct 2022 15:53:13 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/ingesting-kafka-avro-into-an-delta-streaming-live-table/m-p/17989#M11881</guid>
      <dc:creator>jm99</dc:creator>
      <dc:date>2022-10-25T15:53:13Z</dc:date>
    </item>
  </channel>
</rss>

