Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Writing a large Parquet file (500 million rows / 1000 columns) to S3 takes too much time

WajdiFATHALLAH
New Contributor

Hello community,

First let me introduce my use case: I receive roughly 500 million rows daily, shaped like this:

ID | Categories
1  | cat1, cat2, cat3, ..., catn
2  | cat1, catx, caty, ..., anothercategory

Input data: 50 compressed CSV files, 250 MB each -> 12.5 GB compressed in total.

The purpose is to answer questions like: find all IDs that belong to catx and caty, find IDs that belong to cat3 but not caty, and so on; i.e. IDs in cat1 ∪ cat2, or IDs in cat3 ∩ catx.

Assuming that categories are created dynamically (every day I get a new set of categories) and the business wants to explore all possible intersections and unions (we don't have a fixed set of queries), I came up with the following solution:

I wrote a Spark job that transforms the data into a wide sparse matrix whose columns are every possible category plus an ID column; for each row I set true in a category column if the ID belongs to that category and false otherwise:

ID | cat1 | cat2  | cat3  |...| catn  | catx  | caty  | anothercategory |....
1  | true | true  | true  |...| true  | false | false | false           |....
2  | true | false | false |...| false | true  | true  | true            |....

SQL can then answer my questions directly; for instance, to find all IDs that belong to category cat1 and category catx, I run the following query against this matrix:

SELECT id FROM MyTable WHERE cat1 = true AND catx = true;

I chose to save this sparse matrix as a compressed Parquet file; given the sparsity and the nature of the queries, I believe columnar storage is the most appropriate format.
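
For reference, here is a minimal sketch of that transformation and write in the DataFrame API (Java, Spark 2.x). It assumes a SparkSession named spark, an input header of ID,Categories and hypothetical S3 paths; it illustrates the approach described above, not the actual job.

// A minimal sketch of the transformation described above, not the poster's job.
// Assumes Spark 2.x, a header row "ID,Categories" and illustrative S3 paths.
import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("sparse-matrix").getOrCreate();

Dataset<Row> raw = spark.read()
        .option("header", "true")
        .csv("s3a://my-bucket/input/");                 // hypothetical input path

// One row per (ID, category) membership.
Dataset<Row> exploded = raw
        .withColumn("category", explode(split(col("Categories"), ",\\s*")))
        .select(col("ID"), col("category"));

// Pivot into the wide boolean matrix: one column per distinct category value.
// Absent combinations come back as null, which "WHERE cat1 = true" already
// treats as "not a member".
Dataset<Row> matrix = exploded
        .groupBy("ID")
        .pivot("category")
        .agg(first(lit(true)));

// The repartition count decides how many Parquet files are written; 200 is only
// a placeholder to aim for a few hundred MB per file instead of many tiny files.
matrix.repartition(200)
        .write()
        .mode("overwrite")
        .parquet("s3a://my-bucket/sparse-matrix/");     // hypothetical output path

// The saved matrix can then be queried with plain SQL, e.g.:
// spark.read().parquet("s3a://my-bucket/sparse-matrix/").createOrReplaceTempView("MyTable");
// spark.sql("SELECT ID FROM MyTable WHERE cat1 = true AND catx = true").show();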

With my use case described, here are my observations; I may be missing some optimization opportunities:
  • 12.5 GB of compressed input data grows to ~300 GB after the transformation.
  • Writing this sparse matrix as Parquet takes too much time and too many resources: it took 2.3 hours on a Spark 1.6 standalone cluster of 6 AWS r4.4xlarge instances (I set enough parallelism to distribute the work and take advantage of all the workers I have).
  • I ended up with too many Parquet files: the more I parallelize, the smaller the Parquet files are. It seems each RDD partition produces a single Parquet file -> too many small files are not optimal to scan, since my queries go through all the column values (a sizing sketch follows this list).
  • I went through a lot of posts but still don't understand why writing a 500-million-row / 1000-column compressed Parquet dataset to S3 takes this much time; once on S3, the small files add up to ~35 GB.
  • Looking at the application master UI, the job hangs on the write stage; the transformation and shuffle stages don't seem to be resource- or time-consuming.
  • I tried tweaking Parquet parameters like group_size, page_size and disable_dictionary but did not see any performance improvement.
  • I tried repartitioning into bigger RDDs and writing them to S3 in order to get bigger Parquet files, but the job took too much time and I finally killed it.
  • I could run the job in ~1 hour using a Spark 2.1 standalone cluster of 4 AWS r4.16xlarge instances. I feel like I am using a huge cluster to achieve a small improvement; the only benefit I got is running more parallel tasks. Am I missing something? Maybe I can leverage the ~1 TB of RAM to do this better and get bigger Parquet files.
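
On the small-files point, here is a rough sketch (not a benchmark) of sizing the write from the observed data volume instead of letting the upstream parallelism decide the file count; `matrix` is the sparse boolean Dataset from the earlier sketch and all figures are placeholders.

// Pick the number of output partitions from the data volume; every figure below
// is illustrative, not measured.
long totalBytesOnS3  = 35L * 1024 * 1024 * 1024;    // ~35 GB observed on S3
long targetFileBytes = 512L * 1024 * 1024;          // aim for ~512 MB Parquet files
int  numFiles = (int) Math.max(1, totalBytesOnS3 / targetFileBytes);   // ~70 files

// repartition(numFiles) shuffles into exactly numFiles partitions -> numFiles files;
// coalesce(numFiles) avoids the shuffle but can produce uneven file sizes.
matrix.repartition(numFiles)
        .write()
        .mode("overwrite")
        .parquet("s3a://my-bucket/sparse-matrix/");   // hypothetical path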

Do you have any feedback regarding writing large Parquet datasets to S3 using Spark?

I would also like to hear your opinions/critiques of this solution.

Thanks and Regards.


JordanThomas
New Contributor II

This is a great question. Why has no one at Databricks answered this?

HariOhmPrasath
New Contributor II

Hi,

It's really good how you explained the problem. I ran into a similar issue with too many Parquet files, too much time to write, and stages hanging in the middle when I have to create dynamic columns (more than 1000) and write at least 10M rows to S3.

One mistake I was making was doing all the operations on RDDs instead of DataFrames (something like sqlContext().toJavaRDD() or .rdd()). You lose a lot of the good things once you move from a DataFrame to an RDD. The approach below kind of worked for me; try it out and let me know.

You can split your Spark job into two parts:

1. Run a reduce() job to determine the dynamic column names that are required for storing the data.

2. Run a flatMap() to transform the data you received and convert it to the schema created from the dynamic columns in step 1. Once that is done you can write it out directly as Delta. Since Delta is transactional and does a lot of bookkeeping, you will not end up with too many Parquet files.

Here is a sample of the steps I explained above:

// Imports assumed by this snippet; ScoringTransformer, ReportUtils, OUTPUT_COLUMN,
// FORMAT, s3Output and testDataSet come from the surrounding application code.
import static org.apache.spark.sql.functions.col;

import java.util.*;
import java.util.stream.Collectors;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;

import scala.collection.JavaConverters;

// Part 1 --> Determine the dynamic column names
Row schemaOutput = testDataSet.reduce((ReduceFunction<Row>) (v1, v2) -> {
    List<String> set1, set2;

    // If the payload column holds JSON, pull the column names from it;
    // otherwise fall back to the row's own fields.
    String data = v1.size() > OUTPUT_COLUMN ? v1.getString(OUTPUT_COLUMN) : "";
    if (data.startsWith("{"))
        set1 = ScoringTransformer.getColumns(data);
    else
        set1 = new LinkedList<>((Collection<String>) (Collection<?>) JavaConverters.asJavaCollectionConverter(v1.toSeq()).asJavaCollection());

    data = v2.size() > OUTPUT_COLUMN ? v2.getString(OUTPUT_COLUMN) : "";
    if (data.startsWith("{"))
        set2 = ScoringTransformer.getColumns(data);
    else
        set2 = new LinkedList<>((Collection<String>) (Collection<?>) JavaConverters.asJavaCollectionConverter(v2.toSeq()).asJavaCollection());

    // Merge the two sets of column names, preserving order and dropping duplicates.
    Set<String> columns = new LinkedHashSet<>(set1);
    columns.addAll(set2);

    return RowFactory.create(columns.toArray());
});
List<String> columns = new LinkedList<>((Collection<String>) (Collection<?>)
        JavaConverters.asJavaCollectionConverter(schemaOutput.toSeq()).asJavaCollection());

// You get a schema for all the dynamic column names that is required for the next step
ExpressionEncoder<Row> encoder = RowEncoder.apply(ReportUtils.getSchemaBasedOnString(columns));

// Part 2 --> Run flatMap and encode with the schema built from the dynamic columns,
// then repartition however you want and write it out to S3
testDataSet.flatMap((FlatMapFunction<Row, Row>) row -> {
    List<Map<String, String>> data = ScoringTransformer.getTransformedData(row.getString(OUTPUT_COLUMN));
    if (data != null && !data.isEmpty())
        // One output Row per transformed record, with "" for any missing column.
        return data.stream()
                .map(x -> RowFactory.create(columns.parallelStream()
                        .map(y -> x.getOrDefault(y, ""))
                        .collect(Collectors.toList()).toArray()))
                .collect(Collectors.toList())
                .iterator();

    return new LinkedList<Row>().iterator();
}, encoder)
        .repartition(col("date_group"))
        .write()
        .option("mergeSchema", "true")
        .format(FORMAT)
        .mode("append")
        .save(s3Output + "/xyz");

EliasHaydar
New Contributor II

So you are basically creating an inverted index?
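
For context, the inverted-index layout this question hints at would store one row per category with the list of member IDs, instead of one boolean column per category. A hedged sketch in the same Java/DataFrame style, reusing a hypothetical (ID, category) pair dataset like the `exploded` one from the earlier sketch:

// Hypothetical alternative layout, for comparison only: one row per category,
// with all member IDs collected into an array column.
import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// `exploded` holds one (ID, category) row per membership, as in the earlier sketch.
Dataset<Row> inverted = exploded
        .groupBy("category")
        .agg(collect_set(col("ID")).alias("ids"));

inverted.write()
        .mode("overwrite")
        .parquet("s3a://my-bucket/inverted-index/");    // hypothetical path

// Intersections/unions then become joins on the membership pairs, e.g. cat1 ∩ catx:
// exploded.filter(col("category").equalTo("cat1"))
//         .join(exploded.filter(col("category").equalTo("catx")), "ID")
//         .select("ID");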
