Data size inflates massively while ingesting

Erik_L
Contributor II

Goal

Import and consolidate GBs to TBs of local data, stored as ~20 MB Parquet chunk files, into Databricks / Delta Lake partitioned tables.

What I've Done

I took a small subset of the data, roughly 72.5 GB, and ingested it with the streaming code below. The data is already sequentially ordered and hierarchically laid out as `id/time_bin/smaller_time_bin.parquet`. The resulting partitions should be roughly 12-20 GB each. Results:

  • It takes ~4.5 hours with 4 XLarge "General" compute instances.
  • The data size blows up to ~300 GB.
  • It can't handle the schema I've supplied and produces "PlainLong" map errors (I'll grab the exact error later if relevant).

Why does the data blow up so much, and why does it take so long? On a previous project, ~15 GB took only ~5-10 minutes to ingest with 1-2 Large general instances, so I suspect there's an issue with my read code below; otherwise this should only take ~30 minutes to 1 hour to run.

Note: the original compression is Brotli, but that isn't supported by the Delta cache, so I let it convert to Snappy (the default compression).
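For anyone debugging a similar size blow-up, one way to see where the bytes go is to compare the table's file count and on-disk size against the raw source. A minimal sketch, assuming the stream below has already created the table as test_small_1:

# DESCRIBE DETAIL reports numFiles and sizeInBytes for a Delta table,
# which makes it easy to compare against the raw Parquet footprint in S3.
spark.sql("DESCRIBE DETAIL test_small_1").select("numFiles", "sizeInBytes").show()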

Code

from pyspark.sql.types import *
from pyspark.sql.functions import col, current_timestamp, expr, input_file_name, regexp_extract, unix_timestamp, from_unixtime, year, month, dayofmonth, hour

# Define variables used in code below
file_path = "s3://<bucket>/parquet/<id>/**/<time_series_data>/*.parquet"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = "test_small_1"
checkpoint_path = f"/tmp/{username}/_checkpoint/ingest_test_small_1"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Explicit schema for the source files (currently disabled below in favor of inference)
schema = StructType([
    StructField("ts", LongType()),
    StructField("x", FloatType()),
    StructField("y", FloatType()),
    StructField("z", FloatType()),
])

# Auto Loader stream: read the Parquet files, derive id/year/month, and write a partitioned Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.allowOverwrites", True)
  .option("modifiedAfter", "2022-12-21 00:00:00.000000 UTC+0")
  .option("cloudFiles.format", "parquet")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .option("spark.databricks.io.cache.enabled", False)
  .option("mergeSchema", "True")
#   .schema(schema)
  .load(file_path)
  .select("*")
  .withColumn("id", regexp_extract(input_file_name(), r's3://<bucket>/parquet/([a-zA-Z0-9]*)/.*', 1))
  .withColumn("timestamp", expr("timestamp_micros(BIGINT(ts))"))
  .withColumn("year", year(col("timestamp")))
  .withColumn("month", month(col("timestamp")))
  .writeStream
  .partitionBy("year", "month")
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))
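One side note on the snippet above (an observation, not something confirmed in the thread): `spark.databricks.io.cache.enabled` is a cluster/session Spark conf rather than an Auto Loader reader option, so passing it through `.option(...)` is most likely ignored. A minimal sketch of how it would normally be set, assuming the intent is to disable the disk cache during ingest:

# Set the Databricks disk cache flag as a session conf
# (assumption: this is what the .option(...) call above was meant to do).
spark.conf.set("spark.databricks.io.cache.enabled", "false")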


3 REPLIES

Erik_L
Contributor II

I suspect that adding the partitioning columns "year", "month", and "id" roughly doubles the per-row size. Brotli compresses better than Snappy, so perhaps that accounts for the remaining ~2x difference in size?

Is there a better way to partition without adding extra columns? Also, should I avoid string IDs and map them to ints instead, given that the number of unique IDs is on the order of hundreds, to save space and perhaps get better Z-ordering?
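(For readers wondering what the int-ID idea could look like: a minimal sketch of mapping string IDs to small integer surrogates via a lookup table, assuming the data is available as a batch DataFrame df; the names id_map and id_int are illustrative, not from the thread.)

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Build a small lookup table: one integer per distinct string id
# (the id count is only in the hundreds, so a global window is acceptable here).
id_map = (df.select("id").distinct()
            .withColumn("id_int", F.dense_rank().over(Window.orderBy("id"))))

# Replace the string id with its integer surrogate before writing / Z-ordering.
df_int = df.join(F.broadcast(id_map), on="id", how="left").drop("id")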

Anonymous
Not applicable
(Accepted solution)

You're definitely running into small-file problems: 70 GB in 20 MB chunks is ~3,500 files to read and process, so there's a lot of scheduling overhead.

Adding the year and month columns won't actually increase the data size, because those values are captured in the partition path rather than in the data files themselves.
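(A possible follow-up step, sketched here as an assumption rather than something prescribed in the thread: once the stream has landed the data, compacting the Delta table and clustering it by the id column can reduce the small-file overhead for later reads.)

# Compact the ingested Delta table and co-locate rows by id within each partition.
# table_name is the variable from the original snippet ("test_small_1").
spark.sql(f"OPTIMIZE {table_name} ZORDER BY (id)")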

Anonymous
Not applicable

Hi @Erik Louie,

Hope all is well! Just wanted to check in to see whether you were able to resolve your issue. If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.

We'd love to hear from you.

Thanks!
