
How to use autoloader with csv containing spaces in attribute names?

Dave_Nithio
Contributor

I am attempting to use Auto Loader to add a number of CSV files to a Delta table. The underlying CSV files have spaces in the attribute names, though (e.g. 'Account Number' instead of 'AccountNumber'). When I run my Auto Loader job, I get the following error message:

Found invalid character(s) among ' ,;{}()\n\t=' in the column names of your schema.

My Auto Loader setup is:

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_directory)
    .option("header", "true")
    .load(data_source)
    .writeStream
    .format("delta")
    .trigger(once=True)
    .option("checkpointLocation", checkpoint_directory)
    .option("mergeSchema", "true")
    .table(table_name))

The solution proposed in the error message is to change the column mapping mode to 'name' in the table properties, which allows spaces:

ALTER TABLE <table_name> SET TBLPROPERTIES (
 'delta.minReaderVersion' = '2',
 'delta.minWriterVersion' = '5',
 'delta.columnMapping.mode' = 'name'
)

The issue I am having is that the table is not created until Auto Loader runs, so I cannot alter the table properties beforehand. My question is in two parts:

A. Can I create a blank Delta table so that I can alter the table properties before Auto Loader runs? For example, I could run:

CREATE TABLE IF NOT EXISTS table_name

which would then allow me to alter the table, then run Auto Loader so that I can ingest CSVs with spaces in the attribute names (see the first sketch below).

B. Is there an alternate method (potentially aliasing during the spark.readStream) that would allow me to ingest these CSVs with spaces in the attribute names without needing an existing table with adjusted table properties? (see the second sketch below)
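For part A, a minimal sketch of pre-creating the table with the column mapping properties set at creation time, assuming the column names are known up front ('Account Number' stands in for the real columns, and table_name is the Python variable used above):

# Sketch: pre-create the Delta table with column mapping enabled so that
# column names containing spaces are legal from the start (assumed schema).
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        `Account Number` STRING
    )
    USING DELTA
    TBLPROPERTIES (
        'delta.minReaderVersion' = '2',
        'delta.minWriterVersion' = '5',
        'delta.columnMapping.mode' = 'name'
    )
""")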
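For part B, one workaround is to rename the columns inside the stream before the write, so the Delta writer never sees a space. A sketch, assuming the same data_source, checkpoint_directory, and table_name as above:

# Read the CSVs as a stream, then replace spaces in every column name
# with underscores before writing to Delta.
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_directory)
    .option("header", "true")
    .load(data_source))

for old_name in df.columns:
    df = df.withColumnRenamed(old_name, old_name.replace(" ", "_"))

(df.writeStream
    .format("delta")
    .trigger(once=True)
    .option("checkpointLocation", checkpoint_directory)
    .option("mergeSchema", "true")
    .table(table_name))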


4 REPLIES

Dave_Nithio
Contributor

I also attempted to create a blank PySpark DataFrame and write it out as a Delta table:

from pyspark.sql.types import StructType

df = spark.createDataFrame([], StructType([]))
df.write.format("delta").saveAsTable(table_name)

But this fails with:

Data used in creating the Delta table doesn't have any columns.

Another option could be to create the table with a placeholder column, alter the table properties, stream the data, and then drop the placeholder column; see the sketch below.
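A rough sketch of that placeholder idea, assuming table_name as above ('placeholder' is a hypothetical column name):

# Sketch: create the table with a throwaway column so it exists, enable
# column mapping, stream the data, then drop the throwaway column.
spark.sql(f"CREATE TABLE IF NOT EXISTS {table_name} (placeholder STRING) USING DELTA")
spark.sql(f"""
    ALTER TABLE {table_name} SET TBLPROPERTIES (
        'delta.minReaderVersion' = '2',
        'delta.minWriterVersion' = '5',
        'delta.columnMapping.mode' = 'name'
    )
""")
# ... run the Auto Loader stream with mergeSchema, then:
spark.sql(f"ALTER TABLE {table_name} DROP COLUMN placeholder")  # DROP COLUMN itself requires column mapping 'name'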

Hubert-Dudek
Esteemed Contributor III
  • Create, as you said, a table registered in the metastore, but for that you need to define the schema
  • Write the stream directly into that table with .writeStream.table(table_name)

To get the schema, just read your CSV as a regular (non-streaming) DataFrame and take the schema from it:

csvFile = spark.read.csv("yourfile")
csvFile.createOrReplaceTempView("csvFile")
schema_json = spark.sql("SELECT * FROM csvFile").schema.json()
ddl = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(schema_json).toDDL()
print(ddl)
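The resulting DDL string can then be passed straight to the stream, since DataStreamReader.schema accepts a DDL-formatted string as well as a StructType. A sketch, reusing data_source from the original post:

# Sketch: use the extracted DDL as the explicit schema for the stream.
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .schema(ddl)
    .load(data_source))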

Dave_Nithio
Contributor
(Accepted solution)

@Hubert Dudek thanks for your response! I was able to use what you proposed above to generate the schema. The issue is that, because the CSV is read without a header or schema inference, the schema sets all attributes to STRING and names them numerically ('_c0', '_c1', etc.). Although this allows us to write the stream, it does not accurately portray the schema or the attribute names. If I instead use:

csvFile = spark.read.options(header="true", inferSchema="true").csv(data_source)

I can then strip the spaces from the attribute names with:

import json

# Round-trip the schema through JSON, removing every space (field names included)
schema_json = csvFile.schema.jsonValue()
json_str = json.dumps(schema_json).replace(" ", "")
schema_json = json.loads(json_str)

We can then build the DDL from this schema and pass it as the schema to the readStream.
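Putting the pieces together, the whole flow looks roughly like this (a sketch, assuming headered CSVs and the same data_source, checkpoint_directory, and table_name variables as above):

import json
from pyspark.sql.types import StructType

# 1. Infer a typed schema from a one-off batch read of the same CSVs.
csvFile = spark.read.options(header="true", inferSchema="true").csv(data_source)

# 2. Strip spaces from the field names via the schema's JSON form.
schema_json = json.loads(json.dumps(csvFile.schema.jsonValue()).replace(" ", ""))
schema = StructType.fromJson(schema_json)

# 3. Stream with the cleaned, explicit schema.
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", checkpoint_directory)
    .option("header", "true")
    .schema(schema)
    .load(data_source)
    .writeStream
    .format("delta")
    .trigger(once=True)
    .option("checkpointLocation", checkpoint_directory)
    .table(table_name))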

epk33
New Contributor II

Can you give me the step-by-step code to achieve this end to end?
