Hello @yit ,
You can’t. An empty-struct hint is treated as a fixed struct with zero fields, so Auto Loader will never expand it later. The NOTE in the screenshot applies to JSON just as much as to Parquet/Avro/CSV.
If your goal is “discover whatever shows up under payload and keep adding new sub-fields,” simply don’t specify a hint for payload at all. Auto Loader will infer nested fields and evolve the schema as they appear.
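For contrast, the anti-pattern is sketched below (I’m assuming the hint was passed as a DDL string via cloudFiles.schemaHints; it is shown commented out because it is exactly what to avoid):

# Anti-pattern (sketch): a zero-field hint pins payload permanently,
# so new sub-fields such as "bar" will never be added to the schema.
# .option("cloudFiles.schemaHints", "payload STRUCT<>")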
Example code (you can run this end-to-end in any Databricks notebook):
base = "/tmp/repro_empty_struct_json/input"
out = "/tmp/repro_empty_struct_json/out_empty_struct"
chk = "/tmp/repro_empty_struct_json/chk"
schema = "/tmp/repro_empty_struct_json/schema"

# Clean up any previous run
for p in [base, out, chk, schema]:
    _ = dbutils.fs.rm(p, True)

# Two files: the second one introduces a new nested sub-field "bar"
dbutils.fs.mkdirs(base)
dbutils.fs.put(f"{base}/file1.json", """{"id":1,"payload":{"foo":"x"}}""", True)
dbutils.fs.put(f"{base}/file2.json", """{"id":2,"payload":{"foo":"y","bar":123}}""", True)

# Let the Delta writer merge new columns into the target table
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
### Run the code below ###
dfB = (spark.readStream
       .format("cloudFiles")
       .option("cloudFiles.format", "json")
       .option("cloudFiles.schemaLocation", schema)
       .option("cloudFiles.inferColumnTypes", "true")
       .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
       # no schemaHints for payload
       .load(base))

qB = (dfB.writeStream
      .format("delta")
      .option("checkpointLocation", chk)
      .trigger(availableNow=True)
      .start(out))
qB.awaitTermination()
spark.read.format("delta").load(out).printSchema()
print("Data:")
display(spark.read.format("delta").load(out))
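If everything worked, the printed schema should look something like this (an expectation, not captured output; field order and exact types depend on inference):

root
 |-- id: long (nullable = true)
 |-- payload: struct (nullable = true)
 |    |-- bar: long (nullable = true)
 |    |-- foo: string (nullable = true)
 |-- _rescued_data: string (nullable = true)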
##### Add a new file with more sub-fields #####
dbutils.fs.put(f"{base}/file3.json",
               """{"id":2,"payload":{"foo":"y","bar":123,"abc":{"foo1":"x"}}}""",
               True)
#### Re-run the streaming code above ####
You will see the job fail the first time; that is expected. In addNewColumns mode, Auto Loader stops the stream (with an UnknownFieldException) when it detects new columns, after recording the merged schema in the schema location. Retry the run and it will pick up the evolved schema automatically and give you the expected schema and result.
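If you’d rather not retry by hand, one common pattern is to wrap the run in a short retry loop. This is a minimal sketch under the assumption that one restart per schema change is enough for this repro; run_stream is a hypothetical helper, not a Databricks API:

def run_stream():
    df = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "json")
          .option("cloudFiles.schemaLocation", schema)
          .option("cloudFiles.inferColumnTypes", "true")
          .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
          .load(base))
    q = (df.writeStream
         .format("delta")
         .option("checkpointLocation", chk)
         .trigger(availableNow=True)
         .start(out))
    q.awaitTermination()

# Retry once: the first attempt may stop while Auto Loader records the new
# columns; the second attempt reads with the merged schema.
for attempt in range(2):
    try:
        run_stream()
        break
    except Exception as e:
        print(f"Stream stopped (likely schema evolution), retrying: {e}")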
Please do let me know if you have any further questions. Thanks!