08-30-2022 01:44 PM
I have 150k small CSV files (~50 MB each) stored in S3 that I want to load into a Delta table.
All CSV files are stored in the following structure in S3:
bucket/folder/name_00000000_00000100.csv
bucket/folder/name_00000100_00000200.csv
This is the code I use to start Auto Loader:
## mount the external S3 bucket (access key and URL-encoded secret key are embedded in the source URI)
dbutils.fs.mount(f"s3a://{access_key}:{encoded_secret_key}@{aws_bucket_name}", f"/mnt/{mount_name}")
## autoloader function
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType

def autoload_csv(data_source, table_name, checkpoint_path, schema):
    query = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .option("delimiter", ";")
        .option("rescuedDataColumn", "_rescue")
        .schema(schema)
        .load(data_source)
        # the source stores the timestamp as a long; cast it to a proper timestamp column
        .withColumn("timestamp", col("timestamp").cast(TimestampType()))
        .writeStream.format("delta")
        .trigger(once=True)
        .option("mergeSchema", "true")
        .option("checkpointLocation", checkpoint_path)
        .toTable(table_name)
    )
    return query
## Define schema
from pyspark.sql.types import StructType, StructField, LongType, StringType

schema = StructType([
    StructField("timestamp", LongType(), True),
    StructField("aaa", LongType(), True),
    StructField("bbb", LongType(), True),
    StructField("ccc", LongType(), True),
    StructField("eee", LongType(), True),
    StructField("fff", LongType(), True),
    StructField("ggg", StringType(), True),
])
## start script
input_data_path = '/input_data'
table_name = 'default.input_data'
chkpt_path = '/tmp/input_data/_checkpoints'
query = autoload_csv(data_source=input_data_path, table_name=table_name, checkpoint_path=chkpt_path, schema=schema)
It currently takes two hours on eight 4-core/32 GB RAM workers to import all files. There must be something wrong.
I have attached screenshots for reference.
How can I speed up the data import?
How can I debug the issue further by myself?
EDIT:
I just tried the same import pipeline with the same number of files, but smaller ones (<1 MB). I noticed that it runs more smoothly when I remove trigger(once=True). Unfortunately, this doesn't help with bigger files: with those, Auto Loader takes forever to initialize the stream.
08-30-2022 08:19 PM
Is it 50 MB in total, or is each of your 150,000 files about 50 MB, which would be roughly 7.15 TB?
08-30-2022 10:43 PM
Some files are a bit smaller, some a bit bigger. All files combined are probably between 4.5 TB and 7 TB.
The import is also slow for much smaller files. I tried the same setup with 100k small files (<1 MB) and it also takes a while (see edit).
09-09-2022 04:46 PM
You are using .trigger(once=True), so Auto Loader tries to process all the data in a single micro-batch. I recommend using the availableNow trigger instead so your data is processed in multiple micro-batches. You can then check the streaming metrics to see how many records each batch processes and how long it takes.
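A minimal sketch of that change, assuming a runtime where trigger(availableNow=True) is exposed in PySpark (Spark 3.3+ / recent Databricks Runtime); cloudFiles.maxFilesPerTrigger is an optional knob to cap how many files each micro-batch reads:

query = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.maxFilesPerTrigger", 1000)  # optional: limit files per micro-batch
    .schema(schema)
    .load(input_data_path)
    .writeStream.format("delta")
    .trigger(availableNow=True)  # process all pending data, but in multiple micro-batches
    .option("checkpointLocation", chkpt_path)
    .toTable(table_name)
)

## inspect per-batch metrics while the query runs
for p in query.recentProgress:
    print(p["batchId"], p["numInputRows"], p["durationMs"])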
09-09-2022 06:29 PM
Good pointer, thank you! Is the feature also available in Python? Looking at the docs, it seems to be available only in Scala.
08-31-2022 12:47 AM
You should use a cluster with a heavy driver node and only 2-4 worker nodes, since this workload is just inserts and no joins are being performed.
Use something like an i3.4xlarge driver and i3.xlarge workers.
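For illustration, a hypothetical cluster spec along those lines, written as the kind of dictionary the Databricks Clusters API accepts (the values here are assumptions based on the advice above, not taken from the thread):

cluster_spec = {
    "num_workers": 4,                     # small worker count: plain inserts, no joins
    "driver_node_type_id": "i3.4xlarge",  # heavy driver to handle file listing and stream coordination
    "node_type_id": "i3.xlarge",          # modest workers
}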
09-09-2022 06:17 PM
Hi,
I have run more experiments on data throughput and will share my results soon.
09-17-2022 12:45 AM
Hi @Jan R,
Hope all is well! Just wanted to check in to see whether you were able to resolve your issue; if so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.
We'd love to hear from you.
Thanks!