Table creation:
CREATE EXTERNAL TABLE test_cat.test_schema.test_table (
    id STRING,
    name STRING,
    age INT,
    dob STRING
)
USING DELTA
PARTITIONED BY (dob)
LOCATION '/path/to/delta/table/location'
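To double-check what the target expects, the declared column types can be read back from the catalog (a minimal sketch in PySpark; the catalog, schema, and table names are the ones from the CREATE statement above):
# Hedged sketch: confirm the target Delta table declares age INT and dob STRING
spark.sql("DESCRIBE TABLE test_cat.test_schema.test_table").show(truncate=False)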
Parquet file:
When reading data from the source Parquet files, Spark was inferring age as LONG and dob as DATE, while the target table declares age INT and dob STRING.
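The inferred source schema can be confirmed directly on the raw files (a minimal sketch; the S3 path is a placeholder for the actual landing location):
# Hedged sketch: inspect the schema Spark infers from the raw Parquet files
spark.read.parquet("s3://<path to parquet file>").printSchema()   # expect age: long, dob: date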
Sample data used to exercise Auto Loader:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("dob", DateType(), True)
])
data = [("1", "srini", 25, date(2024, 3, 5))]
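To make these rows available to Auto Loader, the sample DataFrame can be written out as Parquet into the source path (a minimal sketch; the path placeholder matches ds used in the versions below):
# Hedged sketch: land the sample rows as a Parquet file in the Auto Loader source path
sample_df = spark.createDataFrame(data, schema)
sample_df.write.mode("append").parquet("s3://<path to parquet file>")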
Version 1: schema hints on the Auto Loader read
def al_write(datasource, checkdir, sformat, tabname):
    stream_to_write = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", sformat)
        .option("cloudFiles.schemaLocation", checkdir)
        .option("inferSchema", "false")
        # hint Auto Loader to treat age as INT and dob as STRING instead of the inferred types
        .option("cloudFiles.schemaHints", "age INT, dob STRING")
        .option("cloudFiles.useIncrementalListing", "true")
        .load(datasource)
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkdir)
        .option("mergeSchema", "true")
        .table(tabname)
    )
    return stream_to_write
from pyspark.sql.functions import *
from pyspark.sql.types import *

ds = "s3://<path to parquet file>"
checkdir = "s3://<path to checkdir>"
tablename = "test_streaming"
stream_to_write = al_write(datasource=ds, checkdir=checkdir, sformat="parquet", tabname=tablename)
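After the stream finishes, the types that actually landed in the target table can be checked; the same check applies after versions 2 and 3 below (a minimal sketch; test_streaming is the table name used above):
# Hedged sketch: verify the resulting column types on the target table
spark.table(tablename).printSchema()   # check whether age came through as int and dob as string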
Version 2: explicit cast before writing
def al_write(datasource, checkdir, sformat, tabname):
    stream_to_read = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", sformat)
        .option("cloudFiles.schemaLocation", checkdir)
        .option("inferSchema", "false")
        .option("cloudFiles.schemaHints", "age INT, dob STRING")
        .option("cloudFiles.useIncrementalListing", "true")
        .load(datasource)
    )
    # cast age explicitly before writing, in case the schema hint alone is not honoured
    df_transform = stream_to_read.withColumn("age", col("age").cast(IntegerType()))
    stream_to_write = (
        df_transform.writeStream.format("delta")
        .option("checkpointLocation", checkdir)
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .table(tabname)
    )
    return stream_to_write
from pyspark.sql.functions import *
from pyspark.sql.types import *

ds = "s3://<path to parquet file>"
checkdir = "s3://<path to checkdir>"
tablename = "test_streaming"
stream_to_write = al_write(datasource=ds, checkdir=checkdir, sformat="parquet", tabname=tablename)
Version 3: explicit schema on the read stream
def al_write(datasource, checkdir, sformat, tabname, schema):
    stream_to_write = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", sformat)
        .option("cloudFiles.schemaLocation", checkdir)
        .option("cloudFiles.useIncrementalListing", "true")
        # skip inference entirely and supply the desired schema up front
        .schema(schema)
        .load(datasource)
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkdir)
        .option("mergeSchema", "true")
        .table(tabname)
    )
    return stream_to_write
from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("dob", StringType(), True)
])
ds = "s3://<path to parquet file>"
checkdir = "s3://<path to checkdir>"
tablename = "test_streaming"
stream_to_write = al_write(datasource=ds, checkdir=checkdir, sformat="parquet", tabname=tablename, schema=schema)