Hi @abhijeet_more
It doesn't seem possible to declare an identity column while defining the other columns dynamically. This CTAS and identity columns doc mentions that all columns must be defined at table creation when an identity column is used. While the doc is not specifically about DLT streaming tables, I'd guess the same rule applies to DLT tables.
As an alternative, how about generating the column-definition DDL automatically? If you have sample CSV files in advance, you can run a small snippet like the one below to generate a DDL statement that includes the id column along with the inferred columns:
# Read the sample CSV files to infer the column names
df = spark.read.format('csv')\
    .option("header", True)\
    .option("multiLine", "true")\
    .load("/Volumes/dev/data/raw/demo/*.csv")

# Prepend the identity column, then append "name type" for each inferred column
schema_str = "id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1000 INCREMENT BY 1),\n"\
    + ",\n".join([c.name + ' ' + c.dataType.typeName() for c in df.schema])
print(schema_str)
It generates something like this, which you can copy & paste into the actual DLT table definition:
# From the following csv file
"primary_key","description","c1","c2"
"p1","desc1","c1-1","c2-1"
"p2","desc2","c1-2","c2-2"
"p3","desc3","c1-3","c2-3"
# Generated output
id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1000 INCREMENT BY 1),
primary_key string,
description string,
c1 string,
c2 string
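For example, pasting the generated string into the schema argument of @dlt.table could look like the sketch below (demo_raw_with_id is just a placeholder name, and I'm assuming the schema argument accepts the DDL string with the identity clause):
import dlt

# Paste the generated DDL here
demo_schema = """id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1000 INCREMENT BY 1),
primary_key string,
description string,
c1 string,
c2 string"""

@dlt.table(schema=demo_schema)
def demo_raw_with_id():
    # Only the source columns are returned; the id column is populated automatically
    return spark.readStream.format('cloudFiles')\
        .option('cloudFiles.format', 'csv')\
        .option('header', True)\
        .option("multiLine", "true")\
        .load("/Volumes/dev/data/raw/demo/*.csv")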
Or, I'd simply stop using an id column in the raw table. Having an identity column adds concurrency and performance limitations. Ingesting the data as-is, together with the source filename and an ingestion timestamp, gives better scalability while still providing decent traceability. Please check the example below. A generated ID column can be added in a downstream table if needed, at a later stage where the schema is clearer (see the downstream sketch further below).
import dlt
import pyspark.sql.functions as F

@dlt.table()
def demo_raw():
    # Ingest the CSV files as-is with Auto Loader, adding the source
    # file path and the ingestion timestamp for traceability
    df = spark.readStream.format('cloudFiles')\
        .option('cloudFiles.format', 'csv')\
        .option('header', True)\
        .option("multiLine", "true")\
        .load("/Volumes/dev/data/raw/demo/*.csv")\
        .withColumn("filename", F.col("_metadata.file_path"))\
        .withColumn("timestamp", F.current_timestamp())
    return df
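If you later do need the generated ID, a downstream table can declare it in its schema while streaming from the raw table. A minimal sketch, assuming a hypothetical demo_silver table name and the columns from the example above:
@dlt.table(
    schema="""
        id BIGINT GENERATED ALWAYS AS IDENTITY,
        primary_key string,
        description string,
        c1 string,
        c2 string,
        filename string,
        timestamp timestamp
    """
)
def demo_silver():
    # The id column is filled in automatically; just pass the raw columns through
    return dlt.read_stream("demo_raw")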
Hope this helps!