How to use Auto Loader with CSVs containing spaces in attribute names?

Dave_Nithio
Contributor

I am attempting to use Auto Loader to add a number of CSV files to a Delta table. The underlying CSV files have spaces in the attribute names, though (e.g. 'Account Number' instead of 'AccountNumber'). When I run the load, I get the following error message:

Found invalid character(s) among ' ,;{}()\n\t=' in the column names of your schema.

My Auto Loader code is:

(spark.readStream
      .format("cloudFiles")                                      # Auto Loader source
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.schemaLocation", checkpoint_directory)
      .option("header", "true")
      .load(data_source)
      .writeStream
      .format("delta")
      .trigger(once=True)                                        # process available files, then stop
      .option("checkpointLocation", checkpoint_directory)
      .option("mergeSchema", "true")
      .table(table_name))

The solution proposed by the error message is to change the column mapping mode to 'name' in the table properties so that spaces are allowed:

ALTER TABLE <table_name> SET TBLPROPERTIES (
 'delta.minReaderVersion' = '2',
 'delta.minWriterVersion' = '5',
 'delta.columnMapping.mode' = 'name'
)

The issue is that the table is not created until Auto Loader runs, so I cannot alter the table properties first. My question is therefore in two parts:

A. Can I create a blank Delta table so that I can alter the table properties prior to Auto Loader running? For example, I could run:

CREATE TABLE IF NOT EXISTS table_name

which would then allow me to alter the table, then run Auto Loader to ingest the CSVs with spaces in the attribute names.

B. Is there an alternate method (potentially aliasing during spark.readStream) that would let me ingest these CSVs with spaces in the attribute names, without needing a pre-existing table with adjusted table properties?


3 REPLIES

Dave_Nithio
Contributor

I also attempted to create a blank PySpark dataframe and write it as a Delta table:

from pyspark.sql.types import StructType

df = spark.createDataFrame([], StructType([]))   # empty dataframe with an empty schema
df.write.format("delta").saveAsTable(table_name)

But this fails with:

Data used in creating the Delta table doesn't have any columns.

Another option could be to create a test column, alter the table, stream the data, then remove the test column, roughly as sketched below.
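
That workaround might look like the following. This is an untested sketch; table_name is assumed to be defined, and the column name placeholder is hypothetical:

# Untested sketch of the test-column workaround; names are placeholders.
spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} (placeholder STRING) USING DELTA")
spark.sql(f"""
    ALTER TABLE {table_name} SET TBLPROPERTIES (
        'delta.minReaderVersion' = '2',
        'delta.minWriterVersion' = '5',
        'delta.columnMapping.mode' = 'name'
    )""")
# ... run the Auto Loader stream with mergeSchema so the real columns are added ...
spark.sql(f"ALTER TABLE {table_name} DROP COLUMN placeholder")  # DROP COLUMN needs 'name' mapping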

Hubert-Dudek
Esteemed Contributor III
  • Create, as you said, a table registered in the metastore, but for that you need to define the schema
  • Write the stream directly into that table with .writeStream.table(table_name)

To get the schema, just read your CSV as a non-stream and take the schema from the dataframe:

csvFile = spark.read.csv("yourfile")
csvFile.createOrReplaceTempView("csvFile")
schema_json = spark.sql("SELECT * FROM csvFile").schema.json()
ddl = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(schema_json).toDDL()
print(ddl)
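
Following those two bullets through, the DDL could then be used to register the table and point the stream at it. A hedged sketch, assuming table_name, checkpoint_directory and data_source are already defined; note that a headerless read infers the columns as _c0, _c1, ..., which is the limitation discussed in the accepted answer:

# Sketch: register the table from the captured DDL, then stream into it.
spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} ({ddl}) USING DELTA")

(spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.schemaLocation", checkpoint_directory)
      .schema(ddl)                       # .schema() accepts a DDL string
      .load(data_source)
      .writeStream
      .trigger(once=True)
      .option("checkpointLocation", checkpoint_directory)
      .table(table_name))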

Dave_Nithio
Contributor
(Accepted Solution)

@Hubert Dudek thanks for your response! I was able to use what you proposed above to generate the schema. The issue is that it sets all attributes to STRING and renames them numerically ('_c0', '_c1', etc.). Although this allows us to write the stream, it does not accurately portray the schema or the attribute names. If I instead use:

csvFile = spark.read.options(header="true", inferSchema="true").csv(data_source)

I can then strip the spaces from the attribute names via the schema's JSON form:

import json

schema_json = csvFile.schema.jsonValue()  # the inferred schema as a dict
json_str = json.dumps(schema_json)
json_str = json_str.replace(" ", "")      # removes every space, including those inside column names
schema_json = json.loads(json_str)

We can then build the DDL from this cleaned schema and pass it as the schema to the readStream.
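
Put together, the accepted approach looks roughly like this. It is an untested sketch, assuming data_source, checkpoint_directory and table_name are defined; with an explicit schema, Spark skips the CSV header row and matches columns by position:

import json
from pyspark.sql.types import StructType

# 1. Infer the real schema (with headers) from a static, non-streaming read.
csvFile = spark.read.options(header="true", inferSchema="true").csv(data_source)

# 2. Strip the spaces out of the column names via the schema's JSON form.
json_str = json.dumps(csvFile.schema.jsonValue()).replace(" ", "")
clean_schema = StructType.fromJson(json.loads(json_str))

# 3. Hand the cleaned schema to Auto Loader and stream into the Delta table.
(spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.schemaLocation", checkpoint_directory)
      .option("header", "true")
      .schema(clean_schema)
      .load(data_source)
      .writeStream
      .format("delta")
      .trigger(once=True)
      .option("checkpointLocation", checkpoint_directory)
      .table(table_name))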
