01-03-2024 03:22 AM - edited 01-03-2024 03:23 AM
@jcozar
I'm using Kafka directly as a source in DLT. The bronze table reads the binlog data from Kafka as follows:
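```python
import dlt

@dlt.table
def bronze_table():
    # `kafka` (bootstrap servers) and `topic` are defined elsewhere in the pipeline
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka)
        .option("subscribe", topic)
        .load()
    )
```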
Each record is a Debezium change event (format described at https://debezium.io/documentation/reference/stable/connectors/mysql.html), where the 'after' section holds the column names and values, like:
{ "before": null, "after": { "col1": "hello", "col2": "world", ... "col10": 0 }, ... }
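For reference, the step of pulling the 'after' image out of one Kafka message can be sketched in plain Python. This is only an illustration, not DLT code: it assumes the Kafka message value is UTF-8 encoded JSON with 'before'/'after' at the top level (no schema/payload wrapper), as in the example above, and `extract_after` is a hypothetical helper name. In Spark, the Kafka source exposes `value` as binary, so the equivalent would typically be `col("value").cast("string")` before applying `from_json`.

```python
import json
from typing import Any, Optional


def extract_after(value: Optional[bytes]) -> Optional[dict[str, Any]]:
    """Return the 'after' image of a Debezium change event, or None.

    Assumes `value` is the raw Kafka message value: UTF-8 encoded JSON
    with 'before'/'after' at the top level (no schema/payload wrapper).
    """
    if value is None:
        # Tombstone records (emitted after deletes) have no value at all.
        return None
    envelope = json.loads(value.decode("utf-8"))
    # 'after' is null for delete events, so this can also return None.
    return envelope.get("after")


if __name__ == "__main__":
    raw = b'{"before": null, "after": {"col1": "hello", "col2": "world", "col10": 0}, "op": "c"}'
    print(extract_after(raw))
```

Keeping the raw bytes in bronze and doing this decoding in a later layer means the parsing logic can change without re-reading Kafka.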
To build the silver table from the 'after' JSON in the bronze table, I used from_json with an explicit schema. The first version of the schema was wrong (col10 in the example was declared as BooleanType instead of IntegerType), which produced incorrect data types and null values. I then corrected the schema passed to from_json and ran a refresh on the selected tables.
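```python
import dlt
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import BooleanType, IntegerType, StringType, StructField, StructType

# wrong schema: col10 declared as BooleanType, but the source value is an integer (e.g. 0)
schema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', StringType(), True),
    # ....
    StructField('col10', BooleanType(), True),
])

# edited schema: col10 corrected to IntegerType
schema = StructType([
    StructField('col1', StringType(), True),
    StructField('col2', StringType(), True),
    # ....
    StructField('col10', IntegerType(), True),
])

@dlt.table
def silver_table():
    # Parse the 'after' JSON string into a struct using the edited schema
    return dlt.read_stream("bronze_table").withColumn("after", from_json(col("after"), schema))
```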
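Why the wrong schema produced nulls can be illustrated with a small emulation in plain Python. This is not Spark's implementation; it only roughly mimics `from_json` in its default PERMISSIVE mode on Spark 3.x, where a field whose JSON value cannot be converted to the declared type becomes null while the other fields are kept. The converter functions, the type names "string"/"boolean"/"integer", and `parse_with_schema` are hypothetical stand-ins for the StructType definitions above.

```python
import json
from typing import Any, Callable


# Hypothetical stand-ins for StringType / BooleanType / IntegerType.
# Each converter returns the converted value or raises ValueError.
def _to_string(v: Any) -> str:
    # Roughly like Spark: any JSON value can be read as a string;
    # non-strings keep their (compact) JSON text.
    return v if isinstance(v, str) else json.dumps(v, separators=(",", ":"))


def _to_boolean(v: Any) -> bool:
    # Only the JSON literals true/false are booleans; 0 and 1 are numbers.
    if not isinstance(v, bool):
        raise ValueError(f"expected boolean, got {v!r}")
    return v


def _to_integer(v: Any) -> int:
    # bool is a subclass of int in Python, so exclude it explicitly;
    # also reject values outside the 32-bit IntegerType range.
    if isinstance(v, bool) or not isinstance(v, int) or not -2**31 <= v < 2**31:
        raise ValueError(f"expected 32-bit integer, got {v!r}")
    return v


CONVERTERS: dict[str, Callable[[Any], Any]] = {
    "string": _to_string,
    "boolean": _to_boolean,
    "integer": _to_integer,
}


def parse_with_schema(text: str, schema: dict[str, str]) -> dict[str, Any]:
    """Parse a JSON object, keeping only schema fields; a field whose value
    cannot be converted to its declared type becomes None."""
    data = json.loads(text)
    row: dict[str, Any] = {}
    for name, type_name in schema.items():
        value = data.get(name)
        if value is None:
            row[name] = None  # missing key or JSON null
            continue
        try:
            row[name] = CONVERTERS[type_name](value)
        except ValueError:
            row[name] = None  # type mismatch: null, other fields kept
    return row


if __name__ == "__main__":
    after = '{"col1": "hello", "col2": "world", "col10": 0}'
    wrong = {"col1": "string", "col2": "string", "col10": "boolean"}
    fixed = {"col1": "string", "col2": "string", "col10": "integer"}
    print(parse_with_schema(after, wrong))  # col10 is None
    print(parse_with_schema(after, fixed))  # col10 is 0
```

The point of the sketch is that a type mismatch does not fail the record; it silently nulls just that column, which is why col10 showed up as null rather than raising an error.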
We're not using Unity Catalog tables, but the bronze table still retains all of the original data from Kafka.
Let me know if you need more specific information!