Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
01-29-2025 11:00 PM
Alternatively, you can try inserting into the table directly with an append-mode streaming write (the streaming equivalent of an INSERT INTO statement):
def load_snapshot_tables(
    source_system_name,
    source_schema_name,
    table_name,
    spark_schema,
    select_expression,
    *,
    checkpoint_location=None,
):
    """Stream gzipped JSON snapshot files into a Delta table in append mode.

    Reads ``*.json.gz`` files with the ``cloudFiles`` streaming source from
    ``Snapshot/{source_system_name}/{table_name}`` in the ``YYY`` container,
    adds the snapshot date and file metadata columns, and starts a streaming
    write that appends the rows to the table named ``table_name``.

    ``spark`` and ``adl_name`` are not parameters; they are read from the
    enclosing (notebook/module) scope.

    Args:
        source_system_name: Folder name of the source system under ``Snapshot/``.
        source_schema_name: Not used by this function; kept so existing
            callers do not have to change.
        table_name: Name of the source folder and of the target table.
        spark_schema: Explicit schema applied to the JSON files (column type
            inference is switched off).
        select_expression: Iterable of SQL expression strings selected
            between the snapshot date and the file metadata columns.
        checkpoint_location: Optional checkpoint path for the streaming
            query. When ``None`` (the default) no ``checkpointLocation``
            option is set, exactly as before, so the session must provide a
            default one.

    Returns:
        The handle returned by ``toTable`` for the started streaming query,
        so the caller can wait on it, inspect it or stop it.
    """
    source_path = f"abfss://YYY@{adl_name}.dfs.core.windows.net/Snapshot/{source_system_name}/{table_name}"

    snapshot_load_df = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", False)
        .option("cloudFiles.includeExistingFiles", True)
        .option("pathGlobFilter", "*.json.gz")
        .schema(spark_schema)
        .load(source_path)
        .selectExpr(
            # The date is sliced out of the file name by position counted
            # from the end: 4 + 2 + 2 characters starting 20 from the end.
            # NOTE(review): this presumably expects names ending in
            # "<YYYYMMDD><4 chars>.json.gz" -- confirm against the real files.
            "CAST(concat(substring(_metadata.file_name, -20,4),'-',substring(_metadata.file_name, -16,2),'-',substring(_metadata.file_name, -14,2)) AS timestamp) AS XXX_Snapshot_Date",
            *select_expression,
            "_metadata.file_name AS XXX_File_Name",
            "_metadata AS XXX_File_Metadata",
        )
    )

    # NOTE(review): "delta.enableTypeWidening" looks like a Delta table
    # property rather than a writer option -- confirm it has an effect here.
    stream_writer = (
        snapshot_load_df.writeStream
        .format("delta")
        .option("mergeSchema", "true")
        .option("delta.enableTypeWidening", "true")
        .outputMode("append")
        .queryName(f"insert_into_{table_name}")
    )

    # Only set the option when the caller asked for it, so the default call
    # behaves exactly like the original snippet.
    if checkpoint_location is not None:
        stream_writer = stream_writer.option("checkpointLocation", checkpoint_location)

    return stream_writer.toTable(f"{table_name}")