Structured Streaming from an S3 source

dvmentalmadess
Contributor

I am trying to set up S3 as a Structured Streaming source. The bucket receives ~17K files/day, and the original load into the bucket was ~54K files. The bucket was first loaded 3 months ago and we haven't read from it yet. So, as a back-of-the-napkin estimate, there are roughly 1.58 million files (54K + 17K/day x ~90 days). We are also looking for a way to reduce the number of files by making them larger, but our only control is a file size hint, and the actual files are nowhere near the hint we've specified.

We initially created the job as a streaming job, hoping Spark would have no trouble processing the files incrementally (17K / 24 ~ 700 files/hour). However, we currently can't process the backlog because Spark keeps dying with OOM errors.
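The estimates above can be checked with a few lines of arithmetic. This is a back-of-the-napkin sketch that assumes ~30 days per month (so "3 months ago" is about 90 days); the constants are the figures quoted in the question, not measured values.

```python
# Back-of-the-napkin backlog estimate.
# Assumption: ~30 days per month, so "3 months ago" is ~90 days.
INITIAL_LOAD_FILES = 54_000   # original bulk load into the bucket
DAILY_FILES = 17_000          # new files arriving per day
DAYS_ELAPSED = 3 * 30         # ~90 days since the first load

backlog = INITIAL_LOAD_FILES + DAILY_FILES * DAYS_ELAPSED
hourly_rate = DAILY_FILES / 24

print(f"backlog ~ {backlog:,} files")                  # backlog ~ 1,584,000 files
print(f"steady state ~ {hourly_rate:.0f} files/hour")  # steady state ~ 708 files/hour
```

In other words, the very first read has to deal with about 93 days' worth of steady-state input at once.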

First, I'm trying to confirm that our problem is actually caused by the number of files in the initial read. Does Spark Structured Streaming behave the same way as batch here, i.e., is it failing because it tries to load all 1.5M+ files at once rather than iterating over the files in S3 in order? Also, once the initial load is done, will streaming with a checkpoint avoid the problem, or will Spark still list and read all of the file metadata on every run?
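To make the question concrete, here is a simplified, hypothetical model of a directory-listing file source. It is not Spark's implementation: the class name ToyFileSource, the in-memory seen set (standing in for checkpoint state), and the cap parameter (modeled loosely on the file source's maxFilesPerTrigger option) are illustrative assumptions. In this model every trigger still receives the full listing, only files not already recorded are planned, and with no cap the very first trigger picks up the entire backlog.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass
class ToyFileSource:
    """Hypothetical model of an incremental file source (not Spark's code)."""
    max_files_per_trigger: Optional[int] = None   # None = no cap per batch
    seen: Set[str] = field(default_factory=set)   # stands in for checkpoint state

    def next_batch(self, listing: List[str]) -> List[str]:
        # Every trigger receives the full directory listing...
        new_files = sorted(f for f in listing if f not in self.seen)
        # ...but only files not yet recorded are planned, up to the cap.
        if self.max_files_per_trigger is not None:
            new_files = new_files[: self.max_files_per_trigger]
        self.seen.update(new_files)  # "commit" them so later triggers skip them
        return new_files


# Hypothetical object names standing in for the bucket's backlog.
listing = [f"file-{i:04d}" for i in range(10)]

uncapped = ToyFileSource()
first = uncapped.next_batch(listing)    # all 10 files land in one batch
listing += ["file-0010", "file-0011"]   # new arrivals
second = uncapped.next_batch(listing)   # only the 2 new files

capped = ToyFileSource(max_files_per_trigger=4)
sizes = []
while batch := capped.next_batch(listing):
    sizes.append(len(batch))            # backlog drained in bounded batches
print(len(first), second, sizes)        # 10 ['file-0010', 'file-0011'] [4, 4, 4]
```

What the model illustrates: the recorded state keeps already-processed files from being reprocessed, but it does not shrink the listing itself, and it does nothing to bound the first batch unless a cap is set.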


Kaniz
Community Manager

Hi @dvmentalmadess,

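Spark Structured Streaming processes data incrementally, unlike batch processing.
1. Processing a very large number of files at once can cause OutOfMemory (OOM) errors. By default, the first micro-batch of a file stream includes every file already in the source path, so the initial run has to list and plan the whole backlog.
2. To avoid processing an entire table, the starting point of a streaming query can be set with the startingVersion or startingTimestamp options. Note that these options apply when the source is a Delta table, not a plain file source on S3.
3. Checkpointing saves the state of a streaming application to reliable storage for recovery in case of failure. For a file source, that state includes which files have already been processed, so later runs skip them.
4. The upgraded S3 connector, based on Hadoop 3.1.3, includes stability improvements and LIST optimizations that reduce LIST cost.
5. If issues persist, consider using a different data format, or partitioning the data so that smaller subsets can be read at a time.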
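As a rough sizing aid for the suggestion above about reading smaller subsets at a time: Spark's file source accepts a maxFilesPerTrigger option that caps how many new files each micro-batch takes. The sketch below estimates how many triggers a capped stream needs to work through a backlog. The helper name is hypothetical, the 10,000-file cap is an arbitrary example value, and the estimate ignores files that keep arriving while the backlog drains.

```python
def triggers_to_drain(backlog_files: int, max_files_per_trigger: int) -> int:
    """Micro-batches needed to process a backlog when each trigger takes at
    most max_files_per_trigger files. Ignores files arriving meanwhile."""
    if max_files_per_trigger <= 0:
        raise ValueError("max_files_per_trigger must be positive")
    # Integer ceiling division: a partial final batch still costs one trigger.
    return (backlog_files + max_files_per_trigger - 1) // max_files_per_trigger


# Example values: ~1.58M-file backlog, hypothetical cap of 10,000 files per trigger.
print(triggers_to_drain(1_584_000, 10_000))  # 159
```

Trading one enormous first batch for ~159 bounded ones is the idea behind capping: the work each micro-batch plans and executes stays bounded regardless of backlog size. The cap is a tuning knob, not a recommended value.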

dvmentalmadess
Contributor

Thanks,

We were able to make things work by increasing the driver instance size so it had more memory for the initial load. After the initial load, we scaled the instance back down for subsequent runs. We're still testing; if we can't make it work, we'll try the suggestions.

It's great to hear that increasing the driver instance size helped with your Structured Streaming setup for S3! If you have any more questions or need further assistance, feel free to ask.
Welcome to Databricks Community: Let's learn, network and celebrate together
