cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Pyspark Pandas column or index name appears to persist after being dropped or removed.

Callum
New Contributor II

So, I have this code for merging dataframes with pyspark pandas. And I want the index of the left dataframe to persist throughout the joins. So following suggestions from others wanting to keep the index after merging, I set the index to a column before the merge, and then bring it back to index after the merge and remove the index name.

import pyspark.pandas as ps
 
 
def merge_dataframes(left=None, right=None, how='inner', on=None, left_on=None,
                     right_on=None, left_index=False, right_index=False):
    merging_left = left.copy()
    merging_left['weird_index_name'] = merging_left.index
 
    new_df = merging_left.merge(right, on=on, how=how, left_on=left_on, right_on=right_on, suffixes=('', '_dupe_right'),
                                left_index=left_index, right_index=right_index)
 
    returning_df = new_df.set_index('weird_index_name')
 
    returning_df.index.name = None
    return returning_df
 
 
df_1 = ps.DataFrame({
    'join_column': [1, 2, 3, 4],
    'value1': ['A', 'B', 'C', 'D']
}, index=['Index1', 'Index2', 'Index3', 'Index4'])
df_2 = ps.DataFrame({
    'join_column': [1, 2, 3, 4, 5],
    'value2': ['a', 'b', 'c', 'd', 'e']
})
df_3 = ps.DataFrame({
    'join_column': [1, 2, 3, 4, 6, 7],
    'value3': [1.1, 2.2, 3.3, 4.4, 6.6, 7.7]
})
 
input_list = [df_1, df_2, df_3]
 
expected_result = ps.DataFrame({
    'join_column': [1, 2, 3, 4],
    'value1': ['A', 'B', 'C', 'D'],
    'value2': ['a', 'b', 'c', 'd'],
    'value3': [1.1, 2.2, 3.3, 4.4]
}, index=['Index1', 'Index2', 'Index3', 'Index4'])
 
final_df = input_list[0]
for next_df in input_list[1:]:
    final_df = merge_dataframes(left=final_df, right=next_df, how='left', on='join_column')
 
print(final_df)

This works perfectly fine for merging two dataframes but as soon as I have a list of dataframes to be merged together using a for loop. I get this error:

AnalysisException                         Traceback (most recent call last)
<command-1950358519842329> in <module>
     42 final_df = input_list[0]
     43 for next_df in input_list[1:]:
---> 44     final_df = merge_dataframes(left=final_df, right=next_df, how='left', on='join_column')
     45 
     46 print(final_df)
 
<command-1950358519842329> in merge_dataframes(left, right, how, on, left_on, right_on, left_index, right_index)
      7     merging_left['weird_index_name'] = merging_left.index
      8 
----> 9     new_df = merging_left.merge(right, on=on, how=how, left_on=left_on, right_on=right_on, suffixes=('', '_dupe_right'),
     10                                 left_index=left_index, right_index=right_index)
     11 
 
/databricks/spark/python/pyspark/pandas/usage_logging/__init__.py in wrapper(*args, **kwargs)
    192             start = time.perf_counter()
    193             try:
--> 194                 res = func(*args, **kwargs)
    195                 logger.log_success(
    196                     class_name, function_name, time.perf_counter() - start, signature
 
/databricks/spark/python/pyspark/pandas/frame.py in merge(self, right, how, on, left_on, right_on, left_index, right_index, suffixes)
   7655             )
   7656 
-> 7657         left_internal = self._internal.resolved_copy
   7658         right_internal = resolve(right._internal, "right")
   7659 
 
/databricks/spark/python/pyspark/pandas/utils.py in wrapped_lazy_property(self)
    578     def wrapped_lazy_property(self):
    579         if not hasattr(self, attr_name):
--> 580             setattr(self, attr_name, fn(self))
    581         return getattr(self, attr_name)
    582 
 
/databricks/spark/python/pyspark/pandas/internal.py in resolved_copy(self)
   1169         return self.copy(
   1170             spark_frame=sdf,
-> 1171             index_spark_columns=[scol_for(sdf, col) for col in self.index_spark_column_names],
   1172             data_spark_columns=[scol_for(sdf, col) for col in self.data_spark_column_names],
   1173         )
 
/databricks/spark/python/pyspark/pandas/internal.py in <listcomp>(.0)
   1169         return self.copy(
   1170             spark_frame=sdf,
-> 1171             index_spark_columns=[scol_for(sdf, col) for col in self.index_spark_column_names],
   1172             data_spark_columns=[scol_for(sdf, col) for col in self.data_spark_column_names],
   1173         )
 
