Hi,
I need to process nearly 30 files from different locations and insert the records into RDS.
I am using multi-threading to process these files in parallel, like below.
Test data:
I have the following configuration, keyed off column 4 (the DataFrame columns are named "0", "1", "2", ...):
If column 4 = "0": sales records
If column 4 = "1": sales detail records
If column 4 = "2": discount records
Sales records positions: sales_positions = ["0", "1", "2", "3", "4"]
Sales records column names: sales_column_names = ["store_id", "tran_id", "seq_no", "sales_date", "record_type"]
Sales detail records positions: item_positions = ["1", "4", "5", "6"]
Sales detail records column names: item_column_names = ["tran_id", "record_type", "itemcd", "item_price"]
Discount records positions: discount_positions = ["1", "4", "5", "6"]
Discount records column names: discount_column_names = ["tran_id", "record_type", "discount_type", "discount_value"]
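To make the mapping concrete, here is a minimal plain-Python sketch (no Spark; the sample rows and the `split_rows` helper are illustrative, not part of my job) of how the position/column-name configuration splits mixed rows by the value at index 4:

```python
# record type -> (source field indices, output column names)
CONFIG = {
    "0": ([0, 1, 2, 3, 4], ["store_id", "tran_id", "seq_no", "sales_date", "record_type"]),
    "1": ([1, 4, 5, 6], ["tran_id", "record_type", "itemcd", "item_price"]),
    "2": ([1, 4, 5, 6], ["tran_id", "record_type", "discount_type", "discount_value"]),
}

def split_rows(rows):
    """Group raw rows by record type, then project and rename their fields."""
    out = {rtype: [] for rtype in CONFIG}
    for row in rows:
        rtype = row[4]  # column 4 carries the record type
        positions, names = CONFIG[rtype]
        out[rtype].append(dict(zip(names, (row[p] for p in positions))))
    return out
```

For example, a sales row (`"0"`) keeps its first five fields under the sales column names, while a detail row (`"1"`) is projected down to the four item fields.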
def map_records(df):
    records_dict = {}

    sales_df = df.filter(df["4"] == "0") \
        .select(*sales_positions) \
        .toDF(*sales_column_names)
    records_dict["sales_df"] = sales_df

    item_df = df.filter(df["4"] == "1") \
        .select(*item_positions) \
        .toDF(*item_column_names)
    records_dict["items_df"] = item_df

    discount_df = df.filter(df["4"] == "2") \
        .select(*discount_positions) \
        .toDF(*discount_column_names)
    records_dict["discount_df"] = discount_df

    return records_dict
from multiprocessing.pool import ThreadPool

def process_files(file_path):
    df = spark.read.csv(file_path)  # read the file (actual read logic omitted here)
    records_dfs = map_records(df)
    # ... insert each DataFrame in records_dfs into RDS ...

pool = ThreadPool(len(files_list))
pool.map(process_files, files_list)
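For clarity, here is a self-contained sketch of the ThreadPool driver pattern I am using; the file names and the body of `process_files` are placeholders standing in for the real read-and-insert logic:

```python
from multiprocessing.pool import ThreadPool

files_list = ["file1.csv", "file2.csv", "file3.csv"]  # placeholder paths

def process_files(file_path):
    # Stand-in for: read the file, split it with map_records, insert into RDS.
    return f"processed {file_path}"

# One worker thread per file; pool.map blocks until all files are done
# and returns results in the same order as files_list.
pool = ThreadPool(len(files_list))
results = pool.map(process_files, files_list)
pool.close()
pool.join()
```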
Issue:
When I filter the records based on record type, the resulting DataFrame is missing records, and the missing records are not the same every time: sometimes from file1, sometimes from file2, or from the same file but a different record.
Records go missing in the operation below:
item_df = df.filter(df["4"] == "1") \
    .select(*item_positions) \
    .toDF(*item_column_names)