04-12-2023 11:30 AM
Hi, I have a DataFrame that I've converted into a struct so that each row is a JSON object.
I want to split the DataFrame into 1MB chunks. Once I have the chunks, I'd like to combine all rows in each respective chunk into a single JSON array.
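For reference, a minimal sketch of the setup described above, assuming a Spark DataFrame df whose rows are serialized with to_json over a struct (the column name json_row is illustrative, not from the original post):
from pyspark.sql import functions as F

# Serialize every row into a JSON object string; "json_row" is just an illustrative name
json_df = df.select(F.to_json(F.struct(*df.columns)).alias("json_row"))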
04-13-2023 03:06 AM
# Set the chunk size in bytes (1MB = 1048576 bytes)
chunk_size = 1048576
# Estimate the number of partitions needed based on the total serialized size
total_bytes = df.rdd.map(lambda row: len(str(row))).sum()
num_partitions = int(total_bytes // chunk_size) + 1
# Repartition and write; note that maxRecordsPerFile limits rows per file (not bytes),
# so derive an approximate row count per 1MB chunk from the average row size
max_records = max(1, int(chunk_size // (total_bytes / df.count())))
df.repartition(num_partitions) \
    .write.option("maxRecordsPerFile", max_records) \
    .mode("overwrite") \
    .parquet("/output_path")
You can split the DataFrame using the method above.
Once you have the partitions, traverse each partition and collect its rows individually; you can then combine those rows into a single JSON array per partition.
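A minimal sketch of that traversal, assuming df is the Spark DataFrame and num_partitions was computed as above (the helper name partition_to_json_array is illustrative):
import json

# Build one JSON array string per partition
def partition_to_json_array(rows):
    # rows is an iterator over the Row objects in a single partition
    yield json.dumps([row.asDict(recursive=True) for row in rows], default=str)

# One JSON array (as a string) per ~1MB partition
json_arrays = (
    df.repartition(num_partitions)
      .rdd
      .mapPartitions(partition_to_json_array)
      .collect()
)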
#DAIS2023
04-13-2023 12:10 PM
Thanks, that's very helpful! As far as "traversing each partition, collecting the rows individually, and then adding up those rows for each partition" goes, would this be a simple for loop, or is there a recommended/optimized way of traversing and collecting the rows?
04-16-2023 12:21 AM
@Uma Maheswara Rao Desula: Glad to see your participation in the Raffle contest! 🙂
04-16-2023 12:23 AM
@Tamoor Mirza:
You can use the to_json method of a pandas DataFrame to convert each chunk to a JSON string, and then append those JSON strings to a list. Here is an example code snippet that splits a pandas DataFrame into fixed-size chunks (by row count, which you can tune so each chunk lands near 1MB) and creates a list of JSON arrays, with each row in each chunk becoming an array element:
import json

# assume df is your (pandas) DataFrame
chunk_size = 1_000_000  # rows per chunk; tune this so each chunk is roughly 1MB
json_arrays = []

for start in range(0, len(df), chunk_size):
    end = min(start + chunk_size, len(df))
    chunk = df.iloc[start:end]
    json_str = chunk.to_json(orient='records')
    json_array = json.loads(json_str)
    json_arrays.append(json_array)

# merge all JSON arrays into a single array
merged_json_array = sum(json_arrays, [])

# convert the merged JSON array to a JSON string
merged_json_str = json.dumps(merged_json_array)
In the above code, we first define the chunk size (expressed here as a row count; pick a value that keeps each chunk near 1MB for your data). We then loop over the DataFrame, slicing it into chunks of that size using the iloc method. For each chunk, we call to_json with orient='records', which produces a list of JSON objects (one per row). We then use json.loads to parse that string into a list of dictionaries and append it to the json_arrays list.
After all the chunks are processed, we merge the JSON arrays into a single array using the built-in sum function. Finally, we convert the merged array to a JSON string with json.dumps.
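If you want to confirm that the chunks actually land near the 1MB target, an illustrative check of each chunk's serialized size (not part of the original snippet) could look like this:
# Measure the serialized size of each chunk so chunk_size (a row count above)
# can be tuned toward the ~1MB target
for i, arr in enumerate(json_arrays):
    size_bytes = len(json.dumps(arr).encode("utf-8"))
    print(f"chunk {i}: {len(arr)} rows, {size_bytes} bytes")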