ingest a .csv file with spaces in column names using Delta Live into a streaming table

vaver_3
New Contributor III

How do I ingest a .csv file with spaces in column names using Delta Live into a streaming table? All of the fields should be read as strings, which is the default behavior for .csv files with the DLT Auto Loader.

Running the pipeline gives me an error about invalid characters in the column names of my schema ("Found invalid character(s) among " ,;{}()\n\t=" in the column names of your schema."). However, adding column mapping as a table property (as recommended in the full error message) then gives me "com.databricks.sql.transaction.tahoe.ColumnMappingUnsupportedException: Schema change is detected:" and tells me "Schema changes are not allowed during the change of column mapping mode."

I've even tried setting the schema both in the table info and when reading the .csv.

import dlt

# Derive the schema from a one-off batch read of the same files, then add the
# _rescued_data column that Auto Loader appends.
df = spark.read.format("csv").option("header", "true").load(path_to_load)
tbl_schema = df.schema.add("_rescued_data", "string", True)

@dlt.table(
  comment="comment",
  schema=tbl_schema,
  table_properties={
    "delta.minReaderVersion": "2",
    "delta.minWriterVersion": "5",
    "delta.columnMapping.mode": "name"
  }
)
def BB_EDIP_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", "true")
      # .option("inferSchema", "true")
      .schema(tbl_schema)
      .load(path_to_load)
  )

I still get the same error: that there is a schema change from the old schema of just "root" to the new schema of root plus all the fields (see below; the list of fields is shortened):

com.databricks.sql.transaction.tahoe.ColumnMappingUnsupportedException:
Schema change is detected:

old schema:
root

new schema:
root
 |-- UniqueID: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- MiddleName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- HOME_BUSINESS: string (nullable = true)
 |-- BUSINESS_OWNER1: string (nullable = true)
 |-- Not in use: string (nullable = true)
 |-- EDUC_MODEL1: string (nullable = true)
 |-- Political Affiliation: string (nullable = true)
 |-- Working Couples Dual Income: string (nullable = true)
 |-- Online Score: string (nullable = true)
 |-- _rescued_data: string (nullable = true)

Schema changes are not allowed during the change of column mapping mode.

So, how do I ingest a .csv file with spaces in column names using Delta Live into a streaming table? Is it possible? Should I be trying a different method? These files are provided to us by a vendor, so I would prefer not to pre-process them just to get the raw/bronze layer loaded. Thanks!

For reference, here is the first error about spaces in column names:

org.apache.spark.sql.AnalysisException:
Found invalid character(s) among " ,;{}()\n\t=" in the column names of your schema.
Please enable column mapping by setting table property 'delta.columnMapping.mode' to 'name'.
For more details, refer to https://docs.databricks.com/delta/delta-column-mapping.html
Or you can use alias to rename it.

org.apache.spark.sql.AnalysisException: Column name "Not in use" contains invalid character(s). Please use alias to rename it.

1 ACCEPTED SOLUTION

vaver_3
New Contributor III

After additional googling on "withColumnRenamed", I was able to replace all spaces in column names with "_" all at once by using select and alias instead:

import dlt
from pyspark.sql.functions import col

# Infer the schema in a view first, keeping the original column names.
@dlt.view(
  comment=""
)
def vw_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(path_to_load)
  )

# Then rename every column in one select/alias pass so the streaming table's
# schema contains no characters that Delta rejects.
@dlt.table(
  comment=""
)
def table_raw():
  df = dlt.readStream("vw_raw")
  return df.select([col(c).alias(c.replace(" ", "_")) for c in df.columns])
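If your vendor files ever contain the other characters from the invalid set (" ,;{}()\n\t="), a regex version of the same select/alias rename covers them all at once. This is just a sketch on top of the answer above, not from the original post; sanitize_columns is a hypothetical helper name:

import re
from pyspark.sql.functions import col

# Every character the Delta error above lists as invalid in column names.
INVALID_CHARS = r"[ ,;{}()\n\t=]"

def sanitize_columns(df):
    # Hypothetical helper: replace each invalid character with "_" in one pass.
    return df.select([col(c).alias(re.sub(INVALID_CHARS, "_", c)) for c in df.columns])

With that helper, the return line in table_raw above would become return sanitize_columns(dlt.readStream("vw_raw")).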

It also works using "cloudFiles.inferColumnTypes" = "true" and "cloudFiles.schemaHints" in the view definition.
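As a rough sketch of that variant (the schema hint column here is illustrative, not from the original post), the view definition would look something like:

@dlt.view(
  comment=""
)
def vw_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("header", "true")
      # Let Auto Loader infer column types instead of defaulting everything to string.
      .option("cloudFiles.inferColumnTypes", "true")
      # Pin types for specific columns; "Online Score" is just an example here.
      .option("cloudFiles.schemaHints", "`Online Score` INT")
      .load(path_to_load)
  )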


