Data Engineering

Inconsistent duplicated row with Spark (Databricks on MS Azure)

whatthespark
New Contributor II

I'm seeing weird behavior with Apache Spark, which I run in a Python notebook on Azure Databricks. I have a dataframe with two columns of interest: name and ftime.

I found that I sometimes have duplicated values and sometimes not, depending on how I fetch the data:

df.where(col('name') == 'test').where(col('ftime') == '2022-07-18').count()
# Result is 1

But when I run

len(df.where(col('name') == 'test').where(col('ftime') == '2022-07-18').collect())
# Result is 2

I get 2 rows, which are exactly identical. Those two cells are run one after the other; the order doesn't change anything.

I tried creating a temp view in spark with

df.createOrReplaceTempView('df_referential')

but I run into the same problem:

SELECT name, ftime, COUNT(*)
FROM df_referential
GROUP BY name, ftime
HAVING COUNT(*) > 1

returns no result, while

SELECT *
FROM df_referential
WHERE name = 'test' AND ftime = '2022-07-18'

returns two rows, perfectly identical.

And if I try to

df.filter((col('name') == 'test') & (col('ftime') == '2022-07-18')).show()

I have 2 rows, exactly identical, but

df.filter((col('name') == 'test') & (col('ftime') == '2022-07-18')).select('name', 'ftime').show()

gives only one row.

I'm having a hard time understanding why this happens. I expect these to return only one row, and the JSON file that the data is read from contains only one occurrence of the data.

If someone can point me to what I'm doing wrong, that would be of great help.

4 REPLIES

-werners-
Esteemed Contributor III

I would like to see how you create the df dataframe.

In PySpark you can get weird results if you do not clear state, or when you reuse dataframe names.
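
To illustrate with a deliberately made-up example (the path and column below are placeholders, not from the question): a dataframe is just a lazy plan, so every action re-runs that plan, and two actions on the "same" name can disagree if anything upstream changed in between.

from pyspark.sql import functions as F

# Hypothetical notebook cells; the path is a placeholder.
df = spark.read.json("/mnt/example/batch/*.json")        # cell 1: defines a lazy plan
df = df.withColumn("loaded_at", F.current_timestamp())   # cell 2: reuses the name 'df'

# Each action re-executes the full plan, including the file read, so if the
# source files (or an earlier cell) change between actions the results differ.
print(df.count())
print(len(df.collect()))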

I mount an Azure Blob Storage container with this:

try:
  dbutils.fs.mount(
    source = "wasbs://client@client.blob.core.windows.net",
    mount_point = "/mnt/client",
    extra_configs = {"fs.azure.account.key." + storage_account_name + ".blob.core.windows.net": storage_account_access_key})
except Exception as e:
  # Ignore mount failures (e.g. if /mnt/client is already mounted)
  pass

I then load the data JSON files with:

from pyspark.sql import functions as F
from pyspark.sql import types
from pyspark.sql.functions import col

def extra_cat_name(categories):
    """Extract the category names, handling the fact that different types co-exist because of data migration"""
    # Local aliases so that eval() can parse JSON-style literals (true/false/null)
    true = True
    false = False
    null = None
    if isinstance(categories, str):
        categories = eval(categories)
    try:
        return [categories['translatedNames'][0]['translation']]
    except Exception as e:
        return categories

cat_extract = spark.udf.register("cat_extract", extra_cat_name, types.ArrayType(types.StringType()))

def add_time_from_filename(df, indicator):
    """Add a timestamp column extracted from the filename
    :param df: the dataframe to work on
    :param indicator: the last word before the date in the file name"""
    return df\
            .withColumn('file_name', F.input_file_name())\
            .withColumn('regtime', F.regexp_extract('file_name', rf'(_{indicator}_)(.*)(\.json)', 2))\
            .withColumn('ftime', F.to_date(col("regtime"), "yyyy_MM_dd"))

def load_data():
  referential_files = f"/mnt/client/{GIVEN_CLIENT_NAME}/internal_api/referential_data/*.json"
  user_files = f"/mnt/client/{GIVEN_CLIENT_NAME}/internal_api/users_data/*.json"

  df_users = spark.read.json(user_files)
  df = spark.read.json(referential_files)\
       .withColumn("data", F.explode("skills")).select("data.*")\
       .withColumn('categories', cat_extract('categories'))
  df = add_time_from_filename(df, 'referential')
  return df_users, df

-werners-
Esteemed Contributor III

OK, so each time you do an action, the code for your df is executed.

I suspect that if you clear state between the two cells, your issue will be gone.

Or try assigning to a new dataframe, like:

df2 = add_time_from_filename(df, 'referential')
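
For what it's worth, here is a minimal sketch of that idea (the cache() call and the df2 name are just an illustration, not the original code): persisting the result forces the lineage, including the file read and the UDF, to be evaluated once, so subsequent actions agree.

from pyspark.sql.functions import col

# Sketch / assumption: materialize the dataframe once so that count(),
# collect() and show() all read the same cached rows instead of re-running
# the read -> explode -> UDF pipeline on every action.
df2 = add_time_from_filename(df, 'referential').cache()
df2.count()  # first action populates the cache

# These should now agree, since both run against the cached rows.
df2.where((col('name') == 'test') & (col('ftime') == '2022-07-18')).count()
len(df2.where((col('name') == 'test') & (col('ftime') == '2022-07-18')).collect())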

Thanks a lot, it seems that disabling the extra_cat_name(categories) UDF fixes the duplicates. However, I don't understand why; if you have an idea, I'd appreciate it.
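
One possible explanation, offered as an assumption rather than something confirmed in this thread: Spark treats Python UDFs as deterministic by default, so the optimizer is free to re-evaluate or duplicate the call while rewriting the plan; combined with the lazy re-execution described above, that can produce surprising results. A sketch of declaring the UDF non-deterministic (reusing the extra_cat_name function from the question):

from pyspark.sql import functions as F, types

# Sketch / assumption: mark the UDF as non-deterministic so the optimizer
# will not duplicate or re-evaluate the expression during planning.
cat_extract = F.udf(extra_cat_name, types.ArrayType(types.StringType())).asNondeterministic()

df = df.withColumn('categories', cat_extract('categories'))

Caching (or writing out) the intermediate result, as suggested above, is probably the more robust fix either way.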
