Here is the code I am using:
import io
import zipfile
import datetime

from pyspark.sql.functions import udf, col, explode
from pyspark.sql.types import (
    ArrayType,
    StructType,
    StructField,
    StringType,
    TimestampType,
)


def register_udf():
    def extract_file_metadata_from_zip(binary_content):
        # Parse the zip archive from the raw bytes of the "content" column
        metadata_list = []
        with io.BytesIO(binary_content) as bio:
            with zipfile.ZipFile(bio, "r") as zip_ref:
                for file_info in zip_ref.infolist():
                    file_name = file_info.filename
                    # date_time is a 6-tuple (year, month, day, hour, minute, second)
                    modification_time = datetime.datetime(*file_info.date_time)
                    metadata_list.append((file_name, modification_time))
        return metadata_list

    meta_schema = ArrayType(
        StructType(
            [
                StructField("file_name", StringType(), True),
                StructField("modification_time", TimestampType(), True),
            ]
        )
    )

    extract_metadata_udf = udf(extract_file_metadata_from_zip, meta_schema)
    return extract_metadata_udf
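
In case it helps with debugging, the zip-parsing part can be checked locally without Spark. This is just a minimal sketch that mirrors the body of the inner function; the _parse_zip_metadata helper and the in-memory sample archive are illustrative only, not part of the original code.

import io
import zipfile
import datetime

def _parse_zip_metadata(binary_content):
    # Same parsing logic as extract_file_metadata_from_zip above
    metadata_list = []
    with io.BytesIO(binary_content) as bio:
        with zipfile.ZipFile(bio, "r") as zip_ref:
            for file_info in zip_ref.infolist():
                metadata_list.append(
                    (file_info.filename, datetime.datetime(*file_info.date_time))
                )
    return metadata_list

# Build a tiny in-memory zip and inspect what the UDF would return for it
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("example.txt", "hello")
print(_parse_zip_metadata(buf.getvalue()))
# prints a list of (file_name, modification_time) tuples, one per archive entry
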
def get_last_modification_times(zip_file_path, expected_date, extract_metadata_udf):
    try:
        # Load the zip archives as binary files; each row carries the raw
        # bytes in the "content" column
        zip_file_df = (
            spark.read.format("binaryFile")
            .option("pathGlobFilter", "*.zip")
            .load(zip_file_path)
        )

        # Apply the UDF to get an array of (file_name, modification_time) structs
        extracted_metadata_df = zip_file_df.withColumn(
            "file_metadata", extract_metadata_udf(col("content"))
        )

        # One row per archive entry
        exploded_metadata_df = extracted_metadata_df.select(
            explode("file_metadata").alias("metadata")
        )
        return exploded_metadata_df
    except Exception as e:
        # Note: expected_date is not used in this function, and on failure it
        # only prints the error, so the caller receives None
        print("An error occurred: ", str(e))