cancel
Showing results for 
Search instead for 
Did you mean: 
Machine Learning
Dive into the world of machine learning on the Databricks platform. Explore discussions on algorithms, model training, deployment, and more. Connect with ML enthusiasts and experts.
cancel
Showing results for 
Search instead for 
Did you mean: 

ingest a .csv file with spaces in column names using Delta Live into a streaming table

vaver_3
New Contributor III

How do I ingest a .csv file with spaces in column names using Delta Live into a streaming table? All of the fields should be read using the default behavior .csv files for DLT autoloader - as strings.

Running the pipeline gives me an error about invalid characters in the column names of my schema. ("Found invalid character(s) among " ,;{}()\n\t=" in the column names of your

schema.") However, adding column mapping as a table property (as recommended in the full error comment) then gives me the error "com.databricks.sql.transaction.tahoe.ColumnMappingUnsupportedException:

Schema change is detected:" and lets me know "Schema changes are not allowed during the change of column mapping mode."

I've even tried setting the schema both in the table info and when reading the .csv.

df = spark.read.format('csv').options(header='true').load(path_to_load)
tbl_schema = df.schema.add("_rescued_data","string",True)
@dlt.table(
  comment="comment",
  schema=tbl_schema, 
  table_properties={
    'delta.minReaderVersion' : '2', 
    'delta.minWriterVersion' : '5', 
    'delta.columnMapping.mode' : 'name'
  }
)
def BB_EDIP_raw():          
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .options(header='true')
      # .option("inferSchema", "true")
      .schema(tbl_schema)
      .load(path_to_load) 
  )

I still get the same error - that there is schema change from the old schema of just "root" to the new schema of root/all the fields (see below - list of fields shortened):

com.databricks.sql.transaction.tahoe.ColumnMappingUnsupportedException:

Schema change is detected:

old schema:

root

new schema:

root

-- UniqueID: string (nullable = true)

-- FirstName: string (nullable = true)

-- MiddleName: string (nullable = true)

-- LastName: string (nullable = true)

-- HOME_BUSINESS: string (nullable = true)

-- BUSINESS_OWNER1: string (nullable = true)

-- Not in use: string (nullable = true)

-- EDUC_MODEL1: string (nullable = true)

-- Political Affiliation: string (nullable = true)

-- Working Couples Dual Income: string (nullable = true)

-- Online Score: string (nullable = true)

-- _rescued_data: string (nullable = true)

Schema changes are not allowed during the change of column mapping mode.

So, how do I ingest a .csv file with spaces in column names using Delta Live into a streaming table? Is it possible? Should I be trying a different method? These files are provided to us by a vendor, so I would like to not have to pre-process them just to get the raw/bronze layer loaded. Thanks!

For reference, here is the first error about spaces in column names:

org.apache.spark.sql.AnalysisException:

Found invalid character(s) among " ,;{}()\n\t=" in the column names of your

schema.

Please enable column mapping by setting table property 'delta.columnMapping.mode' to 'name'.

For more details, refer to https://docs.databricks.com/delta/delta-column-mapping.html

Or you can use alias to rename it.

org.apache.spark.sql.AnalysisException: Column name "Not in use" contains invalid character(s). Please use alias to rename it.

1 ACCEPTED SOLUTION

Accepted Solutions

vaver_3
New Contributor III

After additional googling on "withColumnRenamed", I was able to replace all spaces in column names with "_" all at once by using select and alias instead:

@dlt.view(
  comment=""
)
def vw_raw():          
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .options(header='true')
      .option("inferSchema", "true")
      .load(path_to_load) 
  )
 
@dlt.table(
  comment=""
)
def table_raw():          
  return (
    dlt.readStream("vw_raw")
      .select([col(c).alias(c.replace(" ", "_")) for c in dlt.readStream("vw_raw").columns]) 
  )

It also works using "cloudFiles.inferColumnTypes" = "true" and "cloudFiles.schemaHints" in the view definition.

View solution in original post

1 REPLY 1

vaver_3
New Contributor III

After additional googling on "withColumnRenamed", I was able to replace all spaces in column names with "_" all at once by using select and alias instead:

@dlt.view(
  comment=""
)
def vw_raw():          
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .options(header='true')
      .option("inferSchema", "true")
      .load(path_to_load) 
  )
 
@dlt.table(
  comment=""
)
def table_raw():          
  return (
    dlt.readStream("vw_raw")
      .select([col(c).alias(c.replace(" ", "_")) for c in dlt.readStream("vw_raw").columns]) 
  )

It also works using "cloudFiles.inferColumnTypes" = "true" and "cloudFiles.schemaHints" in the view definition.

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!