cancel
Showing results for 
Search instead for 
Did you mean: 
Community Discussions
Connect with fellow community members to discuss general topics related to the Databricks platform, industry trends, and best practices. Share experiences, ask questions, and foster collaboration within the community.
cancel
Showing results for 
Search instead for 
Did you mean: 

create UDF in pyspark

LiliL
New Contributor

Hi, 

Need the help of this community, unfortunately creating udfs is not my strongest skill set.

I need to create UDF that will join two tables together, the problem is that one table has two id columns 

Name Table has id1 and id2 

Transaction Table has only id

I need to join Transaction table with id table. the constraint is if id2 in name table populated then join with id2 else join with id1 

I have tried some things but none of them is 100% correct. 

LiliL_0-1690464091302.png

LiliL_1-1690464169973.png

Thank you

 

1 REPLY 1

Siebert_Looije
Contributor

Hi,

I am not sure if I understand your question directly but let me give it a try:
- The constraint is if id2 in name table populated then join with id2: So I think you can also could first make a column called 'id' in which you get id2 if it is populated and id1 if it is not populated like:

from pyspark.sql import functions as F

name_table = name_table.withColumn('id', F.when(F.col('id2').notnull(), F.col('id2')).otherwise(F.col('id1'))

- I need to join Transaction table with id table. 
After creating this, you can use this new id to join the names on the transaction table:

transaction_table = transaction_table.join(name_table, on=['id'],how='inner')

So expect that you would to do a inner join here, but it could also be a different join. More info is here: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html

Furthermore, if the name_table is a small table then it is suggest to use F.broadcast to make the join faster/smoother:

from pyspark.sql import functions as F
transaction_table = transaction_table.join(F.broadcast(name_table), on=['id'],how='inner')

More information is here: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.Broadcast.html.

I hope this helps a bit and else please ask further.

 

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!