cancel
Showing results for 
Search instead for 
Did you mean: 
Community Platform Discussions
Connect with fellow community members to discuss general topics related to the Databricks platform, industry trends, and best practices. Share experiences, ask questions, and foster collaboration within the community.
cancel
Showing results for 
Search instead for 
Did you mean: 

Records are missing while filtering the dataframe in multithreading

Policepatil
New Contributor III

 

Hi, 

I need to process nearly 30 files from different locations and insert records to RDS. 

I am using multi-threading to process these files parallelly like below.  

 Test data:  

Policepatil_0-1694077661899.png

 

 

  

 

 

 

 

 

 

 

 

 I have configuration like below based on column 4: 

If column 4=0: sales records 

If column 4=1: sales detail records 

If column 4=2: discount records 

 

Sales records positions: sales_positions=[0, 1, 2, 3, 4] 

Sales records column names: sales_column_names=[store_id, tran_id, seq_no, sales_date, record_type] 

 

Sales detail records positions: item_positions=[1, 4, 5, 6] 

Sales detail records column names: item_column_names=[tran_id,  record_type, itemcd, item_price] 

 

Sales discount records positions: discount_positions=[1, 4, 5, 6] 

Sales discount records column names: discount_column_names=[tran_id, record_type, discount_type, discount_value] 

 

def map_records(df): 

    records_dict = {} 

    sales_df = df.filter(df[“4”] == “0”)\ 

            .select(*sales_positions)\ 

            .toDF(*sales_columns_name) 

    records_dict[“sales_df”] = sales_df 

    item_df = df.filter(df[“4”] == “1”)\ 

            .select(*item_positions)\ 

            .toDF(*item_columns_name) 

    records_dict[“items_df”] = item_df 

    discount_df = df.filter(df[“4”] == “2”)\ 

            .select(* discount_positions)\ 

            .toDF(* discount_columns_name) 

    records_dict[“discount_df”] = discount_df 

return records_dict 

 

def process_files(file_path): 

    df = read files 

    records_dfs = map_records(df) 

 
 

    

pool = ThreadPool(len(files_list)) 

pool.map(process_files, ((file_path) for file_path in files_list)) 

  

Issue:

When filter the records based on record type, i get df with missing records and the missing records are not same every time. sometime from file1, sometime from file2 or same file but different record.

in below operation records are missing

item_df = df.filter(df[“4”] == “1”)\ 

            .select(*item_positions)\ 

            .toDF(*item_columns_name) 

 

 

1 ACCEPTED SOLUTION

Accepted Solutions

sean_owen
Honored Contributor II
Honored Contributor II

Looks like you are comparing to strings like "1", not values like 1 in your filter condition. It's hard to say, there are some details missing like the rest of the code and the DF schema, and what output you are observing.

View solution in original post

2 REPLIES 2

Kaniz_Fatma
Community Manager
Community Manager

Hi @Policepatil , To process nearly 30 files from different locations and insert records to RDS using multi-threading, you can employ the COPY INTO command in a loop that iterates through each file path. This command will allow you to insert records from each file into an existing table.

 

The issue may be related to the parallel processing of files and the mutable state in the program.
- The problem is likely due to concurrent modification of the DataFrame in the map_records function when running in parallel threads.
- To solve the issue, ensure that each thread is working with its own copy of the DataFrame by loading the file and creating the DataFrame within the thread itself.
- If the issue persists, set the Apache Spark property spark.sql.files.ignoreCorruptFiles to true to handle corrupted or mismatched schema files.
- Modify the code by creating a map_records function that filters the DataFrame based on specific conditions and returns a dictionary of DataFrames.
- Modify the process_files function to read the parquet file and call the map_records function.
- Use a thread pool to parallelize the processing of multiple files.
- Set the Spark property spark.sql.files.ignoreCorruptFiles to true using spark.conf.set().

sean_owen
Honored Contributor II
Honored Contributor II

Looks like you are comparing to strings like "1", not values like 1 in your filter condition. It's hard to say, there are some details missing like the rest of the code and the DF schema, and what output you are observing.

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group