cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Handling of millions of xml in json files

Bbren
New Contributor

Hi all, i have some questions related to the handling of many smalls files and possible improvements and augmentations.

We have many small xml files. These files are previously processed by another system that puts them in our datalake, but as an additional step wraps the initial xml files in json files with some metadata. This json is flat and composed in jsonl format.

We currently use ADF to get triggered by the new files, after which it adds them to a delta lake table (DT1). This table has a column for each field, of which one is the original xml data. These xml files can be quite big, so this table with about 20M rows is 150GB.

The step after this table is to unpack the xml fields with a UDF and we get an usable version of the data inside the XML, also as delta table (DT2). The problem is that this initial delta table is extremely slow and big. Just doing a count takes 5 minutes. The table is currently not partitioned. We can add this, but apart from faster search I'm not sure if that helps other queries such as a count. I had a hard time finding info about which metadata is computed when and used when in delta lake.

We have considered auto-loader but since DLT is in it's very early stages we were not super keen on using this. The additional issue of not being able to run things interactively also seemed like a pretty ineffective structure, but maybe I'm missing something and this is the way to go. Any thoughts on better ways are appreciated.

All our steps after the initial ADF load are using pyspark structured streaming. Just running the setup of DT1 to DT2 takes about 2 days with a fairly big cluster, whereas we could process these using dbutils and the python variant of the UDF in about a day with a single node. So there seems to be some fundamental slowdowns that I am missing in the pyspark processing.

I have a hard time profiling the spark UI and understanding what is happing and where it is slowing down. This has helped in not really knowing how to move forward. I have looked at the pyspark profiler, but yet to run it. This could be helpful although seeing what I'm expecting, which is that the UDF is slow, is not gonna help per se.

Execution plan from DT1 to DT2:

== Physical Plan ==

*(1) Project [pythonUDF0#24304.Col1 AS COl1#24048, ... 97 more fields]

+- BatchEvalPython [<lambda>(body#24021, path#24025)#24037], [pythonUDF0#24304]

+- InMemoryTableScan [body#24021, path#24025], false

+- InMemoryRelation [body#24021, checksum#24022, format#24023, id#24024, path#24025, size#24026, timestamp#24027], StorageLevel(disk, memory, deserialized, 1 replicas)

+- *(1) ColumnarToRow

+- FileScan parquet [body#249,checksum#250,format#251,id#252,path#253,size#254,timestamp#255] Batched: true, DataFilters: [], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[abfss://container@datalake.dfs.core.windows.net/bronz..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<body:string,checksum:string,format:string,id:string,path:string,size:int,timestamp:string>

Looking at this now, it seems that the UDF is applied at the first step rather than maybe splitting the data and then applying the UDF. Could that be an improvement?

1 ACCEPTED SOLUTION

Accepted Solutions

Anonymous
Not applicable

@Bauke Brenninkmeijer​ :

It sounds like you are facing a few performance and scalability challenges in your current process. Here are some thoughts and suggestions:

  1. Partitioning: Partitioning the Delta table can definitely help with query performance, especially for count queries. When you partition a table, Spark can prune the partitions that don't need to be read, leading to significant query speed-up. You can partition the Delta table based on a column that is frequently used in queries (e.g., timestamp), or any other column that makes sense for your data.
  2. Delta Lake optimization: Delta Lake provides several features that can help with performance and scalability, such as Delta caching, Z-Ordering, and auto-optimization. Delta caching allows you to cache frequently accessed tables in memory or on disk for faster query performance. Z-Ordering helps organize the data in a Delta table based on a particular column to improve query performance. Auto-optimization automatically applies various optimizations to a Delta table to improve query performance.
  3. Use PySpark instead of Structured Streaming: It sounds like you are using Structured Streaming for your processing steps. While Structured Streaming is great for handling real-time data streams, it might not be the most efficient solution for batch processing of large amounts of data. You could try using PySpark instead, which provides more flexibility and control over your processing pipeline.
  4. Consider using Databricks Runtime: If you're not already using it, consider using Databricks Runtime, which is a fully managed and optimized version of Apache Spark. Databricks Runtime includes many optimizations and features that can help with performance and scalability, such as Delta Lake integration, auto-scaling, and GPU support.
  5. Splitting data: It's possible that splitting the data and then applying the UDF could improve performance, depending on the size and structure of your data. You could try using the repartition method to split the data into smaller partitions before applying the UDF.
  6. Monitoring Spark UI: The Spark UI can provide valuable insights into the performance of your Spark jobs. You can use the Spark UI to monitor the execution plan, identify bottlenecks, and optimize your queries.

I hope these suggestions help you optimize your processing pipeline and improve performance and scalability.

View solution in original post

2 REPLIES 2

Anonymous
Not applicable

@Bauke Brenninkmeijer​ :

It sounds like you are facing a few performance and scalability challenges in your current process. Here are some thoughts and suggestions:

  1. Partitioning: Partitioning the Delta table can definitely help with query performance, especially for count queries. When you partition a table, Spark can prune the partitions that don't need to be read, leading to significant query speed-up. You can partition the Delta table based on a column that is frequently used in queries (e.g., timestamp), or any other column that makes sense for your data.
  2. Delta Lake optimization: Delta Lake provides several features that can help with performance and scalability, such as Delta caching, Z-Ordering, and auto-optimization. Delta caching allows you to cache frequently accessed tables in memory or on disk for faster query performance. Z-Ordering helps organize the data in a Delta table based on a particular column to improve query performance. Auto-optimization automatically applies various optimizations to a Delta table to improve query performance.
  3. Use PySpark instead of Structured Streaming: It sounds like you are using Structured Streaming for your processing steps. While Structured Streaming is great for handling real-time data streams, it might not be the most efficient solution for batch processing of large amounts of data. You could try using PySpark instead, which provides more flexibility and control over your processing pipeline.
  4. Consider using Databricks Runtime: If you're not already using it, consider using Databricks Runtime, which is a fully managed and optimized version of Apache Spark. Databricks Runtime includes many optimizations and features that can help with performance and scalability, such as Delta Lake integration, auto-scaling, and GPU support.
  5. Splitting data: It's possible that splitting the data and then applying the UDF could improve performance, depending on the size and structure of your data. You could try using the repartition method to split the data into smaller partitions before applying the UDF.
  6. Monitoring Spark UI: The Spark UI can provide valuable insights into the performance of your Spark jobs. You can use the Spark UI to monitor the execution plan, identify bottlenecks, and optimize your queries.

I hope these suggestions help you optimize your processing pipeline and improve performance and scalability.

Anonymous
Not applicable

Hi @Bauke Brenninkmeijer​ 

Thank you for posting your question in our community! We are happy to assist you.

To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?

This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance! 

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!