Community Platform Discussions
Connect with fellow community members to discuss general topics related to the Databricks platform, industry trends, and best practices. Share experiences, ask questions, and foster collaboration within the community.

Records are missing while filtering the dataframe in multithreading

Policepatil
New Contributor III

 

Hi,

I need to process nearly 30 files from different locations and insert the records into RDS.

I am using multithreading to process these files in parallel, as shown below.

Test data: (image: Policepatil_0-1694077661899.png)

I have the following configuration, based on the value in column 4:

If column 4 = 0: sales records

If column 4 = 1: sales detail records

If column 4 = 2: discount records

Sales records positions: sales_positions = [0, 1, 2, 3, 4]

Sales records column names: sales_column_names = ["store_id", "tran_id", "seq_no", "sales_date", "record_type"]

Sales detail records positions: item_positions = [1, 4, 5, 6]

Sales detail records column names: item_column_names = ["tran_id", "record_type", "itemcd", "item_price"]

Sales discount records positions: discount_positions = [1, 4, 5, 6]

Sales discount records column names: discount_column_names = ["tran_id", "record_type", "discount_type", "discount_value"]
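For readers following along, the configuration above can be written as a single lookup keyed by record type. This is only a plain-Python sketch of the intended mapping, not the Spark code from the question: `RECORD_LAYOUTS`, `split_rows`, and the sample rows are hypothetical names and data introduced for illustration.

```python
# Layouts keyed by the record-type value in column 4 (positions and names from the post).
RECORD_LAYOUTS = {
    "0": ("sales_df", [0, 1, 2, 3, 4],
          ["store_id", "tran_id", "seq_no", "sales_date", "record_type"]),
    "1": ("items_df", [1, 4, 5, 6],
          ["tran_id", "record_type", "itemcd", "item_price"]),
    "2": ("discount_df", [1, 4, 5, 6],
          ["tran_id", "record_type", "discount_type", "discount_value"]),
}


def split_rows(rows):
    """Group rows by record type and keep only the configured positions."""
    out = {name: [] for name, _, _ in RECORD_LAYOUTS.values()}
    for row in rows:
        layout = RECORD_LAYOUTS.get(row[4])
        if layout is None:
            continue  # skip rows with an unknown record type
        name, positions, columns = layout
        out[name].append(dict(zip(columns, (row[p] for p in positions))))
    return out


# Hypothetical sample rows, one per record type.
rows = [
    ["S1", "T1", "1", "2023-09-01", "0", "", ""],
    ["S1", "T1", "2", "2023-09-01", "1", "ITEM1", "10.0"],
    ["S1", "T1", "3", "2023-09-01", "2", "PCT", "5"],
]
result = split_rows(rows)
print(result["items_df"])
```

Keeping the three layouts in one structure makes it harder for a positions list and its column-name list to drift apart.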

 

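def map_records(df):
    # Split one file's DataFrame into sales, item, and discount DataFrames
    # based on the record type in column "4".
    # Positions are turned into column names through df.columns,
    # because select() expects column names or Column objects.
    records_dict = {}

    sales_df = df.filter(df["4"] == "0")\
            .select(*[df.columns[i] for i in sales_positions])\
            .toDF(*sales_column_names)
    records_dict["sales_df"] = sales_df

    item_df = df.filter(df["4"] == "1")\
            .select(*[df.columns[i] for i in item_positions])\
            .toDF(*item_column_names)
    records_dict["items_df"] = item_df

    discount_df = df.filter(df["4"] == "2")\
            .select(*[df.columns[i] for i in discount_positions])\
            .toDF(*discount_column_names)
    records_dict["discount_df"] = discount_df

    return records_dict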

 

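from multiprocessing.pool import ThreadPool  # assumed import; not shown in the post

def process_files(file_path):
    df = ...  # read the file at file_path (the reader call is not shown in the post)
    records_dfs = map_records(df)
    # the insert into RDS is not shown in the post

# One thread per file
pool = ThreadPool(len(files_list))
pool.map(process_files, files_list)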
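A minimal sketch of the same thread-pool pattern with results returned to the caller, so that per-file outcomes (for example, row counts) can be compared across runs. `process_file` here is a hypothetical stand-in for the real read, map, and insert work, and the file paths are made up.

```python
from multiprocessing.pool import ThreadPool


def process_file(file_path):
    # Stand-in for the real work: read the file, map records, write to RDS.
    # Returning a value per file lets the caller inspect each result.
    return file_path, len(file_path)


files_list = ["a/file1.txt", "b/file2.txt", "c/file3.txt"]  # hypothetical paths

# The context manager cleans up the pool; map() blocks until every task is done
# and returns results in the same order as files_list.
with ThreadPool(min(8, len(files_list))) as pool:
    results = pool.map(process_file, files_list)

for path, value in results:
    print(path, value)
```

One thing worth ruling out with this layout is shared state: if each call builds its own DataFrame and returns its own result, nothing is shared between threads, and a per-file count that changes between runs points at the read or filter step rather than at the pool.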

  

Issue:

When I filter the records by record type, the resulting DataFrame is missing records, and the missing records are not the same on every run: sometimes they come from file1, sometimes from file2, and sometimes from the same file but a different record.

Records go missing in the operation below:

item_df = df.filter(df["4"] == "1")\
            .select(*[df.columns[i] for i in item_positions])\
            .toDF(*item_column_names)

Accepted Solutions

sean_owen
Honored Contributor II

It looks like your filter condition compares against strings like "1" rather than numeric values like 1. It's hard to say more, because some details are missing: the rest of the code, the DataFrame schema, and the output you are observing.

