Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Large data ingestion issue using Auto Loader

bzh
New Contributor

 

The goal of this project is to ingest 1,000+ files (about 100 MB per file) from S3 into Databricks. Since the data arrives as incremental changes, we are using Auto Loader for continuous ingestion and transformation on a cluster (i3.xlarge).

The current process is very slow. It feels like it might take days to complete.
1. Each file has about 100,000+ rows, but when I run the code, the Spark UI shows over 3,000,000 rows being processed in a single batch (see attachment), even though I set maxFilesPerTrigger=1.
2. We are using UDFs. We understand UDFs are costly compared to native PySpark DataFrame operations, but some of the Python logic is row-level transformation that is quite hard to express with DataFrame operations.
 
 
 
Here is the flow of our code:

from pyspark.sql import functions as F

# read stream from the s3 mount
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("sep", "||||")
    .schema(transaction_schema)
    .option("maxFilesPerTrigger", 1)
    .load("dbfs:/mnt/s3/public/test_transactions")
)

# load contracts_df once outside the streaming operation
contracts_df = spark.read.table("hive_metastore.spindl.contracts")

# apply transformations to the entire streaming DataFrame
df = df.withColumn("transaction_hash", F.col("id"))
# ... more transformations ...

# define the UDFs
contract_info_udf = F.udf(contract_info, ...

# apply the UDFs
df = df.withColumn("contract_info", contract_info_udf(F.struct(contracts_df.columns)))
# ... more transformations ...

# write into the transactions table
df.write.mode("append").insertInto("hive_metastore.spindl.test_transactions")

# write the stream
query = df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/delta/test/_write_stream_checkpoints/") \
    .start()

query.awaitTermination()

 

 
3 REPLIES

Tharun-Kumar
Databricks Employee

@bzh 

I strongly suspect that a single file contains 3 million records. To confirm whether the data is coming from a single file or from multiple files, could you add a new column whose value is input_file_name()? This would help us understand whether the maxFilesPerTrigger config is being honored.

Document - https://docs.databricks.com/en/sql/language-manual/functions/input_file_name.html
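For example, a minimal sketch of how this check could be added to the stream from the original post (the source_file column name is just illustrative):

# tag each record with the file it came from
df = df.withColumn("source_file", F.input_file_name())

# inspecting the micro-batch output grouped by source_file then shows
# whether one batch really mixes rows from more than one input file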

Lakshay
Databricks Employee

It looks like the 3 million records are coming from a single file. To process these records faster, you might need more cores in your cluster.

youssefmrini
Databricks Employee

 

There are several possible ways to improve the performance of your Spark streaming job for ingesting a large volume of S3 files. Here are a few suggestions:

  1. Tune the spark.sql.shuffle.partitions config parameter:

By default, the number of shuffle partitions is set to 200, which may be too low for your workload. This parameter controls how many partitions Spark uses when shuffling data between stages, and too few partitions can lead to low parallelism and slow performance. You can try increasing it, based on the size of your data and the number of cores in your cluster, to improve parallelism and performance.

For example:

spark.conf.set("spark.sql.shuffle.partitions", "1000")
 
  2. Coalesce the output DataFrame before writing to Delta Lake:

When writing to Delta Lake, Spark writes the output as a set of data files and then commits them to the table. If each micro-batch produces many small files, this can lead to poor performance. To help mitigate this issue, you could coalesce the output DataFrame to reduce the number of files created.

For example:

df.coalesce(16).write.mode("append").format("delta")...

  3. Simplify/optimize your UDFs:

Performing row-level transformations with UDFs can be expensive, especially if the function is not optimized. You can try optimizing your UDFs by:

  • Broadcasting small DataFrames so they are available on all nodes.
  • Using vectorized (pandas) UDFs, as sketched after this list.
  • Avoiding Python UDFs where the logic can be expressed with built-in functions.
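For example, a minimal sketch of the broadcast and vectorized-UDF points (assuming Spark 3.x with pandas on the cluster; the short_hash logic and the contract_address join key are purely illustrative and not from the original post):

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# vectorized (pandas) UDF: operates on a whole pandas Series per batch
# instead of invoking Python once per row
@F.pandas_udf(StringType())
def short_hash(ids: pd.Series) -> pd.Series:
    return ids.str.slice(0, 8)

df = df.withColumn("short_hash", short_hash(F.col("id")))

# broadcast the small contracts table so the stream-static join runs
# locally on each executor instead of shuffling the streaming data
df = df.join(F.broadcast(contracts_df), on="contract_address", how="left")

Pandas UDFs reduce per-row Python overhead because data is transferred to the Python workers in Arrow batches rather than row by row.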
  4. Increase instance types, adjust autoscaling:

You may want to consider using larger instance types than i3.xlarge. Alternatively, you could adjust your autoscaling policy so the cluster scales dynamically with the incoming workload. The Databricks autoscaling feature helps maximize utilization and reduce overall cluster costs.
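As an illustration, the relevant fragment of a cluster definition with autoscaling might look like the following (the node type and worker counts are assumptions, not sizing recommendations):

# illustrative fragment of a cluster spec with autoscaling enabled
cluster_spec = {
    "node_type_id": "i3.2xlarge",                       # larger instance type
    "autoscale": {"min_workers": 2, "max_workers": 8},  # scale with the backlog
}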

  5. Use Delta's auto optimize features:

Consider turning on Delta Lake's auto optimize features explicitly, as sketched below. As you keep ingesting data, compaction and OPTIMIZE prevent the table from accumulating small files and help maintain query performance.
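For example, a minimal sketch of enabling the auto optimize table properties on the target table (assuming the Databricks delta.autoOptimize.* table properties; OPTIMIZE can also be run on a schedule instead):

# enable optimized writes and auto compaction on the target table
spark.sql("""
    ALTER TABLE hive_metastore.spindl.test_transactions
    SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")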

These are some best practices for improving Spark streaming performance when ingesting large volumes of data from S3 with Auto Loader.

I hope that helps!
