Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Autoloader configuration with data type casting

srinivas_001
New Contributor III

Hi,
1: I am reading a parquet file from AWS S3 storage using spark.read.parquet(<s3 path>).
2: An Auto Loader job has been configured to load this data into an external Delta table.
3: Before loading, I need to cast some columns to different data types so they match the target table.

I have tried "schemaHints", tried casting the data after loading and passing the transformed DataFrame to Auto Loader as the source, and tried the "inferSchema" and "mergeSchema" options.
None of them work: the columns I try to cast end up in the '_rescued_data' column instead.

Is there any way to do this?
NOTE: I need to avoid maintaining multiple locations, i.e., avoid writing the transformed data to another parquet file and reading it from there.
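
For clarity, in batch terms the casts I need would look roughly like this (the S3 path is a placeholder):

from pyspark.sql.functions import col

# Read the source parquet and cast the columns that differ from the target table
src = spark.read.parquet("s3://<path to parquet file>")
casted = (src
          .withColumn("age", col("age").cast("int"))
          .withColumn("dob", col("dob").cast("string")))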

2 REPLIES

Hubert-Dudek
Esteemed Contributor III

Please share your code and example file

Table creation:

CREATE EXTERNAL TABLE test_cat.test_schema.test_table (
  id STRING,
  name STRING,
  age INT,
  dob STRING
)
USING DELTA
PARTITIONED BY (dob)
LOCATION '/path/to/delta/table/location'

Parquet file:
When reading the parquet file, age is inferred as LONG and dob as DATE.
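
For reference, comparing the source and target schemas (the S3 path is a placeholder):

# Schema inferred from the source parquet file: age comes back as long, dob as date
spark.read.parquet("s3://<path to parquet file>").printSchema()

# Schema of the target external Delta table defined above: age is int, dob is string
spark.table("test_cat.test_schema.test_table").printSchema()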

Sample data to process with Auto Loader:

from datetime import date

from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("dob", DateType(), True)
])

# age as a Python int to match LongType; date literal without leading zero
data = [("1", "srini", 25, date(2024, 3, 5))]
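
To land this sample as a parquet file for the Auto Loader stream to pick up (the S3 path is a placeholder):

# Build a DataFrame from the sample row and append it to the source location
sample_df = spark.createDataFrame(data, schema)
sample_df.write.mode("append").parquet("s3://<path to parquet file>")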

Version 1 (schemaHints only):

def al_write(datasource, checkdir, sformat, tabname):
    stream_to_write = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", sformat)
        .option("cloudFiles.schemaLocation", checkdir)
        .option("inferSchema", "false")
        .option("cloudFiles.schemaHints", "age INT, dob STRING")
        .option("cloudFiles.useIncrementalListing", "true")
        .load(datasource)
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkdir)
        .option("mergeSchema", "true")
        .table(tabname)
    )
    return stream_to_write

from pyspark.sql.functions import *
from pyspark.sql.types import *

ds = "s3://<path to parquet file>"
checkdir = "s3://path to checkdir"
tablename = "test_streaming"
stream_to_write = al_write(datasource=ds, checkdir=checkdir, sformat="parquet", tabname=tablename)

Version 2 (cast after readStream, before writeStream):

def al_write(datasource, checkdir, sformat, tabname):
    stream_to_read = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", sformat)
        .option("cloudFiles.schemaLocation", checkdir)
        .option("inferSchema", "false")
        .option("cloudFiles.schemaHints", "age INT, dob STRING")
        .option("cloudFiles.useIncrementalListing", "true")
        .load(datasource)
    )
    df_transform = stream_to_read.withColumn("age", col("age").cast(IntegerType()))
    stream_to_write = (
        df_transform.writeStream
        .format("delta")
        .option("checkpointLocation", checkdir)
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .table(tabname)
    )
    return stream_to_write

from pyspark.sql.functions import *
from pyspark.sql.types import *

ds = "s3://<path to parquet file>"
checkdir = "s3://path to checkdir"
tablename = "test_streaming"
stream_to_write = al_write(datasource=ds, checkdir=checkdir, sformat="parquet", tabname=tablename)

Version 3 (explicit schema instead of schema inference):

def al_write(datasource, checkdir, sformat, tabname, schema):
    stream_to_write = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", sformat)
        .option("cloudFiles.schemaLocation", checkdir)
        .option("cloudFiles.useIncrementalListing", "true")
        .schema(schema)
        .load(datasource)
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkdir)
        .option("mergeSchema", "true")
        .table(tabname)
    )
    return stream_to_write

from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("dob", StringType(), True)
])

ds = "s3://<path to parquet file>"
checkdir = "s3://path to checkdir"
tablename = "test_streaming"
stream_to_write = al_write(datasource=ds, checkdir=checkdir, sformat="parquet", tabname=tablename, schema=schema)

 
