Hello @yit,
You can't. An "empty struct" is treated as a fixed struct with zero fields, so Auto Loader will not expand it later. The NOTE in the screenshot applies to JSON just as much as to Parquet/Avro/CSV.
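For reference, this is the variant that stays frozen (a minimal sketch; the hint string payload struct<> and the paths below are placeholders standing in for whatever your actual setup uses):
# What NOT to do (illustrative): an empty-struct hint pins payload at zero fields,
# so Auto Loader never adds subfields under it; new data lands in _rescued_data instead.
df_frozen = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "<your-schema-location>")
    .option("cloudFiles.schemaHints", "payload struct<>")  # frozen empty struct
    .load("<your-input-path>"))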
If your goal is "discover whatever shows up under payload and keep adding new sub-fields," simply don't specify a hint for payload. Auto Loader will infer and evolve nested fields as and when they appear.
Example code (you can run it in any Databricks notebook, since it uses dbutils):
base = "/tmp/repro_empty_struct_json/input"
out = "/tmp/repro_empty_struct_json/out_empty_struct"
chk = "/tmp/repro_empty_struct_json/chk"
schema = "/tmp/repro_empty_struct_json/schema"
# cleanup
for p in [base, out, chk, schema]:
    dbutils.fs.rm(p, True)
# two files: second file introduces a new nested subfield "bar"
dbutils.fs.mkdirs(base)
dbutils.fs.put(f"{base}/file1.json", """{"id":1,"payload":{"foo":"x"}}""", True)
dbutils.fs.put(f"{base}/file2.json", """{"id":2,"payload":{"foo":"y","bar":123}}""", True)
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")  # let the Delta writer merge newly added columns
### Run the code below ###
dfB = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", schema)
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    # note: no schemaHints for payload, so Auto Loader can evolve it
    .load(base))
qB = (dfB.writeStream
    .format("delta")
    .option("checkpointLocation", chk)
    .trigger(availableNow=True)
    .start(out))
qB.awaitTermination()
spark.read.format("delta").load(out).printSchema()
print("C) Data:")
display(spark.read.format("delta").load(out))
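At this point the printed schema should look roughly like this (illustrative, not captured output; field order and the _rescued_data column depend on your defaults):
# root
#  |-- id: long
#  |-- payload: struct
#  |    |-- bar: long
#  |    |-- foo: string
#  |-- _rescued_data: string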
#### Add a new file with more subfields ####
dbutils.fs.put(f"{base}/file3.json",
"""{"id":2,"payload":{"foo":"y","bar":123,"abc":{"foo1":"x"}}}""",
True)
#### Re-run the streaming read/write code above ####
You will see that the run fails the first time. This is expected: in addNewColumns mode, Auto Loader stops the stream when it detects new columns. Once you retry, it evolves the schema automatically and gives you the expected schema and results.
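If you want that retry to happen without manual intervention, one option is a small wrapper that re-creates the stream and retries once (a minimal sketch; production jobs usually just rely on the job's retry policy). The readStream must be rebuilt inside the loop so it picks up the evolved schema from the schema location:
# Minimal retry sketch (assumption: the only expected failure is the
# schema-evolution stop that addNewColumns triggers on new columns).
for attempt in range(2):
    try:
        df = (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.schemaLocation", schema)
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
            .load(base))
        q = (df.writeStream
            .format("delta")
            .option("checkpointLocation", chk)
            .trigger(availableNow=True)
            .start(out))
        q.awaitTermination()
        break  # finished without a schema-change failure
    except Exception as e:  # typically a StreamingQueryException here
        print(f"Stream stopped on attempt {attempt + 1}: {e}")
        # loop once more; the stream restarts with the evolved schema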
Please do let me know if you have any further questions. Thanks!