Using Azure Databricks:
I can create a DLT table in Python using:
import dlt
import pyspark.sql.functions as fn
from pyspark.sql.types import StringType
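@dlt.table(
    name="<<landingTable>>",
    path="<<storage path>>",
    comment="<< descriptive comment>>"
)
def landingTable():
    # confluentApiKey, confluentSecret, confluentBootstrapServers and confluentTopicName
    # are defined earlier in the notebook. Databricks ships a shaded Kafka client,
    # so the JAAS login module class carries the "kafkashaded." prefix.
    jasConfig = "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(confluentApiKey, confluentSecret)

    # Turns the 4 schema-ID bytes (big-endian) into a string such as "100042"
    binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())

    kafkaOptions = {
        "kafka.bootstrap.servers": confluentBootstrapServers,
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.jaas.config": jasConfig,
        "kafka.ssl.endpoint.identification.algorithm": "https",
        "kafka.sasl.mechanism": "PLAIN",
        "subscribe": confluentTopicName,
        "startingOffsets": "earliest",
        "failOnDataLoss": "false"
    }

    return (
        spark.readStream
        .format("kafka")
        .options(**kafkaOptions)
        .load()
        .withColumn('key', fn.col("key").cast(StringType()))
        # Confluent header: byte 1 = magic byte, bytes 2-5 (1-based) = schema ID
        .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
        # everything after the 5-byte header is the Avro-encoded payload
        .withColumn('avroValue', fn.expr("substring(value, 6, length(value)-5)"))
    )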
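For reference, the `substring` offsets above follow the Confluent wire format: one magic byte (0), a 4-byte big-endian schema ID, then the Avro-encoded body. Spark's `substring` is 1-based, so `substring(value, 2, 4)` picks the schema ID and `substring(value, 6, ...)` starts right after the header. A minimal stdlib-only sketch of the same split, handy for checking the offsets locally; `split_confluent_message` is a hypothetical helper, not part of any library:

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0   # Confluent-framed messages start with byte 0x00
HEADER_SIZE = 5  # 1 magic byte + 4-byte schema ID


def split_confluent_message(value: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed Kafka value into (schema_id, avro_payload).

    Mirrors the Spark expressions substring(value, 2, 4) and
    substring(value, 6, length(value) - 5) used in the landing table.
    """
    if len(value) < HEADER_SIZE:
        raise ValueError("message shorter than the 5-byte Confluent header")
    if value[0] != MAGIC_BYTE:
        raise ValueError("unknown magic byte: {}".format(value[0]))
    # ">I" = big-endian unsigned 32-bit int, same as int.from_bytes(..., 'big')
    (schema_id,) = struct.unpack(">I", value[1:HEADER_SIZE])
    return schema_id, value[HEADER_SIZE:]


if __name__ == "__main__":
    msg = b"\x00" + (42).to_bytes(4, "big") + b"\x02\x06abc"
    print(split_confluent_message(msg))  # (42, b'\x02\x06abc')
```

Only the bytes after the header are valid Avro, which is why `avroValue` (not `value`) is what any deserializer should receive.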
But I'm not sure how to progress:
- Ensure that the DLT table is INCREMENTAL / STREAMING
- Deserialize the Avro using a schema taken from one of:
  - the Confluent Schema Registry client
  - .avsc files in an Azure storage account
  - a schema hard-coded in a Python UDF
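Whichever schema source is used, the schema only needs to be resolved once per schema ID. A minimal stdlib-only sketch, assuming the registry is reachable over HTTP from the cluster: `GET /schemas/ids/{id}` is the Schema Registry REST endpoint that returns the schema as a JSON string, and Confluent Cloud authenticates it with a Schema Registry API key/secret over HTTP basic auth. The names `fetch_schema_from_registry` and `SchemaCache` are hypothetical; `fetch` can equally read an `.avsc` file or return a hard-coded string.

```python
import base64
import json
import urllib.request
from typing import Callable, Dict, Optional


def fetch_schema_from_registry(registry_url: str, schema_id: int,
                               api_key: Optional[str] = None,
                               api_secret: Optional[str] = None) -> str:
    """Return the schema (a JSON string) registered under schema_id."""
    url = "{}/schemas/ids/{}".format(registry_url.rstrip("/"), schema_id)
    req = urllib.request.Request(url)
    if api_key and api_secret:
        # HTTP basic auth with the Schema Registry key/secret
        token = base64.b64encode("{}:{}".format(api_key, api_secret).encode()).decode()
        req.add_header("Authorization", "Basic " + token)
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))["schema"]


class SchemaCache:
    """Resolve each schema ID once via `fetch` and keep the parsed schema."""

    def __init__(self, fetch: Callable[[int], str]):
        self._fetch = fetch
        self._schemas: Dict[int, dict] = {}

    def get(self, schema_id: int) -> dict:
        if schema_id not in self._schemas:
            self._schemas[schema_id] = json.loads(self._fetch(schema_id))
        return self._schemas[schema_id]
```

With a single known schema, the fetched JSON string could be passed to `pyspark.sql.avro.functions.from_avro(fn.col("avroValue"), schema_json)` instead of a Python UDF; that only works because the 5-byte header has already been stripped into `avroValue`. `from_avro` takes one schema per call, so a topic carrying several schema IDs would need its rows split by `valueSchemaId` first.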
My current assumption is to store the raw Avro messages in the Bronze/Landing streaming live table, then use a streaming live view with a Python UDF to perform the deserialization.
I'm just not sure how to get there.