
@jcozar 
I'm using Kafka directly as a source in DLT. The bronze table is set up to receive binlog data from Kafka as follows:

 

import dlt

@dlt.table
def bronze_table():
    # Return the streaming DataFrame so DLT can materialize the table.
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka)
        .option("subscribe", topic)
        .load()
    )

 

The data arrives in a format with an 'after' section that holds the column names and values, for example:

(This format comes from Debezium: https://debezium.io/documentation/reference/stable/connectors/mysql.html)

 

{ "before": null, "after": { "col1": "hello", "col2": "world", ... "col10": 0 }, ... }
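
To make the shape concrete, here is a minimal sketch that pulls the 'after' section out of one such message with Python's standard json module and lists the Python type of each value. The sample message is hypothetical (only col1, col2 and col10 are filled in, and the elided columns and metadata are left out), and `extract_after` is just an illustrative helper name, not part of Debezium or DLT.

```python
import json

# Hypothetical sample message: the real payload has more columns and metadata.
sample_message = '{"before": null, "after": {"col1": "hello", "col2": "world", "col10": 0}}'


def extract_after(raw_message):
    """Return the 'after' section of a Debezium-style change event as a dict.

    Returns None when 'after' is missing or null (for example, on deletes).
    """
    envelope = json.loads(raw_message)
    return envelope.get("after")


after = extract_after(sample_message)
# Map each column to the name of the Python type its JSON value decoded to.
value_types = {name: type(value).__name__ for name, value in after.items()}
print(value_types)  # {'col1': 'str', 'col2': 'str', 'col10': 'int'}
```

Checking a few raw messages this way is a quick sanity check on which type each column should get in the schema; here col10 decodes as a number, not a boolean.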

 

To create the silver table from the 'after' JSON in the bronze table, I used a schema with the from_json function. Initially the schema was wrong, which led to incorrect data types and null values (col10 in the example). I've since corrected the schema passed to from_json and ran a selective refresh.

 

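from pyspark.sql.types import BooleanType, IntegerType, StringType, StructField, StructType

# wrong schema: col10 is declared as a boolean, but the JSON value is a number
schema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', StringType(), True),
    # ....
    StructField('col10', BooleanType(), True),
])

# edited schema: col10 is now an integer
schema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', StringType(), True),
    # ....
    StructField('col10', IntegerType(), True),
])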


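from pyspark.sql.functions import col, from_json

@dlt.table
def silver_table():
    # Return the DataFrame; without the return, the function yields None.
    return dlt.read_stream("bronze_table").withColumn("after", from_json(col("after"), schema))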
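
The null values in col10 can be reproduced outside Spark with a small pure-Python sketch. It only imitates the behaviour described above (a field whose JSON value does not match the declared type comes back as null while the other fields still parse); it is not Spark's from_json implementation. The names `parse_with_schema`, `wrong_schema` and `edited_schema` are hypothetical, and only three of the columns are included.

```python
import json

# Hypothetical, reduced schemas: field name -> expected Python type.
wrong_schema = {"col1": str, "col2": str, "col10": bool}
edited_schema = {"col1": str, "col2": str, "col10": int}


def parse_with_schema(raw_json, schema):
    """Parse a JSON object, keeping a value only if it has exactly the declared type.

    Mismatched or missing fields become None, imitating the nulls in the silver table.
    """
    data = json.loads(raw_json)
    parsed = {}
    for name, expected_type in schema.items():
        value = data.get(name)
        # Exact type check on purpose: bool is a subclass of int in Python,
        # so isinstance() would blur the distinction this sketch is about.
        parsed[name] = value if type(value) is expected_type else None
    return parsed


after_json = '{"col1": "hello", "col2": "world", "col10": 0}'
wrong_result = parse_with_schema(after_json, wrong_schema)
edited_result = parse_with_schema(after_json, edited_schema)
print(wrong_result)   # {'col1': 'hello', 'col2': 'world', 'col10': None}
print(edited_result)  # {'col1': 'hello', 'col2': 'world', 'col10': 0}
```

With the boolean declaration the numeric 0 is dropped to None, and with the integer declaration it is kept, which matches the difference between the two schemas above.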

 

Although we're not using Unity Catalog tables, our bronze table still retains all the information.

Let me know if you need more specific information!