03-03-2025 11:30 AM
I have a CSV file that I'm looking to read into a streaming table, and I always want to add a generated identity column as a surrogate key. I found a few blogs saying this can be achieved by explicitly declaring the schema. However, my CSV file has around 40 fields, and listing all of them in the schema is very time consuming. How can I generate a surrogate key as a generated BIGINT identity column while avoiding an explicit declaration of the entire schema?
This is what I have, and it works really well as long as all the fields are listed, but I want to avoid that part.
Accepted Solutions
03-03-2025 06:44 PM
It seems impossible to declare an identity column while defining the other columns dynamically. This CTAS and identity columns doc mentions that all columns must be defined at table creation. While the doc is not about DLT streaming tables specifically, I believe the same rule applies to DLT tables.
As an alternative, how about generating the column definition DDL automatically? If you have sample CSV files in advance, you can run simple code like the following to generate a DDL statement that includes the id column along with the other columns:
# Read the sample CSV files once to capture their column names
# (without inferSchema, every column comes back as string)
df = (spark.read.format('csv')
      .option("header", True)
      .option("multiLine", True)
      .load("/Volumes/dev/data/raw/demo/*.csv"))

# Build the DDL string: the identity column first, then every CSV column
schema_str = "id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1000 INCREMENT BY 1),\n" \
    + ",\n".join([f"{c.name} {c.dataType.typeName()}" for c in df.schema])
print(schema_str)
It generates something like the following, which you can copy & paste into the actual DLT table definition:
# From the following csv file
"primary_key","description","c1","c2"
"p1","desc1","c1-1","c2-1"
"p2","desc2","c1-2","c2-2"
"p3","desc3","c1-3","c2-3"
# Generated output
id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1000 INCREMENT BY 1),
primary_key string,
description string,
c1 string,
c2 string
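To consume the generated string, it can be pasted into the schema argument of the table definition. Below is a minimal sketch, assuming a hypothetical table name demo_bronze and the same source path; the schema text is simply the generated output from above:

import dlt

@dlt.table(
    schema="""
        id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1000 INCREMENT BY 1),
        primary_key STRING,
        description STRING,
        c1 STRING,
        c2 STRING
    """
)
def demo_bronze():
    # The id column is generated by the table itself, so the returned
    # DataFrame only needs to provide the CSV columns
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("header", True)
            .option("multiLine", True)
            .load("/Volumes/dev/data/raw/demo/*.csv"))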
Or, I'd simply stop using id columns on the raw table. An identity column adds concurrency and performance limitations. Ingesting the data as is, together with the source file names and ingestion timestamps, gives better scalability while still providing decent traceability. Please check the example below. A generated ID column can be added to a downstream table later, where the schema is clearer, if it is still needed.
import dlt
import pyspark.sql.functions as F

@dlt.table()
def demo_raw():
    # Ingest the CSV files as-is with Auto Loader, adding the source file
    # name and ingestion timestamp for traceability
    df = (spark.readStream.format('cloudFiles')
          .option('cloudFiles.format', 'csv')
          .option('header', True)
          .option('multiLine', True)
          .load('/Volumes/dev/data/raw/demo/*.csv')
          .withColumn('filename', F.col('_metadata.file_path'))
          .withColumn('timestamp', F.current_timestamp()))
    return df
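And, as a sketch of that later stage, the surrogate key could be declared on a downstream table that reads from demo_raw. The table name demo_silver is hypothetical, and note that all columns (including filename and timestamp) still have to be listed there because of the identity column, which is easier once the raw schema is known:

import dlt

@dlt.table(
    schema="""
        id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1000 INCREMENT BY 1),
        primary_key STRING,
        description STRING,
        c1 STRING,
        c2 STRING,
        filename STRING,
        timestamp TIMESTAMP
    """
)
def demo_silver():
    # Stream from the raw table; the id column is generated automatically
    return dlt.read_stream("demo_raw")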
Hope this helps!
03-04-2025 01:57 PM
Thank you @koji_kawamura. This was helpful.