<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Records are missing while filtering the dataframe in multithreading in Get Started Discussions</title>
    <link>https://community.databricks.com/t5/get-started-discussions/records-are-missing-while-filtering-the-dataframe-in/m-p/43933#M5789</link>
    <description>&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;P&gt;&lt;SPAN&gt;Hi,&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;I need to process nearly 30 files from different locations and insert records to RDS.&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;I am using multi-threading to process these files parallelly like below.&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;Test data:&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-left" image-alt="Policepatil_0-1694077661899.png" style="width: 753px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/3543i6F6148AED612F7CD/image-size/large/is-moderation-mode/true?v=v2&amp;amp;px=999" role="button" title="Policepatil_0-1694077661899.png" alt="Policepatil_0-1694077661899.png" /&gt;&lt;/span&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;I have configuration like below based on column 4:&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;If column 4=0: sales records&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;If column 4=1: sales detail records&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;If column 4=2: discount records&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales records positions: sales_positions=[0, 1, 2, 3, 4]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales records column names: sales_column_names=[store_id, tran_id, seq_no, sales_date, record_type]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales detail records positions: item_positions=[1, 4, 5, 6]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales detail records column names: item_column_names=[tran_id,&amp;nbsp; record_type, itemcd, item_price]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales discount records positions: discount_positions=[1, 4, 5, 6]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales discount records column names: discount_column_names=[tran_id,&amp;nbsp;record_type, discount_type, discount_value]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;def map_records(df):&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict = {}&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; sales_df = df&lt;/SPAN&gt;&lt;SPAN&gt;.filter(df&lt;/SPAN&gt;&lt;SPAN&gt;[“4”] == “0”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(*sales_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(*sales_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict[“sales_df”] = sales_df&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; item_df = df.filter(df[“4”] == “1”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(*item_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(*item_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict[“items_df”] = item_df&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; discount_df = df.filter(df[“4”] == “2”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(* discount_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(* discount_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict[“discount_df”] = discount_df&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;return records_dict&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;def&amp;nbsp;process_files(file_path):&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; df = read files&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dfs = map_records(df)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;BR /&gt;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;pool = ThreadPool(len(files_list))&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;pool.map(process_files, ((file_path) for file_path in files_list))&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Issue:&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;When filter the records based on record type, i get df with missing records and the missing records are not same every time. sometime from file1, sometime from file2 or same file but different record.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;in below operation records are missing&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;item_df = df.filter(df[“4”] == “1”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(*item_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(*item_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;/DIV&gt;</description>
    <pubDate>Thu, 07 Sep 2023 09:17:43 GMT</pubDate>
    <dc:creator>Policepatil</dc:creator>
    <dc:date>2023-09-07T09:17:43Z</dc:date>
    <item>
      <title>Records are missing while filtering the dataframe in multithreading</title>
      <link>https://community.databricks.com/t5/get-started-discussions/records-are-missing-while-filtering-the-dataframe-in/m-p/43933#M5789</link>
      <description>&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;DIV&gt;&lt;P&gt;&lt;SPAN&gt;Hi,&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;I need to process nearly 30 files from different locations and insert records to RDS.&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;I am using multi-threading to process these files parallelly like below.&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;Test data:&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-left" image-alt="Policepatil_0-1694077661899.png" style="width: 753px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/3543i6F6148AED612F7CD/image-size/large/is-moderation-mode/true?v=v2&amp;amp;px=999" role="button" title="Policepatil_0-1694077661899.png" alt="Policepatil_0-1694077661899.png" /&gt;&lt;/span&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;I have configuration like below based on column 4:&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;If column 4=0: sales records&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;If column 4=1: sales detail records&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;If column 4=2: discount records&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales records positions: sales_positions=[0, 1, 2, 3, 4]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales records column names: sales_column_names=[store_id, tran_id, seq_no, sales_date, record_type]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales detail records positions: item_positions=[1, 4, 5, 6]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales detail records column names: item_column_names=[tran_id,&amp;nbsp; record_type, itemcd, item_price]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales discount records positions: discount_positions=[1, 4, 5, 6]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sales discount records column names: discount_column_names=[tran_id,&amp;nbsp;record_type, discount_type, discount_value]&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;def map_records(df):&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict = {}&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; sales_df = df&lt;/SPAN&gt;&lt;SPAN&gt;.filter(df&lt;/SPAN&gt;&lt;SPAN&gt;[“4”] == “0”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(*sales_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(*sales_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict[“sales_df”] = sales_df&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; item_df = df.filter(df[“4”] == “1”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(*item_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(*item_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict[“items_df”] = item_df&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; discount_df = df.filter(df[“4”] == “2”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(* discount_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(* discount_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dict[“discount_df”] = discount_df&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;return records_dict&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;def&amp;nbsp;process_files(file_path):&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; df = read files&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; records_dfs = map_records(df)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;BR /&gt;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;pool = ThreadPool(len(files_list))&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;pool.map(process_files, ((file_path) for file_path in files_list))&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Issue:&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;When filter the records based on record type, i get df with missing records and the missing records are not same every time. sometime from file1, sometime from file2 or same file but different record.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;in below operation records are missing&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;item_df = df.filter(df[“4”] == “1”)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .select(*item_positions)\&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; .toDF(*item_columns_name)&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;&lt;/DIV&gt;</description>
      <pubDate>Thu, 07 Sep 2023 09:17:43 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/records-are-missing-while-filtering-the-dataframe-in/m-p/43933#M5789</guid>
      <dc:creator>Policepatil</dc:creator>
      <dc:date>2023-09-07T09:17:43Z</dc:date>
    </item>
    <item>
      <title>Re: Records are missing while filtering the dataframe in multithreading</title>
      <link>https://community.databricks.com/t5/get-started-discussions/records-are-missing-while-filtering-the-dataframe-in/m-p/44168#M5791</link>
      <description>&lt;P&gt;Looks like you are comparing to strings like "1", not values like 1 in your filter condition. It's hard to say, there are some details missing like the rest of the code and the DF schema, and what output you are observing.&lt;/P&gt;</description>
      <pubDate>Sat, 09 Sep 2023 15:49:24 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/records-are-missing-while-filtering-the-dataframe-in/m-p/44168#M5791</guid>
      <dc:creator>sean_owen</dc:creator>
      <dc:date>2023-09-09T15:49:24Z</dc:date>
    </item>
  </channel>
</rss>

