Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DLT pipeline with a generated identity column

abhijeet_more
New Contributor II

I have a CSV file that I want to read into a streaming table, and I always want to add a generated identity column as a surrogate key. I found a few blogs saying this can be achieved by declaring the schema explicitly. However, my CSV file has around 40 fields, and listing all of them in the schema is very time consuming. How can I generate a surrogate key as a generated BIGINT column without spelling out the entire schema?

This is what I have, and it works well as long as all the fields are listed explicitly. That is the part I want to avoid.

import dlt

@dlt.table(
    comment="Raw data",
    schema="""
        id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1000 INCREMENT BY 1),
        primary_key STRING,
        description STRING
    """)
def demo():
    df = spark.readStream.format('cloudFiles')\
        .option('cloudFiles.format', 'csv')\
        .option('header', True)\
        .option('multiLine', 'true')\
        .load("/Volumes/dev/data/raw/demo/*.csv")
    return df



Accepted Solution

koji_kawamura
Databricks Employee

Hi @abhijeet_more 

It doesn't seem possible to declare an identity column while having the other columns defined dynamically. The CTAS and identity columns doc mentions that all columns must be defined at table creation. While that doc is not specifically about DLT streaming tables, I expect the same rule applies to DLT tables.

As an alternative, how about generating the column definition DDL automatically? If you have sample CSV files available in advance, you can run a short snippet like the one below to generate a DDL statement that includes the id column along with the other columns:

# Read a sample of the CSV files in batch mode just to infer the column names and types
df = spark.read.format('csv')\
    .option('header', True)\
    .option('multiLine', 'true')\
    .load("/Volumes/dev/data/raw/demo/*.csv")

# Build the schema DDL: the identity column first, followed by the inferred columns
schema_str = "id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1000 INCREMENT BY 1),\n"\
    + ",\n".join([c.name + ' ' + c.dataType.typeName() for c in df.schema])
print(schema_str)

It can generate something like the following, which you can copy and paste into the actual DLT table definition (the assembled definition is sketched after the generated output below):

# From the following csv file
"primary_key","description","c1","c2"
"p1","desc1","c1-1","c2-1"
"p2","desc2","c1-2","c2-2"
"p3","desc3","c1-3","c2-3"

# Generated output
id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1000 INCREMENT BY 1),
primary_key string,
description string,
c1 string,
c2 string
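
Putting it together, the pasted result would look roughly like this (a minimal sketch; the column list comes from the sample CSV above and the reader options mirror the original snippet):

import dlt

@dlt.table(
    comment="Raw data",
    schema="""
        id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1000 INCREMENT BY 1),
        primary_key STRING,
        description STRING,
        c1 STRING,
        c2 STRING
    """)
def demo():
    # Same ingestion as before; DLT populates the id column automatically
    return spark.readStream.format('cloudFiles')\
        .option('cloudFiles.format', 'csv')\
        .option('header', True)\
        .option('multiLine', 'true')\
        .load("/Volumes/dev/data/raw/demo/*.csv")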

Alternatively, I would simply not use an id column on the raw table. An identity column adds concurrency and performance limitations. Ingesting the data as-is, together with the source filename and an ingestion timestamp, scales better while still giving decent traceability. Please check the example below. A generated ID column can be added on a downstream table, if needed, in later stages where the schema is clearer (a sketch of such a downstream table follows the raw example).

import dlt
import pyspark.sql.functions as F

@dlt.table()
def demo_raw():
    # Ingest the CSV files as-is, adding the source file path and ingestion timestamp
    df = spark.readStream.format('cloudFiles')\
        .option('cloudFiles.format', 'csv')\
        .option('header', True)\
        .option('multiLine', 'true')\
        .load("/Volumes/dev/data/raw/demo/*.csv")\
        .withColumn("filename", F.col("_metadata.file_path"))\
        .withColumn("timestamp", F.current_timestamp())
    return df
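
If a surrogate key is still needed later, a downstream table can declare the identity column explicitly once the schema is settled. A minimal sketch, assuming the raw table above and selecting only a few of its columns (the table name demo_silver and the column list are illustrative):

import dlt

@dlt.table(
    comment="Downstream table with a generated surrogate key",
    schema="""
        id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1000 INCREMENT BY 1),
        primary_key STRING,
        description STRING,
        filename STRING,
        timestamp TIMESTAMP
    """)
def demo_silver():
    # Read from the raw streaming table; DLT generates the id values
    return dlt.read_stream("demo_raw")\
        .select("primary_key", "description", "filename", "timestamp")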

 Hope this helps!



abhijeet_more
New Contributor II

Thank you @koji_kawamura. This was helpful.