/databricks/spark/python/pyspark/pandas/utils.py in scol_for(sdf, column_name)
    590 def scol_for(sdf: SparkDataFrame, column_name: str) -> Column:
    591     """Return Spark Column for the given column name."""
--> 592     return sdf["`{}`".format(column_name)]
    593 
    594 
 
/databricks/spark/python/pyspark/sql/dataframe.py in __getitem__(self, item)
   1775         """
   1776         if isinstance(item, str):
-> 1777             jc = self._jdf.apply(item)
   1778             return Column(jc)
   1779         elif isinstance(item, Column):
 
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
 
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    121                 # Hide where the exception came from that shows a non-Pythonic
    122                 # JVM exception message.
--> 123                 raise converted from None
    124             else:
    125                 raise
 
AnalysisException: Reference 'weird_index_name' is ambiguous, could be: weird_index_name, weird_index_name.

From my understanding this suggests there is another column called "weird_index_name". However, displaying the dataframe before going into the merge I get a dataframe where there is only one column "weird_index_name" on both calls of the function.

This led me to think that "weird_index_name" persists as the index name after:

returning_df = new_df.set_index('weird_index_name')
returning_df.index.name = None

However, printing out the returning_df.index.name returns None after the merge of the first two dataframes, before the second call of the function. This is also true when printing the merging_left.index.name before the second call of the function.

If I introduce this line before the merge:

print(merging_left.index.to_list())

The merge of the first two dataframes works and then I get this error when trying to join on the third (second call of the function) at that line:

Length mismatch: Expected axis has 5 elements, new values have 4 elements

This leads me to believe there is an issue with the index and "weird_index_name" somehow persists.

Any help would be appreciated!!

3 REPLIES 3

irfanaziz
Contributor II

This worked for me:

import pandas as ps
 
 
 
def merge_dataframes(left=None, right=None, how='inner', on=None, left_on=None,
                     right_on=None, left_index=False, right_index=False):
    merging_left = left.copy()
    merging_left['weird_index_name'] = merging_left.index
 
    new_df = merging_left.merge(right, on=on, how=how, left_on=left_on, right_on=right_on, suffixes=('', '_dupe_right'),
                                left_index=left_index, right_index=right_index)
 
    returning_df = new_df.set_index('weird_index_name')
 
    returning_df.index.name = None
    return returning_df
 
 
 
df_1 = ps.DataFrame({
    'join_column': [1, 2, 3, 4],
    'value1': ['A', 'B', 'C', 'D']
}, index=['Index1', 'Index2', 'Index3', 'Index4'])
df_2 = ps.DataFrame({
    'join_column': [1, 2, 3, 4, 5],
    'value2': ['a', 'b', 'c', 'd', 'e']
})
df_3 = ps.DataFrame({
    'join_column': [1, 2, 3, 4, 5, 6],
    'value3': [1.1, 2.2, 3.3, 4.4, 6.6, 7.7]
})
 
input_list = [df_1, df_2, df_3]
 
 
 
#print(type(input_list[0]))
final_df = input_list[0]
#print(final_df)
 
 
for next_df in input_list[1:]:
    final_df = merge_dataframes(left=final_df, right=next_df, how='left', on='join_column')
 
print(final_df)

Callum
New Contributor II

Yeah, I found it works perfectly fine in normal pandas but not in pyspark.pandas, ultimately, I want to use pyspark.pandas. Apologies, I should have included that in the original post. It appears to be a pyspark problem.

Serlal
New Contributor III

Hi!

I tried debugging your code and I think that the error you get is simply because the column exists in two instances of your dataframe within your loop.

I tried adding some extra debug lines in your merge_dataframes function:

Screenshot 2023-01-31 at 11.46.10and after executing that, I also executed the rest of the code but I did stop before the loop.

Instead of running the loop, i broke the code down and try to run it piece by piece.

First lets load the first df:

Screenshot 2023-01-31 at 11.49.07 

Then lets use the merge trying to use the first element (1).

Screenshot 2023-01-31 at 11.54.55 

We can see that the "weird_index_name" gets created on this df.

To show why it would fail, i will execute final_df = input_list[0] again to reinitialize the df and then run the merge on the second element instead:fi

Screenshot 2023-01-31 at 11.57.40 

You can see that in both cases the intermediate dataframe is created with the same column "weird_index_name" .

So if you try to run your original code, in the third iteration there is already a dataframe with this column which explains why you get the error: Reference 'weird_index_name' is ambiguous

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group