<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Missing records while using limit in multithreading in Get Started Discussions</title>
    <link>https://community.databricks.com/t5/get-started-discussions/missing-records-while-using-limit-in-multithreading/m-p/44300#M5798</link>
    <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;I need to process nearly 30 files from different locations and insert records to RDS.&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;I am using multi-threading to process these files parallelly like below.&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Test data:&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-left" image-alt="image.png" style="width: 717px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/3660iB6374B5DBCA58688/image-size/large/is-moderation-mode/true?v=v2&amp;amp;px=999" role="button" title="image.png" alt="image.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;I have configuration like below based on column 4:&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;If column 4=0: sales records&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;If column 4=1: sales detail records&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;If column 4=2: discount records&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales records positions: sales_positions=[0, 1, 2, 3, 4]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales records column names: sales_column_names=[store_id, tran_id, seq_no, sales_date, record_type]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales detail records positions: item_positions=[1, 4, 5, 6]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales detail records column names: item_column_names=[tran_id,&amp;nbsp; record_type, itemcd, item_price]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales discount records positions: discount_positions=[1, 4, 5, 6]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales discount records column names: discount_column_names=[tran_id,&amp;nbsp;record_type, discount_type, discount_value]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;def map_records(df):&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict = {}&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; sales_df = df&lt;/SPAN&gt;&lt;SPAN&gt;.filter(df&lt;/SPAN&gt;&lt;SPAN&gt;[“4”] == “0”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(*sales_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(*sales_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict[“sales_df”] = sales_df&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; item_df = df.filter(df[“4”] == “1”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(*item_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(*item_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict[“items_df”] = item_df&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; discount_df = df.filter(df[“4”] == “2”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(* discount_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(* discount_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict[“discount_df”] = discount_df&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;return records_dict&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;#&amp;nbsp;I have requirement to delete header records from the dataframe, number of header rows are may be 1 or 2 or n(any number of rows)&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;no_of_header_rows = 1&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;def filter_df():&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; header_df = main_df.limit(no_of_header_rows)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; main_df= main_df.subtract(header_df)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; return map_records(main_df)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;P&gt;&lt;SPAN&gt;def&amp;nbsp;process_files(file_path):&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; df = read files&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dfs = filter_df(df)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;BR /&gt;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;pool = ThreadPool(len(files_list))&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;pool.map(process_files, ((file_path) for file_path in files_list))&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;P&gt;&lt;STRONG&gt;Issue:&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;When filter the records based on record type, i get df with missing records and the missing records are not same every time. sometime from file1, sometime from file2 or same file but different record.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;in below operation records are missing&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;item_df = df.filter(df[“4”] == “1”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(*item_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(*item_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Due to below code(code in red) records are missing: If i comment below lines works fine.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;def filter_df():&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&lt;STRONG&gt;&amp;nbsp; &amp;nbsp; header_df = main_df.limit(no_of_header_rows)&lt;/STRONG&gt;&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&lt;STRONG&gt;&amp;nbsp; &amp;nbsp; main_df= main_df.subtract(header_df)&lt;/STRONG&gt;&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; return map_records(main_df)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;When i run with single thread there is no issue, i am facing issue while running with multiple threads.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
    <pubDate>Mon, 11 Sep 2023 08:38:28 GMT</pubDate>
    <dc:creator>Policepatil</dc:creator>
    <dc:date>2023-09-11T08:38:28Z</dc:date>
    <item>
      <title>Missing records while using limit in multithreading</title>
      <link>https://community.databricks.com/t5/get-started-discussions/missing-records-while-using-limit-in-multithreading/m-p/44300#M5798</link>
      <description>&lt;P&gt;Hi,&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;I need to process nearly 30 files from different locations and insert records to RDS.&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;I am using multi-threading to process these files parallelly like below.&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Test data:&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-left" image-alt="image.png" style="width: 717px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/3660iB6374B5DBCA58688/image-size/large/is-moderation-mode/true?v=v2&amp;amp;px=999" role="button" title="image.png" alt="image.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;I have configuration like below based on column 4:&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;If column 4=0: sales records&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;If column 4=1: sales detail records&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;If column 4=2: discount records&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales records positions: sales_positions=[0, 1, 2, 3, 4]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales records column names: sales_column_names=[store_id, tran_id, seq_no, sales_date, record_type]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales detail records positions: item_positions=[1, 4, 5, 6]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales detail records column names: item_column_names=[tran_id,&amp;nbsp; record_type, itemcd, item_price]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales discount records positions: discount_positions=[1, 4, 5, 6]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales discount records column names: discount_column_names=[tran_id,&amp;nbsp;record_type, discount_type, discount_value]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;def map_records(df):&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict = {}&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; sales_df = df&lt;/SPAN&gt;&lt;SPAN&gt;.filter(df&lt;/SPAN&gt;&lt;SPAN&gt;[“4”] == “0”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(*sales_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(*sales_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict[“sales_df”] = sales_df&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; item_df = df.filter(df[“4”] == “1”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(*item_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(*item_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict[“items_df”] = item_df&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; discount_df = df.filter(df[“4”] == “2”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(* discount_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(* discount_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict[“discount_df”] = discount_df&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;return records_dict&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;#&amp;nbsp;I have requirement to delete header records from the dataframe, number of header rows are may be 1 or 2 or n(any number of rows)&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;no_of_header_rows = 1&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;def filter_df():&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; header_df = main_df.limit(no_of_header_rows)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; main_df= main_df.subtract(header_df)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; return map_records(main_df)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;P&gt;&lt;SPAN&gt;def&amp;nbsp;process_files(file_path):&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; df = read files&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dfs = filter_df(df)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;BR /&gt;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;pool = ThreadPool(len(files_list))&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;pool.map(process_files, ((file_path) for file_path in files_list))&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;P&gt;&lt;STRONG&gt;Issue:&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;When filter the records based on record type, i get df with missing records and the missing records are not same every time. sometime from file1, sometime from file2 or same file but different record.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;in below operation records are missing&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;item_df = df.filter(df[“4”] == “1”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(*item_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(*item_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Due to below code(code in red) records are missing: If i comment below lines works fine.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;def filter_df():&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&lt;STRONG&gt;&amp;nbsp; &amp;nbsp; header_df = main_df.limit(no_of_header_rows)&lt;/STRONG&gt;&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;FONT color="#FF0000"&gt;&lt;EM&gt;&lt;STRONG&gt;&amp;nbsp; &amp;nbsp; main_df= main_df.subtract(header_df)&lt;/STRONG&gt;&lt;/EM&gt;&lt;/FONT&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; return map_records(main_df)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;When i run with single thread there is no issue, i am facing issue while running with multiple threads.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Mon, 11 Sep 2023 08:38:28 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/missing-records-while-using-limit-in-multithreading/m-p/44300#M5798</guid>
      <dc:creator>Policepatil</dc:creator>
      <dc:date>2023-09-11T08:38:28Z</dc:date>
    </item>
  </channel>
</rss>

