08-30-2024 06:50 AM
I am reading JSON files written to ADLS from Kafka, using DLT and spark.readStream to create a streaming table for my raw ingest data. My schema has two structs at the top level: NewRecord and OldRecord.
I pass the schema and run a select on NewRecord.* so that I get only the fields inside NewRecord. The problem is that my streaming table returns null NewRecord and OldRecord columns, and then all the fields from NewRecord.
How can I have only the NewRecord fields in my table?
- Labels: Delta Lake
Accepted Solutions
08-30-2024 02:54 PM
I did a full refresh from the Delta Live Tables pipeline and that fixed it. I guess it was remembering the first run, where I just had the top-level arrays as two columns in the table.
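For anyone else hitting this: a full refresh truncates the pipeline's tables and reprocesses all input, discarding state (including the table schema) from earlier runs. It can be triggered from the pipeline UI, or programmatically via the Databricks Pipelines REST API. A minimal sketch, assuming placeholder credentials (the workspace URL, token, and pipeline ID below are illustrative, not from this thread):

import os
import requests

# Hypothetical values -- substitute your own workspace URL, token, and pipeline ID.
WORKSPACE_URL = os.environ["DATABRICKS_HOST"]   # e.g. https://adb-123.azuredatabricks.net
TOKEN = os.environ["DATABRICKS_TOKEN"]          # personal access token
PIPELINE_ID = "<your-dlt-pipeline-id>"

# Start a pipeline update with full_refresh=True so every table is
# truncated and recomputed, clearing any schema cached from earlier runs.
resp = requests.post(
    f"{WORKSPACE_URL}/api/2.0/pipelines/{PIPELINE_ID}/updates",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"full_refresh": True},
)
resp.raise_for_status()
print(resp.json())  # contains the update_id of the triggered run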
08-30-2024 07:44 AM
Edit: Adding code for clarity.
import dlt
from pyspark.sql.functions import current_timestamp, to_timestamp
from pyspark.sql.types import StructField, StructType

# Top-level struct: [NewRecord, OldRecord]
schema = StructType([
    StructField("NewRecord", StructType([
        # ... fields of NewRecord (elided in the original post)
    ])),
    StructField("OldRecord", StructType([
        # ... fields of OldRecord (elided in the original post)
    ])),
])
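# For illustration only -- the real field names were elided above. A
# filled-in version of the same schema might look like this, with
# hypothetical id/name/updated_at fields:
from pyspark.sql.types import LongType, StringType, TimestampType

example_schema = StructType([
    StructField("NewRecord", StructType([
        StructField("id", LongType()),
        StructField("name", StringType()),
        StructField("updated_at", TimestampType()),
    ])),
    StructField("OldRecord", StructType([
        StructField("id", LongType()),
        StructField("name", StringType()),
        StructField("updated_at", TimestampType()),
    ])),
])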
# Streaming DLT table over the raw JSON landing directory
@dlt.table(
    name="newrecord_raw",
    table_properties={"quality": "bronze"},
    temporary=False,
)
def create_table():
    query = (
        spark.readStream.format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "json")
        .option("checkpointLocation", "/Volumes/dev/streaming/")
        .load(sink_dir)
        # Flatten the NewRecord struct so only its fields become columns
        .select("NewRecord.*")
        .withColumn("load_dt", to_timestamp(current_timestamp()))
    )
    return query
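For reference, select("NewRecord.*") does flatten a struct into its fields once the schema is applied; the null columns came from the table schema that the pipeline cached on its first run, which the full refresh in the accepted solution cleared. A minimal self-contained sketch of the flattening behavior (hypothetical field names, and a plain batch read instead of streaming for brevity):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical nested records mirroring the schema above.
df = spark.createDataFrame(
    [({"id": 1, "name": "a"}, {"id": 1, "name": "old"})],
    "NewRecord struct<id:long,name:string>, OldRecord struct<id:long,name:string>",
)

# Selecting NewRecord.* keeps only the struct's fields as top-level columns.
df.select("NewRecord.*").printSchema()
# root
#  |-- id: long (nullable = true)
#  |-- name: string (nullable = true)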