05-18-2017 02:18 AM
1 | cat1, cat2, cat3, ..., catn
2 | cat1, catx, caty, ..., anothercategory
Input data: 50 compressed CSV files, each about 250 MB, for a total of roughly 12.5 GB compressed.
The purpose is to answer questions like: find all ids that belong to catx and caty, find all ids that belong to cat3 and not caty, etc. In other words: ids in cat1 ∪ cat2, or ids in cat3 ∩ catx.
Since categories are created dynamically (every day I have a new set of categories) and my business wants to explore all possible intersections and unions (we don't have a fixed set of queries), I came up with the following solution.
I wrote a Spark job that transforms the data into a fat sparse matrix whose columns are all possible categories plus an ID column; for each row and column I set true where the id belongs to that category and false otherwise:
ID | cat1 | cat2 | cat3 | ... | catn | catx  | caty  | anothercategory | ...
1  | true | true  | true  | ... | true  | false | false | false           | ...
2  | true | false | false | ... | false | true  | true  | true            | ...
SQL can then answer my questions directly. For instance, to find all ids that belong to both cat1 and catx, I run the following query against this matrix:
Select id from MyTable where cat1 = true and catx = true;
I chose to save this sparse matrix as a compressed Parquet file. Given the sparsity and the nature of the queries, I believe columnar storage is the most appropriate storage format.
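To make the approach concrete, here is a minimal sketch of one way such a matrix could be built and queried (not the exact production job: the S3 paths are placeholders, each raw line is assumed to look like "<id> | cat1, cat2, ...", and Spark 2.3+ is assumed for na().fill(boolean)):

// Minimal sketch only: paths and the "<id> | cat1, cat2, ..." line layout are assumptions.
import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparseCategoryMatrix {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("sparse-category-matrix").getOrCreate();

        // Read the compressed CSV files as raw text lines.
        Dataset<Row> lines = spark.read().text("s3://my-bucket/raw/*.csv.gz");

        // Explode each line into one (id, category) pair per category.
        Dataset<Row> pairs = lines
                .withColumn("id", trim(split(col("value"), "\\|").getItem(0)))
                .withColumn("category", explode(split(split(col("value"), "\\|").getItem(1), ",")))
                .select(col("id"), trim(col("category")).alias("category"));

        // Pivot into one boolean column per category; absent (id, category) combinations become false.
        // pivot() collects the distinct categories, bounded by spark.sql.pivotMaxValues (default 10000).
        Dataset<Row> matrix = pairs
                .groupBy("id")
                .pivot("category")
                .agg(first(lit(true)))
                .na().fill(false);

        // Columnar, compressed storage as described in the post.
        matrix.write().mode("overwrite").parquet("s3://my-bucket/category-matrix/");

        // Intersection query: ids that belong to both cat1 and catx.
        matrix.createOrReplaceTempView("MyTable");
        spark.sql("SELECT id FROM MyTable WHERE cat1 = true AND catx = true").show();
    }
}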
Now that my use case is described, I may be missing some optimization points. Do you have any feedback regarding writing large Parquet files to S3 using Spark? I would also like to know your opinions/critiques of this solution. Thanks and regards.
10-16-2017 09:07 AM
This is a great question. Why has no one at Databricks answered this?
07-24-2018 12:43 AM
Hi,
It's really good how you explained the problem. I ran into a similar issue with too many Parquet files, excessive write times, and stages hanging in the middle when I have to create dynamic columns (more than 1000) and write at least 10M rows to S3.
One mistake I was making was doing all the operations on RDDs instead of DataFrames (something like sqlContext().toJavaRDD() or .rdd()). You lose a lot of the good things (e.g., Catalyst query optimization) once you move from a DataFrame to an RDD. The approach below kind of worked for me; try it out and let me know.
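For example, a minimal sketch of where that boundary sits (the class name and path are hypothetical, only for illustration):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RddVsDataFrame {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("rdd-vs-dataframe").getOrCreate();

        // DataFrame/Dataset operations are planned and optimized by Catalyst.
        Dataset<Row> df = spark.read().parquet("s3://my-bucket/input/");

        // Dropping to an RDD (toJavaRDD() / rdd()) leaves the optimized engine;
        // everything after this line is plain, unoptimized RDD code.
        JavaRDD<Row> rdd = df.toJavaRDD();

        System.out.println("rows: " + rdd.count());
    }
}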
You can split your Spark job into two parts:
1. Run a reduce() job to determine the dynamic column names that are required for storing the data.
2. Run a flatMap() to transform the data you received and convert it to the schema created from the dynamic columns in step 1. Once that is done you can write it out directly as Delta. Since Delta is transactional and does a lot of bookkeeping, you will not end up with too many Parquet files.
Here is a sample of the steps I explained above:
// Part 1 --> Determine the dynamic column names
Row schemaOutput = testDataSet.reduce((ReduceFunction<Row>) (v1, v2) -> {
    List<String> set1, set2;

    // Column names carried by the first row: parse the payload in OUTPUT_COLUMN when it
    // looks like JSON, otherwise fall back to the row's existing field values.
    String data = v1.size() > OUTPUT_COLUMN ? v1.getString(OUTPUT_COLUMN) : "";
    if (data.startsWith("{"))
        set1 = ScoringTransformer.getColumns(data);
    else
        set1 = new LinkedList<>((Collection<String>) (Collection<?>)
                JavaConverters.asJavaCollectionConverter(v1.toSeq()).asJavaCollection());

    // Same for the second row.
    data = v2.size() > OUTPUT_COLUMN ? v2.getString(OUTPUT_COLUMN) : "";
    if (data.startsWith("{"))
        set2 = ScoringTransformer.getColumns(data);
    else
        set2 = new LinkedList<>((Collection<String>) (Collection<?>)
                JavaConverters.asJavaCollectionConverter(v2.toSeq()).asJavaCollection());

    // Union of the two column sets, preserving insertion order.
    Set<String> columns = new LinkedHashSet<>(set1);
    columns.addAll(set2);
    return RowFactory.create(columns.toArray());
});

List<String> columns = new LinkedList<>((Collection<String>) (Collection<?>)
        JavaConverters.asJavaCollectionConverter(schemaOutput.toSeq()).asJavaCollection());

// You get a schema out with all the dynamic column names that are required for the next step
ExpressionEncoder<Row> encoder = RowEncoder.apply(ReportUtils.getSchemaBasedOnString(columns));
// Part 2 --> Run flatMap and encode it with the schema that you created from the dynamic columns, then repartition however you want and write it out to S3
testDataSet.flatMap((FlatMapFunction<Row, Row>) row -> {
List<Map<String, String>> data = ScoringTransformer.getTransformedData(row.getString(OUTPUT_COLUMN));
if (data != null && !data.isEmpty())
return data.stream()
.map(x -> RowFactory.create(columns.parallelStream()
.map(y -> x.getOrDefault(y, ""))
.collect(Collectors.toList()).toArray()))
.collect(Collectors.toList())
.iterator();
return new LinkedList<Row>().iterator();
}, encoder)
.repartition(col("date_group"))
.write()
.option("mergeSchema", "true")
.format(FORMAT)
.mode("append")
.save(s3Output + "/xyz");
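And, assuming FORMAT above is "delta", a small sketch of reading the result back for ad-hoc queries; it reuses the SparkSession and the s3Output placeholder from the snippet above:

// Assumes FORMAT == "delta"; s3Output + "/xyz" is the same placeholder path used when writing.
Dataset<Row> result = spark.read().format("delta").load(s3Output + "/xyz");
result.createOrReplaceTempView("scored_output");
spark.sql("SELECT date_group, count(*) FROM scored_output GROUP BY date_group").show();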
08-13-2018 05:16 AM
So you are basically creating an inverted index?