Hi guys!
I'm facing a problem, and I have no idea where it came from. My streaming job is not appending all of the topic's data into my Bronze table, even though I've checked the topic and the data is there.
For example, some rows that are still sitting in my Kafka topic never show up in the Bronze table. Those records were produced to Kafka around this time yesterday, yet they still haven't been processed, while newer data (from today) has landed fine.
The jobs raised no errors or alerts, so it's pure chaos.
read_kafka = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_HOST)
    .option("subscribe", topic_name)
    # "earliest" only applies the first time the query runs;
    # on restart, the checkpoint decides where to resume
    .option("startingOffsets", "earliest")
    .option("minPartitions", 15)
    .option("unparsedDataColumn", "bi_unparsed_data")
    .load()
)
read_kafka = transform_data(read_kafka, tableObject.schema, force_reset_schema)
read_kafka = adjust_date_columns(read_kafka)
write_stream = (
    read_kafka.writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", f"{delta_location}_checkpoint/")
    .queryName(table_name_concat_target)
    .outputMode("append")
    .trigger(once=True)  # process everything available in one batch, then stop
    .start(delta_location)
)
await_stream_initialization(write_stream)
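Since a restarted query resumes from the checkpoint rather than from `startingOffsets`, one thing worth checking is whether the offsets recorded under `{delta_location}_checkpoint/offsets/` already cover the "missing" records. A minimal sketch of that comparison (the `lag_per_partition` helper and both input shapes are my own assumptions for illustration, not part of the original job; in a real checkpoint, the per-source offset line is JSON shaped like `{"topic": {"partition": offset}}`):

```python
import json


def lag_per_partition(checkpoint_offsets_json, topic_end_offsets):
    """Compare offsets recorded in a checkpoint offset file against the
    current end offsets of the topic, per partition.

    checkpoint_offsets_json: the per-source JSON line from the checkpoint,
        e.g. '{"my_topic": {"0": 100, "1": 50}}'
    topic_end_offsets: current end offsets fetched from Kafka,
        e.g. {"my_topic": {"0": 120, "1": 50}}
    Returns {(topic, partition): lag}; a positive lag means the query
    has not yet consumed up to the end of that partition.
    """
    committed = json.loads(checkpoint_offsets_json)
    lag = {}
    for topic, partitions in committed.items():
        for partition, offset in partitions.items():
            end = topic_end_offsets.get(topic, {}).get(partition, 0)
            lag[(topic, partition)] = end - offset
    return lag
```

If every partition shows zero lag but rows are still absent from the table, the problem would be downstream of the Kafka source (for example, rows dropped during parsing into the unparsed-data column), not in offset tracking.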