02-08-2023 10:41 AM
Goal
Import and consolidate GBs to TBs of local data, currently stored as ~20 MB Parquet chunk files, into partitioned Delta Lake tables on Databricks.
What I've Done
I took a small subset of the data, roughly 72.5 GB, and ingested it with the streaming code below. The data is already sequentially ordered and hierarchically laid out as `id/time_bin/smaller_time_bin.parquet`. The partitions should come out to ~12-20 GB each. Results:
- It takes ~4.5 hours with 4 XLarge "General" compute instances
- It blows up the size to ~300 GB
- It can't handle the schema I've supplied and raises "PlainLong" map errors (I'll grab the exact error later if relevant)
Why is it blowing up so large, and why does it take so long? I've worked on a project before with ~15 GB that took only ~5-10 minutes to ingest on 1-2 Large general-purpose instances, so I suspect there's an issue with my read code below; otherwise this run should only take ~30 minutes to an hour.
Note: the original compression is Brotli, which isn't supported by the Delta cache, so I let it convert to Snappy (the default compression).
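To make that conversion explicit rather than relying on the default, this is the kind of thing I mean (a minimal sketch; it assumes the Delta writer honors the standard Spark Parquet codec conf):

```python
# Sketch only: pin the Parquet codec for the files Spark writes instead of
# relying on the session default. "snappy" is the default; "zstd" is another
# option to test.
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
```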
Code
```python
from pyspark.sql.types import StructType, StructField, LongType, FloatType
from pyspark.sql.functions import col, expr, input_file_name, regexp_extract, year, month

# Define variables used in the code below
file_path = "s3://<bucket>/parquet/<id>/**/<time_series_data>/*.parquet"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = "test_small_1"
checkpoint_path = f"/tmp/{username}/_checkpoint/ingest_test_small_1"

# Disable the Delta (disk) cache for this run; this is a Spark conf,
# not a readStream option, so it has to be set on the session
spark.conf.set("spark.databricks.io.cache.enabled", False)

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

schema = StructType([
    StructField("ts", LongType()),
    StructField("x", FloatType()),
    StructField("y", FloatType()),
    StructField("z", FloatType()),
])

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.allowOverwrites", True)
    .option("modifiedAfter", "2022-12-21 00:00:00.000000 UTC+0")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .option("mergeSchema", "True")
    # .schema(schema)  # supplying the schema triggers the "PlainLong" map errors
    .load(file_path)
    .withColumn("id", regexp_extract(input_file_name(), r's3://<bucket>/parquet/([a-zA-Z0-9]*)/.*', 1))
    .withColumn("timestamp", expr("timestamp_micros(BIGINT(ts))"))
    .withColumn("year", year(col("timestamp")))
    .withColumn("month", month(col("timestamp")))
    .writeStream
    .partitionBy("year", "month")
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .toTable(table_name))
```
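For what it's worth, here's how I'm checking the resulting table size and file count after the run finishes (standard Delta `DESCRIBE DETAIL`; `table_name` as defined above):

```python
# Inspect the ingested Delta table: numFiles = current data file count,
# sizeInBytes = total size of those data files in storage.
(spark.sql(f"DESCRIBE DETAIL {table_name}")
    .select("numFiles", "sizeInBytes")
    .show(truncate=False))
```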
- Labels: Data Ingestion & connectivity, Slow
Accepted Solutions
02-08-2023 04:23 PM
Definitely running into small-file problems. 70 GB in 20 MB chunks is ~3,500 files to read and process, so there's a ton of scheduling overhead.
Adding the year and month columns won't actually increase the data size, because those values are captured in the partition path rather than in the data files themselves.
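If the output side also ends up as lots of small files, a compaction pass after the availableNow run is a cheap follow-up (sketch only, reusing the table name from the question; OPTIMIZE / ZORDER BY are standard Delta commands):

```python
# Sketch only: compact the small data files the streaming ingest produced.
table_name = "test_small_1"
spark.sql(f"OPTIMIZE {table_name}")

# Optional: cluster within the year/month partitions if queries filter on id.
# spark.sql(f"OPTIMIZE {table_name} ZORDER BY (id)")
```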
02-08-2023 11:19 AM
I suspect that adding the partitioning columns "year", "month", and "id" doubles the per-row size. Brotli compresses better than Snappy, so perhaps that accounts for the remaining ~2x difference in size?
Is there a better way to partition without adding columns? Also, should I avoid using the string IDs and instead use an int, given that the number of unique IDs is on the order of hundreds, to save space and perhaps get better Z-ordering? (Rough sketch of what I mean below.)
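Something like this is what I have in mind for the int IDs (purely illustrative; `id_values` and `stream_df` are placeholders for the known ID list and for the streaming DataFrame from my read code):

```python
from pyspark.sql.functions import broadcast

# Purely illustrative: map the string id to a small int code via a static
# lookup joined onto the stream (stream-static joins are supported).
id_values = ["<id_1>", "<id_2>"]  # placeholder for the ~hundreds of known IDs
id_map = spark.createDataFrame(
    [(v, i) for i, v in enumerate(id_values)],
    "id string, id_code int",
)

# stream_df = the streaming DataFrame after the withColumn("id", ...) step
coded = stream_df.join(broadcast(id_map), on="id", how="left").drop("id")
```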
02-12-2023 10:56 PM
Hi @Erik Louie
Hope all is well! Just wanted to check in to see whether you were able to resolve your issue. If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.
We'd love to hear from you.
Thanks!