Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Get metadata of files present in a zip

seeker
New Contributor II

I have a .zip file on an ADLS path that contains multiple files of different formats. I want to get metadata for the files inside it (file name, modification time) without unzipping the archive. I have code that works for smaller zips, but it runs into memory issues for large zip files, leading to job failures. Is there a way to handle this within PySpark itself?

4 REPLIES

seeker
New Contributor II

 

Here is the code I am using:

 

import datetime
import io
import zipfile

from pyspark.sql.functions import col, explode, udf
from pyspark.sql.types import (
    ArrayType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)


def register_udf():
    def extract_file_metadata_from_zip(binary_content):
        """Return (file_name, modification_time) for every entry in the zip."""
        metadata_list = []
        with io.BytesIO(binary_content) as bio:
            with zipfile.ZipFile(bio, "r") as zip_ref:
                for file_info in zip_ref.infolist():
                    file_name = file_info.filename
                    # date_time is a (year, month, day, hour, minute, second) tuple
                    modification_time = datetime.datetime(*file_info.date_time)
                    metadata_list.append((file_name, modification_time))
        return metadata_list

    meta_schema = ArrayType(
        StructType(
            [
                StructField("file_name", StringType(), True),
                StructField("modification_time", TimestampType(), True),
            ]
        )
    )
    extract_metadata_udf = udf(extract_file_metadata_from_zip, meta_schema)
    return extract_metadata_udf
    
    
def get_last_modification_times(zip_file_path, expected_date, extract_metadata_udf):
    # Note: expected_date is currently unused in this function.
    try:
        zip_file_df = (
            spark.read.format("binaryFile")
            .option("pathGlobFilter", "*.zip")
            .load(zip_file_path)
        )
        extracted_metadata_df = zip_file_df.withColumn(
            "file_metadata", extract_metadata_udf(col("content"))
        )
        exploded_metadata_df = extracted_metadata_df.select(
            explode("file_metadata").alias("metadata")
        )
        return exploded_metadata_df
    except Exception as e:
        print("An error occurred: ", str(e))
        return None  # make the failure path explicit
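For what it's worth, the inner helper can be exercised on its own, without a cluster. A minimal sketch using a small in-memory archive (the file names and contents here are made up purely for illustration):

```python
import datetime
import io
import zipfile


def extract_file_metadata_from_zip(binary_content):
    """Return (file_name, modification_time) for every entry in a zip blob."""
    metadata_list = []
    with io.BytesIO(binary_content) as bio:
        with zipfile.ZipFile(bio, "r") as zip_ref:
            for file_info in zip_ref.infolist():
                # date_time is a (year, month, day, hour, minute, second) tuple
                modification_time = datetime.datetime(*file_info.date_time)
                metadata_list.append((file_info.filename, modification_time))
    return metadata_list


# Build a small zip in memory to exercise the helper.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("a.csv", "x,y\n1,2\n")
    zf.writestr("b.json", "{}")

metadata = extract_file_metadata_from_zip(buf.getvalue())
print([name for name, _ in metadata])  # ['a.csv', 'b.json']
```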

Hi @seeker ,

I'm afraid there is no easy way to do that in PySpark. Spark supports the following compression formats:

  • bzip2
  • deflate
  • snappy
  • lz4
  • gzip

Thus, there is no native support for the zip format. Your solution will also be slow, because a UDF applies the Python function to every row 😕
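One way to sidestep the memory blow-up (an untested sketch, not an official Databricks recipe): a zip's central directory sits at the end of the archive, so if `zipfile` is handed a *seekable* file object instead of the full byte string, it parses only that directory and never touches the compressed member data. Libraries such as fsspec/adlfs (assumed installed and configured; the account and container names below are hypothetical) expose ADLS paths as seekable file objects:

```python
import datetime
import io
import zipfile


def zip_member_metadata(fileobj):
    """List (name, mtime) for zip entries, reading only the central directory.

    `fileobj` must be seekable; zipfile seeks to the end of the archive and
    parses the central directory without decompressing any member data.
    """
    with zipfile.ZipFile(fileobj, "r") as zf:
        return [
            (info.filename, datetime.datetime(*info.date_time))
            for info in zf.infolist()
        ]


# Local demonstration with an in-memory archive.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("huge.parquet", "0" * 1024)
buf.seek(0)
print(zip_member_metadata(buf))

# For a real ADLS path, the same function could be fed a remote file object,
# e.g. (hypothetical URI, requires fsspec + adlfs and ADLS credentials):
#   import fsspec
#   with fsspec.open(
#       "abfss://container@account.dfs.core.windows.net/path/big.zip", "rb"
#   ) as f:
#       print(zip_member_metadata(f))
```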

Kaniz_Fatma
Community Manager

Hi @seeker, Thanks for reaching out!

Please review the responses and let us know which best addresses your question. Your feedback is valuable to us and the community.

If the response resolves your issue, kindly mark it as the accepted solution. This will help close the thread and assist others with similar queries.

We appreciate your participation and are here if you need further assistance!

Kaniz_Fatma
Community Manager

Hi @seeker, there are only two ways I can think of to do it:

  • Write a UDF.
  • Write customized MapReduce logic instead of using Spark SQL.
But they are kind of the same, since either way the zip is parsed with custom Python code. So I would say a UDF is a good solution.
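The "customized MapReduce logic" option can be sketched as a plain per-partition function over file *paths*, rather than a `binaryFile` DataFrame that materializes whole archives in a column. Each worker then opens its own zips as seekable files. The function below is ordinary Python; the Spark wiring in the trailing comment is a hypothetical, untested sketch:

```python
import datetime
import zipfile


def zip_metadata_for_paths(paths):
    """Per-partition worker: yield (zip_path, member_name, mtime) tuples."""
    for path in paths:
        # Opening by path lets zipfile seek straight to the central
        # directory instead of holding the whole archive in memory.
        with zipfile.ZipFile(path, "r") as zf:
            for info in zf.infolist():
                yield (path, info.filename, datetime.datetime(*info.date_time))


# With Spark (sketch, untested; zip_paths is a hypothetical list of
# locally mounted zip file paths):
#   rdd = spark.sparkContext.parallelize(zip_paths, numSlices=len(zip_paths))
#   rows = rdd.mapPartitions(zip_metadata_for_paths).collect()
```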
