10-13-2022 07:22 AM
I am attempting to use autoloader to add a number of csv files to a delta table. The underlying csv files have spaces in the attribute names though (i.e. 'Account Number' instead of 'AccountNumber'). When I run my autoload, I get the following error message:
Found invalid character(s) among ' ,;{}()\n\t=' in the column names of your schema.
My autoload methodology is:
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_directory)
.option("header", "True")
.load(data_source)
.writeStream
.format("delta")
.trigger(once=True)
.option("checkpointLocation", checkpoint_directory)
.option("mergeSchema", "true")
.table(table_name))
The proposed solution via the error message is to change the column mapping mode to 'name' mapping in the table properties to allow for spaces:
ALTER TABLE <table_name> SET TBLPROPERTIES (
'delta.minReaderVersion' = '2',
'delta.minWriterVersion' = '5',
'delta.columnMapping.mode' = 'name'
)
The issue I am having is that the table is not created until autoloader runs, so I cannot alter the table properties. So my question is in two parts:
A. Can I create a blank delta table so that I can alter the table properties prior to autoloading running? For example, I could run:
CREATE OR REPLACE TABLE IF NOT EXISTS table_name
which would then allow me to alter the table, then run autoloader so that I can ingest csvs with spaces in the attribute names.
B. Is there an alternate method (potentially aliasing during the spark.readStream) that would allow me to ingest these csvs with spaces in the attribute names without needing to have an existing table with adjusted table properties?
11-01-2022 12:45 PM
@Hubert Dudek thanks for your response! I was able to use what you proposed above to generate the schema. The issue is that the schema sets all attributes to STRING values and renames them numerically ('_c0', '_c1', etc.). Although this allows us to then write stream, it does not accurately portray the schema or the attribute names. If I instead use:
csvFile = spark.read.options(header = 'True',inferSchema='True').csv(data_source)
I can then alter the attribute names with:
json_str = json.dumps(schema_json)
json_str = json_str.replace(" ", "")
schema_json = json.loads(json_str)
We can then create the ddl and pass it as the schema to the readStream.
10-13-2022 10:38 AM
Also attempted to create a blank pyspark dataframe and write as a delta table:
df = spark.createDataFrame([], StructType([]))
df.write.format("delta").saveAsTable(table_name)
But this fails with:
Data used in creating the Delta table doesn't have any columns.
Another option could be to create a test column, alter the table, stream the data then remove the test column.
10-18-2022 06:29 AM
To get the schema, just read your CSV as not stream and take it from dataframe
csvFile = spark.read.csv("yourfile")
csvFile .createOrReplaceTempView("csvFile ")
schema_json = spark.sql("SELECT * FROM csvFile ").schema.json()
ddl = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(schema_json).toDDL()
print(ddl)
11-01-2022 12:45 PM
@Hubert Dudek thanks for your response! I was able to use what you proposed above to generate the schema. The issue is that the schema sets all attributes to STRING values and renames them numerically ('_c0', '_c1', etc.). Although this allows us to then write stream, it does not accurately portray the schema or the attribute names. If I instead use:
csvFile = spark.read.options(header = 'True',inferSchema='True').csv(data_source)
I can then alter the attribute names with:
json_str = json.dumps(schema_json)
json_str = json_str.replace(" ", "")
schema_json = json.loads(json_str)
We can then create the ddl and pass it as the schema to the readStream.
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group