cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Support for Parquet brotli compression or a work around

Erik_L
Contributor II

Spark 3.3.1 supports the brotli compression codec, but when I use it to read parquet files from S3, I get:

INVALID_ARGUMENT: Unsupported codec for Parquet page: BROTLI

Example code:

df = (spark.read.format("parquet")
    .option("compression", "brotli")
    .load("s3://<bucket>/<path>/<file>.parquet")
df.write.saveAsTable("tmp_test")

I have a large amount of data stored with this compression, so switching right now would be difficult. It looks like Koalas supports it or I could manually ingest it by spinning up my own Spark session, but that would defeat the point of having Databricks / Delta Lake / Autoloader. Any suggestions on a work around?

edit:

More output:

Caused by: java.lang.RuntimeException: INVALID_ARGUMENT: Unsupported codec for Parquet page: BROTLI
	at com.databricks.sql.io.caching.NativePageWriter$.create(Native Method)
	at com.databricks.sql.io.caching.DiskCache$PageWriter.<init>(DiskCache.scala:318)
	at com.databricks.sql.io.parquet.CachingPageReadStore$UnifiedCacheColumn.populate(CachingPageReadStore.java:1183)
	at com.databricks.sql.io.parquet.CachingPageReadStore$UnifiedCacheColumn.lambda$getPageReader$0(CachingPageReadStore.java:1177)
	at com.databricks.sql.io.caching.NativeDiskCache$.get(Native Method)
	at com.databricks.sql.io.caching.DiskCache.get(DiskCache.scala:515)
	at com.databricks.sql.io.parquet.CachingPageReadStore$UnifiedCacheColumn.getPageReader(CachingPageReadStore.java:1178)
	at com.databricks.sql.io.parquet.CachingPageReadStore.getPageReader(CachingPageReadStore.java:1012)
	at com.databricks.sql.io.parquet.DatabricksVectorizedParquetRecordReader.checkEndOfRowGroup(DatabricksVectorizedParquetRecordReader.java:741)
	at com.databricks.sql.io.parquet.DatabricksVectorizedParquetRecordReader.nextBatch(DatabricksVectorizedParquetRecordReader.java:603)

1 ACCEPTED SOLUTION

Accepted Solutions

Erik_L
Contributor II

Given the new information I appended, I looked into the Delta caching and I can disable it:

.option("spark.databricks.io.cache.enabled", False)

This works as a work around while I read these files in to save them locally in DBFS, but does it have performance repercussions? I'm only doing this to ingest files from S3 uploaded from an external process. I'm worried there might be a larger number of reads from S3 increasing ingestion costs.

View solution in original post

3 REPLIES 3

Right. In the description there, it says the precedence is in order of 'compression' followed b y 'parquet.compression' followed by this option. As you can see in the code above, I am using 'compression', but I did test with this option as well. Same error.

I believe this to be an issue specific to Databrick's layer over Spark / Delta Tables, most likely that they have a codec validation and didn't add brotli, as its addition to Spark is 'more recent'.

Erik_L
Contributor II

Given the new information I appended, I looked into the Delta caching and I can disable it:

.option("spark.databricks.io.cache.enabled", False)

This works as a work around while I read these files in to save them locally in DBFS, but does it have performance repercussions? I'm only doing this to ingest files from S3 uploaded from an external process. I'm worried there might be a larger number of reads from S3 increasing ingestion costs.

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.