Get Started Discussions
Start your journey with Databricks by joining discussions on getting started guides, tutorials, and introductory topics. Connect with beginners and experts alike to kickstart your Databricks experience.

Missing records while using limit in multithreading

Policepatil
New Contributor III

Hi,

I need to process nearly 30 files from different locations and insert records to RDS. 

I am using multi-threading to process these files parallelly like below. 

Test data: 

(Test data shown in the attached screenshot, image.png.)

I have the following configuration, based on column 4:

If column 4 = 0: sales records
If column 4 = 1: sales detail records
If column 4 = 2: discount records

Sales record positions: sales_positions = [0, 1, 2, 3, 4]
Sales record column names: sales_column_names = [store_id, tran_id, seq_no, sales_date, record_type]

Sales detail record positions: item_positions = [1, 4, 5, 6]
Sales detail record column names: item_column_names = [tran_id, record_type, itemcd, item_price]

Sales discount record positions: discount_positions = [1, 4, 5, 6]
Sales discount record column names: discount_column_names = [tran_id, record_type, discount_type, discount_value]
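For reference, the mapping rules above can be restated as plain-Python structures (the names mirror the post; this is just a restatement of the configuration, not the Spark code):

```python
# Plain-Python restatement of the record-type configuration above.
# Column 4 of each raw row selects the record type; the positions pick
# which fields are kept, and the column names label them.
RECORD_CONFIG = {
    "0": ("sales", [0, 1, 2, 3, 4],
          ["store_id", "tran_id", "seq_no", "sales_date", "record_type"]),
    "1": ("item", [1, 4, 5, 6],
          ["tran_id", "record_type", "itemcd", "item_price"]),
    "2": ("discount", [1, 4, 5, 6],
          ["tran_id", "record_type", "discount_type", "discount_value"]),
}

def map_row(row):
    """Map one raw row (a list of strings) to (record_kind, {column: value})."""
    kind, positions, names = RECORD_CONFIG[row[4]]
    return kind, dict(zip(names, (row[p] for p in positions)))
```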

def map_records(df):
    # Columns are read positionally, so they are named "0", "1", "2", ...
    records_dict = {}
    sales_df = df.filter(df["4"] == "0") \
        .select(*[str(p) for p in sales_positions]) \
        .toDF(*sales_column_names)
    records_dict["sales_df"] = sales_df
    item_df = df.filter(df["4"] == "1") \
        .select(*[str(p) for p in item_positions]) \
        .toDF(*item_column_names)
    records_dict["items_df"] = item_df
    discount_df = df.filter(df["4"] == "2") \
        .select(*[str(p) for p in discount_positions]) \
        .toDF(*discount_column_names)
    records_dict["discount_df"] = discount_df
    return records_dict

# I need to delete the header records from the dataframe; the number of header rows may be 1, 2, or n (any number of rows)

no_of_header_rows = 1

def filter_df(main_df):
    header_df = main_df.limit(no_of_header_rows)
    main_df = main_df.subtract(header_df)
    return map_records(main_df)

from multiprocessing.pool import ThreadPool

def process_files(file_path):
    df = spark.read.csv(file_path)  # read the file (actual reader options omitted here)
    records_dfs = filter_df(df)

pool = ThreadPool(len(files_list))
pool.map(process_files, files_list)
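As a point of reference, here is a minimal, self-contained version of the ThreadPool pattern used above, with a trivial stand-in for the real per-file work (read, filter, map, insert):

```python
from multiprocessing.pool import ThreadPool

def process_files(file_path):
    # Stand-in for the real per-file work; returns a marker so the
    # result of each thread is visible.
    return f"processed:{file_path}"

files_list = ["file1.csv", "file2.csv", "file3.csv"]
pool = ThreadPool(len(files_list))
results = pool.map(process_files, files_list)
pool.close()
pool.join()
```

`pool.map` blocks until all files are processed and returns results in the order of `files_list`, regardless of which thread finished first.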

 

 
 

Issue:

When I filter the records based on record type, I get a DataFrame with missing records, and the missing records are not the same every time: sometimes from file1, sometimes from file2, or from the same file but a different record.

Records go missing in the operation below:

item_df = df.filter(df["4"] == "1") \
    .select(*[str(p) for p in item_positions]) \
    .toDF(*item_column_names)

The records go missing because of the code below; if I comment out these lines, everything works fine.

def filter_df(main_df):
    header_df = main_df.limit(no_of_header_rows)
    main_df = main_df.subtract(header_df)
    return map_records(main_df)

When I run with a single thread there is no issue; I only face the issue when running with multiple threads.
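For comparison, the intent of the limit/subtract step is simply "drop the first n rows". On a plain Python list that operation is well defined; on an unordered Spark DataFrame, `limit()` makes no guarantee about which rows it picks, which may be relevant to the varying results. A minimal sketch of the intended behavior (hypothetical helper name, no Spark):

```python
# Plain-list illustration of the intended header removal: drop exactly
# the first n rows. On a list the "first n rows" are well defined; on an
# unordered Spark DataFrame, limit() gives no such guarantee.
def drop_header_rows(rows, n):
    return rows[n:]
```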

 

