Goal
Import and consolidate GBs to TBs of local data, stored as ~20 MB chunked Parquet files, into Databricks as partitioned Delta Lake tables.
What I've Done
I took a small subset of the data, roughly 72.5 GB, and ingested it with the streaming code below. The data is already sequentially ordered and hierarchically laid out as `id/time_bin/smaller_time_bin.parquet`. The resulting partitions should each be roughly 12-20 GB. Results:
- It takes ~4.5 hours with 4 XLarge "General" compute instances
- It blows up the table size to ~300 GB
- It can't handle the schema I've supplied and throws "PlainLong" map errors (I'll grab the exact message later if relevant)
Why is the output blowing up so much, and why does it take so long? A previous project of mine ingested ~15 GB in ~5-10 minutes with 1-2 Large general-purpose instances, so I suspect there's an issue with my read code below; otherwise this should only take ~30 minutes to an hour.
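For reference, the blow-up and file count can be checked with something like this (a minimal diagnostic sketch; the table name matches the code further down):

```python
# Diagnostic only: DESCRIBE DETAIL reports the current Delta table version's
# data size and file count, which I compare against the ~72.5 GB source subset.
detail = spark.sql("DESCRIBE DETAIL test_small_1").select("numFiles", "sizeInBytes").first()
print(f"files: {detail['numFiles']}, size: {detail['sizeInBytes'] / 1024**3:.1f} GiB")
```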
Note: the original compression is Brotli, but that is not supported by the Delta cache, so I let the write convert to Snappy (the default compression).
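If the growth is mostly down to the weaker codec, one thing I'm considering (a hedged sketch, not verified on this dataset) is steering the Delta write toward Zstd instead of Snappy via the standard Spark Parquet setting:

```python
# Assumption: Delta data files are Parquet and honor this codec conf, so zstd
# should land closer to the Brotli-compressed source size than Snappy does.
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
```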
Code
from pyspark.sql.types import *
from pyspark.sql.functions import col, expr, input_file_name, regexp_extract, year, month
# Define variables used in code below
file_path = "s3://<bucket>/parquet/<id>/**/<time_series_data>/*.parquet"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = "test_small_1"
checkpoint_path = f"/tmp/{username}/_checkpoint/ingest_test_small_1"
# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
# Explicit schema for the time-series files (currently unused; see the commented-out .schema() call below)
schema = StructType([
StructField("ts", LongType()),
StructField("x", FloatType()),
StructField("y", FloatType()),
StructField("z", FloatType()),
])
# The disk (Delta) cache setting is a Spark conf, not a stream reader option, so set it here instead
spark.conf.set("spark.databricks.io.cache.enabled", "false")

(spark.readStream
.format("cloudFiles")
.option("cloudFiles.allowOverwrites", True)
.option("modifiedAfter", "2022-12-21 00:00:00.000000 UTC+0")
.option("cloudFiles.format", "parquet")
.option("cloudFiles.schemaLocation", checkpoint_path)
.option("spark.databricks.io.cache.enabled", False)
.option("mergeSchema", "True")
# .schema(schema)
.load(file_path)
.select("*")
.withColumn("id", regexp_extract(input_file_name(), r's3://<bucket>/parquet/([a-zA-Z0-9]*)/.*', 1))
.withColumn('timestamp', expr('timestamp_micros(BIGINT(ts))'))
.withColumn("year", year(col("timestamp")))
.withColumn("month", month(col("timestamp")))
.writeStream
.partitionBy("year", "month")
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.toTable(table_name))
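In case it helps, this is the reader variant I'd eventually like to get working with the explicit schema instead of inference (a sketch only; the `cloudFiles.schemaHints` line is an untested alternative, and neither is confirmed to resolve the "PlainLong" errors):

```python
# Assumption: enforcing the known schema (or hinting it) avoids the inference
# path that produces the "PlainLong" map errors; not yet confirmed.
reader = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    # .option("cloudFiles.schemaHints", "ts BIGINT, x FLOAT, y FLOAT, z FLOAT")
    .schema(schema)
    .load(file_path))
```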