DLT Streaming Schema and Select

ggsmith
Contributor

I am reading JSON files written to ADLS from Kafka, using DLT and spark.readStream to create a streaming table for my raw ingest data. My schema has two arrays at the top level:

NewRecord array, OldRecord array. 

I pass the schema and run a select on NewRecord.* so that I get only the fields in the NewRecord array. The problem is that my streaming table ends up with null NewRecord and OldRecord columns, followed by all the fields in NewRecord.

How can I have only the NewRecord fields in my table?

ggsmith
Contributor

Edit: Adding code for clarity.

from pyspark.sql.types import StructField, StructType

# Top-level struct [OldRecord, NewRecord]
schema = StructType([
    StructField("NewRecord", StructType([...
    ])),
    StructField("OldRecord", StructType([....
    ])),
])

import dlt
from pyspark.sql.functions import current_timestamp

# streaming query
@dlt.table(
    name="newrecord_raw",
    table_properties={"quality": "bronze"},
    temporary=False,
)
def create_table():
    query = (
        spark.readStream.format("cloudFiles")  # Auto Loader source
        .schema(schema)
        .option("cloudFiles.format", "json")
        .option("checkpointLocation", "/Volumes/dev/streaming/")
        .load(sink_dir)
        .select("NewRecord.*")  # keep only the fields nested under NewRecord
        .withColumn("load_dt", current_timestamp())  # already a timestamp
    )
    return query

ggsmith
Contributor

I did a full refresh of the Delta Live Tables pipeline and that fixed it. I guess the table was remembering the schema from the first run, where I just had the two top-level arrays as columns.
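
For anyone who wants to trigger the same full refresh without the UI, here is a minimal sketch. It assumes the Databricks Pipelines REST API endpoint for starting an update (POST /api/2.0/pipelines/{pipeline_id}/updates with "full_refresh": true). The host, pipeline ID, and token are placeholders, and build_full_refresh_request is a hypothetical helper name, not something from my pipeline. The sketch only builds the request; sending it is left as a commented-out line.

```python
import json
import urllib.request


def build_full_refresh_request(
    host: str, pipeline_id: str, token: str
) -> urllib.request.Request:
    """Build (but do not send) a request that starts a full-refresh update."""
    url = f"{host.rstrip('/')}/api/2.0/pipelines/{pipeline_id}/updates"
    # full_refresh=True asks the pipeline to reset its tables and reprocess the source.
    body = json.dumps({"full_refresh": True}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


# Placeholder values (assumptions, not taken from the original post).
req = build_full_refresh_request(
    "https://example.cloud.databricks.com", "my-pipeline-id", "my-token"
)

# To actually start the update, uncomment:
# urllib.request.urlopen(req)
```

Keep in mind that a full refresh clears the existing table data and reprocesses whatever is still available in the source, so it is best suited to dev pipelines or sources that retain their full history.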
