Autoloader configuration with data type casting

srinivas_001
New Contributor III

Hi,
1: I am reading a parquet file from AWS S3 storage using spark.read.parquet(<s3 path>).
2: An Autoloader job has been configured to load this data into an external Delta table.

3: Before loading via Autoloader, I need to cast some columns so that the data types match my target table.
I have tried "schemaHints", casting the data after loading and passing the transformed DataFrame to Autoloader as the source, and the "inferSchema" and "mergeSchema" options.
None of them are working: the cast columns end up in the '_rescued_data' column instead.

Is there any way to do this?
NOTE: I need to avoid maintaining multiple locations, i.e., avoid writing the transformed data to another parquet file and reading it back from there.
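
For reference, a minimal sketch of how one might confirm which rows are landing in the rescued-data column of the target table (the table name is an assumption taken from the DDL shared further down; adjust to your setup):

# Inspect rows where Autoloader rescued mismatched columns instead of applying them.
# Table name is an assumption based on the DDL shared below.
rescued = (spark.table("test_cat.test_schema.test_table")
    .where("_rescued_data IS NOT NULL")
    .select("id", "_rescued_data"))
rescued.show(truncate=False)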

3 REPLIES

Hubert-Dudek
Esteemed Contributor III

Please share your code and an example file.

Table creation:

CREATE EXTERNAL TABLE test_cat.test_schema.test_table (
  id STRING,
  name STRING,
  age INT,
  dob STRING
)
USING DELTA
PARTITIONED BY (dob)
LOCATION '/path/to/delta/table/location'

Parquet file:
When reading data from the parquet file, age was inferred as LONG and dob as DATE.
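
A quick way to confirm the inferred types is to print the schema of a batch read; a minimal sketch (the path is a placeholder):

# Check the types Spark infers from the parquet file.
df_check = spark.read.parquet("s3://<path to parquet file>")
df_check.printSchema()  # expect: age as long, dob as date, per the observation above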

Sample data used to exercise Autoloader:

from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("dob", DateType(), True)
])

# One sample row matching the schema above (age as a long, dob as a date)
data = [("1", "srini", 25, date(2024, 3, 5))]
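
For completeness, a minimal sketch of how this sample could be materialized as a parquet file for Autoloader to pick up (the S3 path is a placeholder and should match the source path used in the jobs below):

# Write the sample rows as parquet into the Autoloader source path.
sample_df = spark.createDataFrame(data, schema)
sample_df.write.mode("append").parquet("s3://<path to parquet file>")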

version 1:
def al_write(datasource, checkdir, sformat, tabname):
    stream_to_write = (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", sformat)
        .option("cloudFiles.schemaLocation", checkdir)
        .option("inferSchema", "false")
        .option("cloudFiles.schemaHints", "age INT, dob STRING")
        # cloudFiles.* options belong on the read side
        .option("cloudFiles.useIncrementalListing", "true")
        .load(datasource)
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkdir)
        .option("mergeSchema", "true")
        .table(tabname))
    return stream_to_write

from pyspark.sql.functions import *
from pyspark.sql.types import *
ds="s3://<path to parquet file>"
checkdir="s3://path to checkdir"
tablename="test_streaming"
stream_to_write=al_write(datasource=ds,checkdir=checkdir,sformat="parquet",tabname=tablename)

version 2:
def al_write(datasource, checkdir, sformat, tabname):
    stream_to_read = (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", sformat)
        .option("cloudFiles.schemaLocation", checkdir)
        .option("inferSchema", "false")
        .option("cloudFiles.schemaHints", "age INT, dob STRING")
        .option("cloudFiles.useIncrementalListing", "true")
        .load(datasource))
    # Cast on the streaming DataFrame before writing
    df_transform = stream_to_read.withColumn("age", col("age").cast(IntegerType()))
    stream_to_write = (df_transform.writeStream
        .format("delta")
        .option("checkpointLocation", checkdir)
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .table(tabname))
    return stream_to_write

from pyspark.sql.functions import *
from pyspark.sql.types import *
ds="s3://<path to parquet file>"
checkdir="s3://path to checkdir"
tablename="test_streaming"
stream_to_write=al_write(datasource=ds,checkdir=checkdir,sformat="parquet",tabname=tablename)

version 3:


def al_write(datasource, checkdir, sformat, tabname, schema):
    stream_to_write = (spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", sformat)
        .option("cloudFiles.schemaLocation", checkdir)
        .option("cloudFiles.useIncrementalListing", "true")
        .schema(schema)
        .load(datasource)
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkdir)
        .option("mergeSchema", "true")
        .table(tabname))
    return stream_to_write

from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("dob", StringType(), True)
])

ds="s3://<path to parquet file>"
checkdir="s3://path to checkdir"
tablename="test_streaming"
stream_to_write=al_write(datasource=ds,checkdir=checkdir,sformat="parquet",tabname=tablename,schema=schema)

 

Kaniz
Community Manager

Hi @srinivas_001, when working with Parquet files in Spark, you can read the file, perform the typecasting on the columns, and then write the transformed data directly to your external Delta table.

 

Let’s break down the steps:

 

  1. Read the Parquet file: Start by reading the Parquet file from your AWS S3 storage using spark.read.parquet(<s3 path>). This will load the data into a DataFrame.
  2. Typecast the columns: After reading the data, you can use the withColumn function along with the cast method to change the data types of specific columns. 

    For example:

from pyspark.sql.functions import col

df = spark.read.parquet('<s3 path>')
df = df.select(col('col1').cast('int'), col('col2').cast('string'))

  3. Write to the external Delta table: Once you've typecast the columns, write the transformed DataFrame directly to your external Delta table using the df.write.format('delta').mode('overwrite').save('<delta table path>') method (a combined sketch follows below).
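
Putting the three steps together with the columns from this thread, a minimal sketch (paths are placeholders; this assumes a one-off batch rewrite rather than the streaming Autoloader job):

from pyspark.sql.functions import col

# 1. Read the parquet file from S3 (placeholder path)
df = spark.read.parquet("s3://<path to parquet file>")

# 2. Cast the columns to match the target table (age LONG -> INT, dob DATE -> STRING)
df_casted = (df
    .withColumn("age", col("age").cast("int"))
    .withColumn("dob", col("dob").cast("string")))

# 3. Write directly to the external Delta table location (placeholder path)
(df_casted.write
    .format("delta")
    .mode("overwrite")
    .save("/path/to/delta/table/location"))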

 

By following these steps, you’ll avoid maintaining multiple locations and directly load the transformed data into your Delta table. 

 

Remember to replace <s3 path> and <delta table path> with the actual paths relevant to your setup.

 

Feel free to adapt this approach to your specific use case, and let me know if you need further assistance! 🚀
