Sidhant07
Databricks Employee
Databricks Employee

Alternatively, you can try inserting into the table directly with an append-mode streaming write (`writeStream ... toTable`):

def load_snapshot_tables(source_system_name, source_schema_name, table_name, spark_schema, select_expression, *, checkpoint_location=None):
    """Start a stream that appends JSON snapshot files to a Delta table.

    Reads ``*.json.gz`` files with the ``cloudFiles`` source from
    ``Snapshot/{source_system_name}/{table_name}`` in the ADLS account named by
    the external ``adl_name`` variable, projects them with ``select_expression``
    plus three file-derived columns, and appends the result to ``table_name``.

    Relies on ``spark`` and ``adl_name`` being defined outside this function.

    Args:
        source_system_name: Folder under ``Snapshot/`` that holds the source
            system's tables.
        source_schema_name: Not used by this function; kept so existing callers
            keep working.
        table_name: Sub-folder to read from, and the name of the target table.
        spark_schema: Explicit schema for the JSON files (column type inference
            is switched off).
        select_expression: Iterable of SQL expression strings, placed between
            the snapshot-date column and the file-name/metadata columns.
        checkpoint_location: Optional path passed as the ``checkpointLocation``
            option of the streaming write. When ``None`` (the default) the
            option is not set and the session configuration applies, exactly
            as before this parameter existed.

    Returns:
        The streaming query handle returned by ``toTable``, so the caller can
        wait on, monitor, or stop the stream.
    """
    source_path = f"abfss://YYY@{adl_name}.dfs.core.windows.net/Snapshot/{source_system_name}/{table_name}"

    # The snapshot date is assembled as '<4 chars>-<2 chars>-<2 chars>' from
    # fixed offsets counted from the END of the file name (-20, -16, -14).
    # NOTE(review): presumably the name ends in 'YYYYMMDD' followed by a
    # 12-character suffix -- confirm against the real file naming.
    snapshot_date_expr = (
        "CAST(concat(substring(_metadata.file_name, -20,4),'-',substring(_metadata.file_name, -16,2),'-',substring(_metadata.file_name, -14,2)) AS timestamp) AS XXX_Snapshot_Date"
    )

    snapshot_load_df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", False)
        .option("cloudFiles.includeExistingFiles", True)
        .option("pathGlobFilter", "*.json.gz")
        .schema(spark_schema)
        .load(source_path)
        .selectExpr(
            snapshot_date_expr,
            *select_expression,
            "_metadata.file_name AS XXX_File_Name",
            "_metadata AS XXX_File_Metadata",
        )
    )

    # NOTE(review): 'delta.enableTypeWidening' is documented as a Delta TABLE
    # property; confirm that passing it as a write option has any effect.
    stream_writer = (
        snapshot_load_df.writeStream
        .format("delta")
        .option("mergeSchema", "true")
        .option("delta.enableTypeWidening", "true")
        .outputMode("append")
        .queryName(f"insert_into_{table_name}")
    )

    # Only set an explicit checkpoint when the caller supplies one.
    if checkpoint_location is not None:
        stream_writer = stream_writer.option("checkpointLocation", checkpoint_location)

    # Hand the running query back instead of discarding it.
    return stream_writer.toTable(f"{table_name}")