- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-13-2022 07:22 AM
I am attempting to use autoloader to add a number of csv files to a delta table. The underlying csv files have spaces in the attribute names though (i.e. 'Account Number' instead of 'AccountNumber'). When I run my autoload, I get the following error message:
Found invalid character(s) among ' ,;{}()\n\t=' in the column names of your schema.
My autoload methodology is:
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", checkpoint_directory)
.option("header", "True")
.load(data_source)
.writeStream
.format("delta")
.trigger(once=True)
.option("checkpointLocation", checkpoint_directory)
.option("mergeSchema", "true")
.table(table_name))
The proposed solution via the error message is to change the column mapping mode to 'name' mapping in the table properties to allow for spaces:
ALTER TABLE <table_name> SET TBLPROPERTIES (
'delta.minReaderVersion' = '2',
'delta.minWriterVersion' = '5',
'delta.columnMapping.mode' = 'name'
)
The issue I am having is that the table is not created until autoloader runs, so I cannot alter the table properties. So my question is in two parts:
A. Can I create a blank delta table so that I can alter the table properties prior to autoloading running? For example, I could run:
CREATE OR REPLACE TABLE IF NOT EXISTS table_name
which would then allow me to alter the table, then run autoloader so that I can ingest csvs with spaces in the attribute names.
B. Is there an alternate method (potentially aliasing during the spark.readStream) that would allow me to ingest these csvs with spaces in the attribute names without needing to have an existing table with adjusted table properties?
- Labels:
-
Autoloader
-
Error Message
Accepted Solutions
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-01-2022 12:45 PM
@Hubert Dudek thanks for your response! I was able to use what you proposed above to generate the schema. The issue is that the schema sets all attributes to STRING values and renames them numerically ('_c0', '_c1', etc.). Although this allows us to then write stream, it does not accurately portray the schema or the attribute names. If I instead use:
csvFile = spark.read.options(header = 'True',inferSchema='True').csv(data_source)
I can then alter the attribute names with:
json_str = json.dumps(schema_json)
json_str = json_str.replace(" ", "")
schema_json = json.loads(json_str)
We can then create the ddl and pass it as the schema to the readStream.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-13-2022 10:38 AM
Also attempted to create a blank pyspark dataframe and write as a delta table:
df = spark.createDataFrame([], StructType([]))
df.write.format("delta").saveAsTable(table_name)
But this fails with:
Data used in creating the Delta table doesn't have any columns.
Another option could be to create a test column, alter the table, stream the data then remove the test column.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-18-2022 06:29 AM
- Create, as you said table registered in metastore, but for that, you need to define the schema
- write stream directly into that table .writeStream.table(table_name)
To get the schema, just read your CSV as not stream and take it from dataframe
csvFile = spark.read.csv("yourfile")
csvFile .createOrReplaceTempView("csvFile ")
schema_json = spark.sql("SELECT * FROM csvFile ").schema.json()
ddl = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(schema_json).toDDL()
print(ddl)
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
11-01-2022 12:45 PM
@Hubert Dudek thanks for your response! I was able to use what you proposed above to generate the schema. The issue is that the schema sets all attributes to STRING values and renames them numerically ('_c0', '_c1', etc.). Although this allows us to then write stream, it does not accurately portray the schema or the attribute names. If I instead use:
csvFile = spark.read.options(header = 'True',inferSchema='True').csv(data_source)
I can then alter the attribute names with:
json_str = json.dumps(schema_json)
json_str = json_str.replace(" ", "")
schema_json = json.loads(json_str)
We can then create the ddl and pass it as the schema to the readStream.
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
09-10-2024 01:47 PM
can you give me the step by step code to achieve this end to end?