Hi @theunwoke,
The 1h45m you are seeing is heavily influenced by cross-region network transfer between eu-central-1 (Frankfurt) and us-west-2 (your workspace region). Cross-region S3 reads travel over AWS's inter-region backbone rather than the same-region network path, so per-connection throughput is lower and latency is higher than for same-region reads. There are several levers you can pull to bring this down significantly.
UNDERSTAND THE BOTTLENECK
With a single r5d.2xlarge node (8 vCPUs, 64 GB RAM), you have limited parallelism for both the read and write sides. Cross-region S3 reads are bandwidth-constrained per connection, so the key strategy is to increase the number of parallel connections and, if possible, reduce or eliminate the cross-region hop entirely.
OPTION 1: SCALE OUT THE CLUSTER (QUICKEST WIN)
A single r5d.2xlarge gives you 8 cores. For 1 billion rows of Parquet, that is not enough parallelism to saturate network bandwidth on cross-region reads. Consider:
- Use a multi-node cluster. For example, 4-8 workers of r5d.xlarge or r5d.2xlarge. Each additional node opens its own set of parallel S3 connections, multiplying your effective throughput.
- Make sure the number of Parquet files (or file partitions) is large enough to keep all cores busy. If the source data is stored as a small number of large files, Spark will not be able to parallelize the read effectively. You can check with:
spark.read.parquet(s3_path).rdd.getNumPartitions()
If the partition count is low relative to your total cores, the read will be serialized across just a few tasks regardless of cluster size.
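As a quick sanity check, something along these lines compares read parallelism against the cores you actually have (schema and s3_path are from your snippet; sc is the SparkContext that Databricks notebooks predefine):

df = spark.read.schema(schema).parquet(s3_path)
num_read_partitions = df.rdd.getNumPartitions()
total_cores = sc.defaultParallelism  # total task slots across all executors
print(f"read partitions: {num_read_partitions}, task slots: {total_cores}")
# If partitions are well below task slots, the read is under-parallelized; see Option 2.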
OPTION 2: INCREASE READ PARALLELISM SETTINGS
You can tune Spark's S3 connection settings to open more parallel HTTP connections per executor:
spark.conf.set("spark.hadoop.fs.s3a.connection.maximum", "200")
spark.conf.set("spark.hadoop.fs.s3a.threads.max", "64")
These defaults are conservative, and raising them helps when reading many files, or large files, cross-region. Note that S3A settings are read when the filesystem is first initialized, so they are most reliably applied in the cluster's Spark config at cluster creation (see the sketch at the end of this option) rather than mid-notebook.
Also, if your Parquet files are large (hundreds of MB each), consider lowering the split size from its 128 MB default so Spark creates more read tasks per file:
spark.conf.set("spark.sql.files.maxPartitionBytes", "64m")
This gives you more parallelism without needing more source files.
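If you would rather set all of these once at cluster creation, the equivalent entries in the cluster's Spark config field (the more reliable place for the S3A settings, per the note above) would be:

spark.hadoop.fs.s3a.connection.maximum 200
spark.hadoop.fs.s3a.threads.max 64
spark.sql.files.maxPartitionBytes 64m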
OPTION 3: ELIMINATE THE CROSS-REGION HOP (BEST LONG-TERM)
The most impactful optimization is to avoid cross-region reads entirely during the Databricks job:
1. Use S3 Cross-Region Replication (CRR) to replicate the Frankfurt bucket to a bucket in your workspace region (us-west-2). CRR is asynchronous and handled entirely at the S3 layer, with no compute cost on your side; note that it only replicates objects written after the rule is created, so existing data needs a one-time backfill via S3 Batch Replication or a manual sync. Then point your Databricks job at the local replica bucket (see the boto3 sketch at the end of this option).
AWS documentation: https://docs.aws.amazon.com/AmazonS3/latest/userguide/replication.html
2. Alternatively, enable S3 Transfer Acceleration on the source bucket to speed up cross-region reads. This routes traffic through Amazon CloudFront's globally distributed edge locations; the client has to address the bucket through the accelerate endpoint (bucket-name.s3-accelerate.amazonaws.com) for it to take effect.
AWS documentation: https://docs.aws.amazon.com/AmazonS3/latest/userguide/transfer-acceleration.html
With a same-region bucket, the read portion of your job would be dramatically faster since S3 to EC2 within the same region uses the high-bandwidth AWS internal network.
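For reference, a minimal boto3 sketch of the CRR rule from option 1. The bucket names and IAM role ARN are placeholders; both buckets must have versioning enabled, and the role needs the standard replication permissions:

import boto3

SOURCE_BUCKET = "my-frankfurt-bucket"                                # placeholder
DEST_BUCKET_ARN = "arn:aws:s3:::my-us-west-2-replica"                # placeholder
REPLICATION_ROLE_ARN = "arn:aws:iam::123456789012:role/s3-crr-role"  # placeholder

s3 = boto3.client("s3", region_name="eu-central-1")
s3.put_bucket_replication(
    Bucket=SOURCE_BUCKET,
    ReplicationConfiguration={
        "Role": REPLICATION_ROLE_ARN,
        "Rules": [{
            "ID": "replicate-to-us-west-2",
            "Priority": 1,
            "Status": "Enabled",
            "Filter": {"Prefix": ""},  # empty prefix = replicate the whole bucket
            "DeleteMarkerReplication": {"Status": "Disabled"},
            "Destination": {"Bucket": DEST_BUCKET_ARN},
        }],
    },
)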
OPTION 4: OPTIMIZE THE WRITE SIDE
Your current code uses repartition(num_cores*2) before writing. A few things to check:
- repartition() triggers a full shuffle, which is expensive. If you do not need an exact partition count, use coalesce() instead (it avoids a shuffle when reducing partitions). However, if you are increasing partitions for write parallelism, repartition is the right call.
- For a 1 billion row dataset, you likely want more write partitions. A common guideline is to target output files of roughly 128 MB to 256 MB each. With only 16 partitions (8 cores * 2), each output file could be very large. Try a higher number (or derive one from the data size, as in the sizing sketch at the end of this option):
test_data.repartition(200).write.mode("overwrite").saveAsTable(new_path)
- Enable optimized writes and auto compaction on the target table to let Databricks handle file sizing automatically:
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
With optimized writes enabled, you can skip the manual repartition entirely:
test_data = spark.read.schema(schema).parquet(s3_path)
test_data.write.mode("overwrite").saveAsTable(new_path)
Databricks will automatically right-size the output files.
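If you would rather scope this to the target table instead of the session, the same behavior can be set as Delta table properties (the table name here is a placeholder):

spark.sql("""
    ALTER TABLE my_catalog.my_schema.my_table
    SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")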
Documentation: https://docs.databricks.com/aws/en/delta/optimize
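And if you do keep the manual repartition, here is a rough sizing sketch that derives the partition count from the source size instead of guessing. The bucket and prefix are placeholders, and it assumes output files end up roughly the same size as the compressed Parquet input:

import boto3

TARGET_FILE_BYTES = 128 * 1024 * 1024  # aim for ~128 MB output files

# Sum the on-disk size of the source objects.
s3 = boto3.client("s3")
pages = s3.get_paginator("list_objects_v2").paginate(Bucket="your-bucket", Prefix="path/")
total_bytes = sum(obj["Size"] for page in pages for obj in page.get("Contents", []))

num_partitions = max(1, int(total_bytes / TARGET_FILE_BYTES))
test_data.repartition(num_partitions).write.mode("overwrite").saveAsTable(new_path)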
OPTION 5: USE COPY INTO OR AUTO LOADER
For loading Parquet from S3 into a Delta table, COPY INTO is designed for exactly this pattern and handles parallelism internally:
COPY INTO my_catalog.my_schema.my_table
FROM 's3://your-bucket/path/'
FILEFORMAT = PARQUET
COPY INTO is idempotent (it skips files already loaded) and can be more efficient than a manual spark.read/write pipeline for bulk loads.
Documentation: https://docs.databricks.com/aws/en/ingestion/cloud-object-storage/copy-into
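On the Auto Loader half of this option: if new Parquet files will keep landing in the bucket, Auto Loader gives you the same skip-already-loaded behavior with incremental file discovery. A minimal sketch reusing schema and s3_path from your snippet (the checkpoint location and table name are placeholders):

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .schema(schema)
    .load(s3_path)
    .writeStream
    .option("checkpointLocation", "s3://your-bucket/_checkpoints/my_table")  # placeholder
    .trigger(availableNow=True)  # process all currently available files, then stop
    .toTable("my_catalog.my_schema.my_table"))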
RECOMMENDED ACTION PLAN
1. Short term: Scale out to a multi-worker cluster (4-8 nodes) and increase the S3 connection parallelism settings. Drop the manual repartition and enable optimized writes. This alone should cut your runtime significantly.
2. Medium term: Set up S3 Cross-Region Replication from your Frankfurt bucket to a us-west-2 bucket. Create an external location in Unity Catalog pointing to the local replica. This eliminates cross-region latency from the Databricks job entirely.
3. Register the source as an external location in Unity Catalog so access is governed and auditable:
https://docs.databricks.com/aws/en/connect/unity-catalog/external-locations
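For step 3, the registration looks roughly like this (the location name, URL, and storage credential are placeholders, and the storage credential wrapping your IAM role must exist first):

spark.sql("""
    CREATE EXTERNAL LOCATION IF NOT EXISTS us_west_replica
    URL 's3://my-us-west-2-replica/'
    WITH (STORAGE CREDENTIAL my_storage_credential)
""")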
Let us know how it goes after trying some of these approaches.
* This reply was drafted with an agent system I built, which researches and drafts responses using the wide set of documentation I have available and previous memory. I personally review each draft for obvious issues and to monitor system reliability, and I update it when I detect drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand-new features.