Hi,
I need to process nearly 30 files from different locations and insert records to RDS.
I am using multi-threading to process these files parallelly like below.
Test data:
I have configuration like below based on column 4:
If column 4=0: sales records
If column 4=1: sales detail records
If column 4=2: discount records
Sales records positions: sales_positions=[0, 1, 2, 3, 4]
Sales records column names: sales_column_names=[store_id, tran_id, seq_no, sales_date, record_type]
Sales detail records positions: item_positions=[1, 4, 5, 6]
Sales detail records column names: item_column_names=[tran_id, record_type, itemcd, item_price]
Sales discount records positions: discount_positions=[1, 4, 5, 6]
Sales discount records column names: discount_column_names=[tran_id, record_type, discount_type, discount_value]
def map_records(df):
records_dict = {}
sales_df = df.filter(df[“4”] == “0”)\
.select(*sales_positions)\
.toDF(*sales_columns_name)
records_dict[“sales_df”] = sales_df
item_df = df.filter(df[“4”] == “1”)\
.select(*item_positions)\
.toDF(*item_columns_name)
records_dict[“items_df”] = item_df
discount_df = df.filter(df[“4”] == “2”)\
.select(* discount_positions)\
.toDF(* discount_columns_name)
records_dict[“discount_df”] = discount_df
return records_dict
# I have requirement to delete header records from the dataframe, number of header rows are may be 1 or 2 or n(any number of rows)
no_of_header_rows = 1
def filter_df():
header_df = main_df.limit(no_of_header_rows)
main_df= main_df.subtract(header_df)
return map_records(main_df)
def process_files(file_path):
df = read files
records_dfs = filter_df(df)
pool = ThreadPool(len(files_list))
pool.map(process_files, ((file_path) for file_path in files_list))
Issue:
When filter the records based on record type, i get df with missing records and the missing records are not same every time. sometime from file1, sometime from file2 or same file but different record.
in below operation records are missing
item_df = df.filter(df[“4”] == “1”)\
.select(*item_positions)\
.toDF(*item_columns_name)
Due to below code(code in red) records are missing: If i comment below lines works fine.
def filter_df():
header_df = main_df.limit(no_of_header_rows)
main_df= main_df.subtract(header_df)
return map_records(main_df)
When i run with single thread there is no issue, i am facing issue while running with multiple threads.