<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>Autoloader configuration with data type casting in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/autoloader-configuration-with-data-type-casting/m-p/62650#M32029</link>
    <description>&lt;P&gt;Hi&lt;BR /&gt;1: I am reading a parquet file from AWS S3 storage using spark.read.parquet(&amp;lt;s3 path&amp;gt;).&lt;BR /&gt;2: An Auto Loader job has been configured to load this data into an external Delta table.&lt;/P&gt;&lt;P&gt;3: Before loading, I need to cast some columns so that their data types match the target table.&lt;BR /&gt;I have tried "schemaHints", casting the data after loading and passing the transformed DataFrame as the source, and the "inferSchema" and "mergeSchema" options.&lt;BR /&gt;None of them work: the cast columns end up in the '_rescued_data' column instead.&lt;/P&gt;&lt;P&gt;Is there any way to do this?&lt;BR /&gt;NOTE: I want to avoid maintaining multiple locations, i.e., writing the transformed data to another parquet file and reading it from there.&lt;/P&gt;</description>
    <pubDate>Tue, 05 Mar 2024 12:57:02 GMT</pubDate>
    <dc:creator>srinivas_001</dc:creator>
    <dc:date>2024-03-05T12:57:02Z</dc:date>
    <item>
      <title>Autoloader configuration with data type casting</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-configuration-with-data-type-casting/m-p/62650#M32029</link>
      <description>&lt;P&gt;Hi&lt;BR /&gt;1: I am reading a parquet file from AWS S3 storage using spark.read.parquet(&amp;lt;s3 path&amp;gt;).&lt;BR /&gt;2: An Auto Loader job has been configured to load this data into an external Delta table.&lt;/P&gt;&lt;P&gt;3: Before loading, I need to cast some columns so that their data types match the target table.&lt;BR /&gt;I have tried "schemaHints", casting the data after loading and passing the transformed DataFrame as the source, and the "inferSchema" and "mergeSchema" options.&lt;BR /&gt;None of them work: the cast columns end up in the '_rescued_data' column instead.&lt;/P&gt;&lt;P&gt;Is there any way to do this?&lt;BR /&gt;NOTE: I want to avoid maintaining multiple locations, i.e., writing the transformed data to another parquet file and reading it from there.&lt;/P&gt;
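&lt;P&gt;For reference, the cast-after-load attempt looks roughly like this (a rough sketch; the paths and table name are placeholders):&lt;/P&gt;&lt;P&gt;df = (spark.readStream.format("cloudFiles")&lt;BR /&gt;    .option("cloudFiles.format", "parquet")&lt;BR /&gt;    .option("cloudFiles.schemaLocation", "&amp;lt;schema dir&amp;gt;")&lt;BR /&gt;    .load("&amp;lt;s3 path&amp;gt;"))&lt;BR /&gt;# cast to match the target table, e.g. LONG -&amp;gt; INT&lt;BR /&gt;df = df.withColumn("age", df["age"].cast("int"))&lt;BR /&gt;(df.writeStream.format("delta")&lt;BR /&gt;    .option("checkpointLocation", "&amp;lt;checkpoint dir&amp;gt;")&lt;BR /&gt;    .table("&amp;lt;target table&amp;gt;"))&lt;/P&gt;</description>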
      <pubDate>Tue, 05 Mar 2024 12:57:02 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-configuration-with-data-type-casting/m-p/62650#M32029</guid>
      <dc:creator>srinivas_001</dc:creator>
      <dc:date>2024-03-05T12:57:02Z</dc:date>
    </item>
    <item>
      <title>Re: Autoloader configuration with data type casting</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-configuration-with-data-type-casting/m-p/62659#M32035</link>
      <description>&lt;P&gt;Please share your code and an example file.&lt;/P&gt;</description>
      <pubDate>Tue, 05 Mar 2024 14:37:32 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-configuration-with-data-type-casting/m-p/62659#M32035</guid>
      <dc:creator>Hubert-Dudek</dc:creator>
      <dc:date>2024-03-05T14:37:32Z</dc:date>
    </item>
    <item>
      <title>Re: Autoloader configuration with data type casting</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-configuration-with-data-type-casting/m-p/62674#M32038</link>
      <description>&lt;P&gt;Table creation:&lt;/P&gt;&lt;P&gt;CREATE EXTERNAL TABLE test_cat.test_schema.test_table (&lt;BR /&gt;id STRING,&lt;BR /&gt;name STRING,&lt;BR /&gt;age INT,&lt;BR /&gt;dob STRING&lt;BR /&gt;)&lt;BR /&gt;USING DELTA&lt;BR /&gt;PARTITIONED BY (dob)&lt;BR /&gt;LOCATION '/path/to/delta/table/location'&lt;/P&gt;&lt;P&gt;Parquet file:&lt;BR /&gt;When reading the parquet file, Spark infers age as LONG and dob as DATE.&lt;/P&gt;&lt;P&gt;Sample data for the Auto Loader job to process:&lt;/P&gt;&lt;P&gt;from datetime import date&lt;BR /&gt;from pyspark.sql.functions import *&lt;BR /&gt;from pyspark.sql.types import *&lt;/P&gt;&lt;P&gt;schema = StructType([&lt;BR /&gt;StructField("id", StringType(), True),&lt;BR /&gt;StructField("name", StringType(), True),&lt;BR /&gt;StructField("age", LongType(), True),&lt;BR /&gt;StructField("dob", DateType(), True)&lt;BR /&gt;])&lt;/P&gt;&lt;P&gt;data = [("1", "srini", 25, date(2024, 3, 5))]&lt;/P&gt;&lt;P&gt;Version 1 (schema hints):&lt;/P&gt;&lt;P&gt;def al_write(datasource, checkdir, sformat, tabname):&lt;BR /&gt;    # read with Auto Loader, hinting the target types, and write straight to the table&lt;BR /&gt;    stream_to_write = (spark.readStream.format("cloudFiles")&lt;BR /&gt;        .option("cloudFiles.format", sformat)&lt;BR /&gt;        .option("cloudFiles.schemaLocation", checkdir)&lt;BR /&gt;        .option("cloudFiles.useIncrementalListing", "true")&lt;BR /&gt;        .option("inferSchema", "false")&lt;BR /&gt;        .option("cloudFiles.schemaHints", "age INT, dob STRING")&lt;BR /&gt;        .load(datasource)&lt;BR /&gt;        .writeStream&lt;BR /&gt;        .format("delta")&lt;BR /&gt;        .option("checkpointLocation", checkdir)&lt;BR /&gt;        .option("mergeSchema", "true")&lt;BR /&gt;        .table(tabname))&lt;BR /&gt;    return stream_to_write&lt;BR /&gt;&lt;BR /&gt;ds = "s3://&amp;lt;path to parquet file&amp;gt;"&lt;BR /&gt;checkdir = "s3://&amp;lt;path to checkpoint dir&amp;gt;"&lt;BR /&gt;tablename = "test_streaming"&lt;BR /&gt;stream_to_write = al_write(datasource=ds, checkdir=checkdir, sformat="parquet", tabname=tablename)&lt;/P&gt;&lt;P&gt;Version 2 (cast after load):&lt;/P&gt;&lt;P&gt;def al_write(datasource, checkdir, sformat, tabname):&lt;BR /&gt;    stream_to_write = (spark.readStream.format("cloudFiles")&lt;BR /&gt;        .option("cloudFiles.format", sformat)&lt;BR /&gt;        .option("cloudFiles.schemaLocation", checkdir)&lt;BR /&gt;        .option("cloudFiles.useIncrementalListing", "true")&lt;BR /&gt;        .option("inferSchema", "false")&lt;BR /&gt;        .option("cloudFiles.schemaHints", "age INT, dob STRING")&lt;BR /&gt;        .load(datasource))&lt;BR /&gt;    # cast on the streaming DataFrame before writing&lt;BR /&gt;    df_transform = stream_to_write.withColumn("age", col("age").cast(IntegerType()))&lt;BR /&gt;    (df_transform.writeStream&lt;BR /&gt;        .format("delta")&lt;BR /&gt;        .option("checkpointLocation", checkdir)&lt;BR /&gt;        .option("mergeSchema", "true")&lt;BR /&gt;        .trigger(availableNow=True)&lt;BR /&gt;        .table(tabname))&lt;BR /&gt;    return stream_to_write&lt;BR /&gt;&lt;BR /&gt;ds = "s3://&amp;lt;path to parquet file&amp;gt;"&lt;BR /&gt;checkdir = "s3://&amp;lt;path to checkpoint dir&amp;gt;"&lt;BR /&gt;tablename = "test_streaming"&lt;BR /&gt;stream_to_write = al_write(datasource=ds, checkdir=checkdir, sformat="parquet", tabname=tablename)&lt;/P&gt;&lt;P&gt;Version 3 (explicit schema):&lt;/P&gt;&lt;P&gt;def al_write(datasource, checkdir, sformat, tabname, schema):&lt;BR /&gt;    stream_to_write = (spark.readStream.format("cloudFiles")&lt;BR /&gt;        .option("cloudFiles.format", sformat)&lt;BR /&gt;        .option("cloudFiles.schemaLocation", checkdir)&lt;BR /&gt;        .option("cloudFiles.useIncrementalListing", "true")&lt;BR /&gt;        .schema(schema)&lt;BR /&gt;        .load(datasource)&lt;BR /&gt;        .writeStream&lt;BR /&gt;        .format("delta")&lt;BR /&gt;        .option("checkpointLocation", checkdir)&lt;BR /&gt;        .option("mergeSchema", "true")&lt;BR /&gt;        .table(tabname))&lt;BR /&gt;    return stream_to_write&lt;/P&gt;&lt;P&gt;schema = StructType([&lt;BR /&gt;StructField("id", StringType(), True),&lt;BR /&gt;StructField("name", StringType(), True),&lt;BR /&gt;StructField("age", IntegerType(), True),&lt;BR /&gt;StructField("dob", StringType(), True)&lt;BR /&gt;])&lt;/P&gt;&lt;P&gt;ds = "s3://&amp;lt;path to parquet file&amp;gt;"&lt;BR /&gt;checkdir = "s3://&amp;lt;path to checkpoint dir&amp;gt;"&lt;BR /&gt;tablename = "test_streaming"&lt;BR /&gt;stream_to_write = al_write(datasource=ds, checkdir=checkdir, sformat="parquet", tabname=tablename, schema=schema)&lt;/P&gt;</description>
      <pubDate>Tue, 05 Mar 2024 15:17:12 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-configuration-with-data-type-casting/m-p/62674#M32038</guid>
      <dc:creator>srinivas_001</dc:creator>
      <dc:date>2024-03-05T15:17:12Z</dc:date>
    </item>
    <item>
      <title>Re: Autoloader configuration with data type casting</title>
      <link>https://community.databricks.com/t5/data-engineering/autoloader-configuration-with-data-type-casting/m-p/100992#M40506</link>
      <description>&lt;P&gt;Data lands in the rescued data column when types do not match and implicit casting does not work. However, check whether the cast you need is supported by &lt;A href="https://docs.databricks.com/en/delta/type-widening.html" target="_self"&gt;Delta Lake's type widening feature&lt;/A&gt;, which gives more flexibility and may route your data to the correct column automatically. In other cases, you will likely need to re-create the table with the proper "wider" type - in this example, StringType.&lt;/P&gt;
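&lt;P&gt;For example, on a recent Databricks Runtime that supports type widening, a sketch like the following (reusing the table from this thread) opts the table in and widens the age column so that incoming LONG values fit. A DATE-to-STRING change such as dob is not a widening, so that column still needs the wider type when the table is created:&lt;/P&gt;&lt;P&gt;# opt the existing Delta table in to type widening&lt;BR /&gt;spark.sql("ALTER TABLE test_cat.test_schema.test_table SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")&lt;BR /&gt;# widen age from INT to BIGINT so LONG source values no longer land in _rescued_data&lt;BR /&gt;spark.sql("ALTER TABLE test_cat.test_schema.test_table ALTER COLUMN age TYPE BIGINT")&lt;/P&gt;</description>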
      <pubDate>Thu, 05 Dec 2024 00:37:34 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/autoloader-configuration-with-data-type-casting/m-p/100992#M40506</guid>
      <dc:creator>cgrant</dc:creator>
      <dc:date>2024-12-05T00:37:34Z</dc:date>
    </item>
  </channel>
</rss>

