koji_kawamura
Databricks Employee

Hi @abhijeet_more 

It seems impossible to declare an identity column while dynamically defining the other columns. The Databricks documentation on CTAS and identity columns states that all columns must be defined when the table is created. That doc is not specifically about DLT streaming tables, but I expect the same rule applies to DLT tables.

As an alternative, how about generating the column definition DDL automatically? If you have sample CSV files in advance, you can run simple code like the following to generate a DDL statement that includes the id column and the other columns:

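# Batch-read the sample CSV files only to discover their columns and types
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("multiLine", "true")
    .load("/Volumes/dev/data/raw/demo/*.csv")
)

# Prepend the identity column, then emit one "name type" line per CSV column.
# simpleString() gives SQL-compatible type names (e.g. "bigint", "decimal(10,2)").
schema_str = "id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1000 INCREMENT BY 1),\n" + ",\n".join(
    c.name + " " + c.dataType.simpleString() for c in df.schema
)
print(schema_str)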

This generates output like the following, which you can copy and paste into the actual DLT table definition:

# From the following csv file
"primary_key","description","c1","c2"
"p1","desc1","c1-1","c2-1"
"p2","desc2","c1-2","c2-2"
"p3","desc3","c1-3","c2-3"

# Generated output
id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1000 INCREMENT BY 1),
primary_key string,
description string,
c1 string,
c2 string

Alternatively, I'd simply avoid id columns in the raw table. Declaring an identity column on a Delta table disables concurrent transactions on that table and adds performance overhead. Ingesting the data as-is, together with the source file names and ingestion timestamps, scales better while still providing decent traceability. Please check the example below. Generated ID columns can be added to a downstream table in later stages, if needed, once the schema is clearer.

import dlt
import pyspark.sql.functions as F

@dlt.table()
def demo_raw():
    # Ingest the raw CSV files as-is with Auto Loader (no identity column here)
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .option("multiLine", "true")
        .load("/Volumes/dev/data/raw/demo/*.csv")
        # Traceability: record the source file path and the ingestion time
        .withColumn("filename", F.col("_metadata.file_path"))
        .withColumn("timestamp", F.current_timestamp())
    )
    return df

 Hope this helps!
