Autoloader configuration with data type casting
03-05-2024 04:57 AM
Hi,
1: I am reading a parquet file from AWS S3 storage using spark.read.parquet(<s3 path>).
2: An Auto Loader job has been configured to load this data into an external Delta table.
3: Before loading, I need to cast some columns so their data types match the target table.
I have tried the "schemaHints" option, casting the data after loading and passing the transformed DataFrame as the source for the write, and the "inferSchema" and "mergeSchema" options.
None of them work. The cast columns end up in the '_rescued_data' column instead.
Is there any way to do this?
NOTE: I need to avoid maintaining multiple locations, i.e., avoid writing the transformed data to another parquet file and reading it from there.
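For context, the pattern I'm trying to get working looks roughly like this (a sketch only; paths and the table name are placeholders):
from pyspark.sql.functions import col

# Read the landed parquet files incrementally with Auto Loader
raw = (spark.readStream.format("cloudFiles")
       .option("cloudFiles.format", "parquet")
       .option("cloudFiles.schemaLocation", "s3://<path to checkdir>")
       .load("s3://<path to parquet file>"))

# Cast to match the target table before writing
casted = (raw
          .withColumn("age", col("age").cast("int"))
          .withColumn("dob", col("dob").cast("string")))

(casted.writeStream
 .format("delta")
 .option("checkpointLocation", "s3://<path to checkdir>")
 .trigger(availableNow=True)
 .table("test_cat.test_schema.test_table"))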
03-05-2024 06:37 AM
Please share your code and an example file.
03-05-2024 07:15 AM - edited 03-05-2024 07:17 AM
Table creation:
CREATE EXTERNAL TABLE test_cat.test_schema.test_table (
    id STRING,
    name STRING,
    age INT,
    dob STRING
)
USING DELTA
PARTITIONED BY (dob)
LOCATION '/path/to/delta/table/location'
Parquet file:
When reading the parquet file, age is inferred as LONG and dob as DATE.
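For example, a quick schema check on the source shows this (path is a placeholder; output shown as comments):
spark.read.parquet("s3://<path to parquet file>").printSchema()
# root
#  |-- id: string (nullable = true)
#  |-- name: string (nullable = true)
#  |-- age: long (nullable = true)
#  |-- dob: date (nullable = true)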
Sample data used to drive the autoloader:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("dob", DateType(), True)
])
data = [("1", "srini", 25, date(2024, 3, 5))]
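To produce a test file for the autoloader to pick up, the sample can be written out as parquet (sketch; path is a placeholder):
sample_df = spark.createDataFrame(data, schema)
sample_df.write.mode("append").parquet("s3://<path to parquet file>")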
version 1:
def al_write(datasource, checkdir, sformat, tabname):
    stream_to_write = (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", sformat)
        .option("cloudFiles.schemaLocation", checkdir)
        .option("inferSchema", "false")
        .option("cloudFiles.schemaHints", "age INT, dob STRING")
        .load(datasource)
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkdir)
        .option("mergeSchema", "true")
        .option("cloudFiles.useIncrementalListing", "true")
        .table(tabname))
    return stream_to_write
from pyspark.sql.functions import *
from pyspark.sql.types import *

ds = "s3://<path to parquet file>"
checkdir = "s3://path to checkdir"
tablename = "test_streaming"
stream_to_write = al_write(datasource=ds, checkdir=checkdir, sformat="parquet", tabname=tablename)
version 2:
def al_write(datasource, checkdir, sformat, tabname):
    stream_to_write = (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", sformat)
        .option("cloudFiles.schemaLocation", checkdir)
        .option("inferSchema", "false")
        .option("cloudFiles.schemaHints", "age INT, dob STRING")
        .load(datasource))
    df_transform = stream_to_write.withColumn("age", col("age").cast(IntegerType()))
    query = (df_transform.writeStream
        .format("delta")
        .option("checkpointLocation", checkdir)
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .option("cloudFiles.useIncrementalListing", "true")
        .table(tabname))
    return query
from pyspark.sql.functions import *
from pyspark.sql.types import *

ds = "s3://<path to parquet file>"
checkdir = "s3://path to checkdir"
tablename = "test_streaming"
stream_to_write = al_write(datasource=ds, checkdir=checkdir, sformat="parquet", tabname=tablename)
version 3:
def al_write(datasource, checkdir, sformat, tabname, schema):
    stream_to_write = (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", sformat)
        .option("cloudFiles.schemaLocation", checkdir)
        .schema(schema)
        .load(datasource)
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkdir)
        .option("mergeSchema", "true")
        .option("cloudFiles.useIncrementalListing", "true")
        .table(tabname))
    return stream_to_write
from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("dob", StringType(), True)
])
ds = "s3://<path to parquet file>"
checkdir = "s3://path to checkdir"
tablename = "test_streaming"
stream_to_write = al_write(datasource=ds, checkdir=checkdir, sformat="parquet", tabname=tablename, schema=schema)
12-04-2024 04:37 PM
Data can land in the rescued data column when types do not match and implicit casting does not work. However, check whether the typecasting you're trying to do is covered by Delta Lake's type widening feature, which gives more flexibility and may let your data land in the correct column automatically. In other cases, you will likely need to re-create the table with the proper "higher" type - in this example, StringType.
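If your runtime supports it, type widening is enabled through a table property. A minimal sketch (table name taken from the thread; verify the property and syntax against your Delta/DBR version):
# Enable type widening on the existing table
spark.sql("""
    ALTER TABLE test_cat.test_schema.test_table
    SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
""")

# With widening enabled, a supported change such as INT -> BIGINT can be applied explicitly,
# so incoming LONG values for `age` no longer end up in the rescued data column
spark.sql("ALTER TABLE test_cat.test_schema.test_table ALTER COLUMN age TYPE BIGINT")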

