cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Working with a text file that is both compressed by bz2 followed by zip in PySpark

MichTalebzadeh
Contributor
 I have downloaded Am azon reviews for sentiment analysis from here. The file is not particularly large (just over 500MB) but comes in the following format

test.ft.txt.bz2.zip

So it is a text file that is compressed by bz2 followed by zip. Now I like to do all these operations in PySpark. In PySpark a file cannot have both .bz2 and .zip simultaneously..

The way I do it is to  place the downloaded file in a local directory. Then just do some operations that are simple but messy.. I try to unzip the file using zipfile package. This works with bash style filename. as opposed to python style filename "file:///.." This necessitates using different style, one for OS type for zip and the other Python style to read bz2 file directory into df in PySpark
 

 

 

import os
import zipfile
data_path = "file:///d4T/hmduser/sentiments/"
input_file_path = os.path.join(data_path, "test.ft.txt.bz2")
output_file_path = os.path.join(data_path, "review_text_file")
dir_name = "/d4T/hmduser/sentiments/"
zipped_file=os.path.join(dir_name, "test.ft.txt.bz2.zip")
bz2_file=os.path.join(dir_name, "test.ft.txt.bz2")
try:
    # Unzip the file
    with zipfile.ZipFile(zipped_file, 'r') as zip_ref:
        zip_ref.extractall(os.path.dirname(bz2_file))
   
    # Now bz2_file should contain the path to the unzipped file
    print(f"Unzipped file: {bz2_file}")
except Exception as e:
    print(f"Error during unzipping: {str(e)}")

# Load the bz2 file into a DataFrame
df = spark.read.text(input_file_path)
# Remove the '__label__1' and '__label__2' prefixes
df = df.withColumn("review_text", expr("regexp_replace(value, '__label__[12] ', '')"))​

 

 

Then the rest is just spark-ml

Once I finished I remove the bz2 file to clean-up

 

 

if os.path.exists(bz2_file):  # Check if bz2 file exists
  try:
    os.remove(bz2_file)
    print(f"Successfully deleted {bz2_file}")
  except OSError as e:
    print(f"Error deleting {bz2_file}: {e}")
else:
    print(f"bz2 file {bz2_file} could not be found")

 

 

My question is can these operations be done more efficiently in Pyspark itself ideally with one df operation reading the original file (.bz2.zip)?
 
Thanks

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

 
Mich Talebzadeh | Technologist | Data | Generative AI | Financial Fraud
London
United Kingdom

view my Linkedin profile



https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun)".
2 REPLIES 2

Kaniz
Community Manager
Community Manager

Hi @MichTalebzadehLet’s optimize the process by directly reading the original .bz2.zip file into a PySpark DataFrame. We’ll achieve this using the spark.read.text() method, which can handle both the zip and bz2 formats seamlessly.

Here’s an improved version of your code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
import os

# Create a Spark session
spark = SparkSession.builder.appName('AmazonReviewsSentiment').getOrCreate()

# Define the path to your downloaded file
data_path = "file:///d4T/hmduser/sentiments/"
input_file_path = os.path.join(data_path, "test.ft.txt.bz2.zip")

try:
    # Read the bz2.zip file directly into a DataFrame
    df = spark.read.text(input_file_path)

    # Remove the '__label__1' and '__label__2' prefixes
    df = df.withColumn("review_text", expr("regexp_replace(value, '__label__[12] ', ''))"))

    # Now you can proceed with your Spark ML operations
    # ...

    # Clean up: Remove the downloaded file (no need to unzip)
    if os.path.exists(input_file_path):
        try:
            os.remove(input_file_path)
            print(f"Successfully deleted {input_file_path}")
        except OSError as e:
            print(f"Error deleting {input_file_path}: {e}")
    else:
        print(f"File {input_file_path} could not be found")
except Exception as e:
    print(f"Error reading the file: {str(e)}")

In this version:

  • We directly read the .bz2.zip file using spark.read.text().
  • The unzipping step is unnecessary since Spark can handle both formats.
  • The cleanup step removes the downloaded file without needing to unzip it.

This approach is more efficient and avoids unnecessary intermediate steps. Happy analyzing! 🚀

MichTalebzadeh
Contributor

Thanks for your reply @Kaniz 

On the face of it spark can handle both .bz2 and .zip . It practice it does not work with both at the same time. You end up with ineligible characters as text. I suspect it handles decompression of outer layer (in this case unzip) but leaves the other one as is.. Sorry I could not post it. 

In other words, PySpark can do one unzip or bz2 -d but not both at the same time.


Cheers

 

 

 

Mich Talebzadeh | Technologist | Data | Generative AI | Financial Fraud
London
United Kingdom

view my Linkedin profile



https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed . It is essential to note that, as with any advice, quote "one test result is worth one-thousand expert opinions (Werner Von Braun)".
Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.