Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How does pyspark work in these two scenarios?

Direo
Contributor

I have two scenarios with different outcomes:

Scenario 1:

from pyspark.sql.functions import *

# create sample dataframes
df1 = spark.createDataFrame([(1, 2, 3), (2, 3, 4)], ["a", "b", "c"])
df2 = spark.createDataFrame([(1, 5, 6, 7), (2, 8, 9, 10)], ["a", "d", "e", "f"])
df3 = spark.createDataFrame([(1, 11, 12), (2, 13, 14)], ["a", "g", "h"])

# join the dataframes and select columns
base = df1.join(df2, "a", "left").join(df3, "a", "left").select("a", "b", "e", "g")

# apply filter
final = base.filter(col("c") == 1)

Scenario 2:

from pyspark.sql.functions import *

# create sample dataframes
df1 = spark.createDataFrame([(1, 2, 3), (2, 3, 4)], ["a", "b", "c"])
df2 = spark.createDataFrame([(1, 5, 6, 7), (2, 8, 9, 10)], ["a", "d", "e", "f"])
df3 = spark.createDataFrame([(1, 11, 12), (2, 13, 14)], ["a", "g", "h"])
df4 = spark.createDataFrame([(1,), (2,)], ["a"])
df5 = spark.createDataFrame([(1, "x"), (2, "y")], ["a", "z"])

# join the dataframes
base = df1.join(df2, "a", "left").join(df3, "a", "left").select("a", "b", "e", "g")
base_join = base.join(df4, "a").join(df5, "a", "left").select("a", "b")

# apply filter
final = base_join.filter(col("c") == 1)

In the first scenario, filter(col("c") == 1) is applied and the dataframe final is created, while in the second scenario it fails with the error "Column 'c' does not exist", even though neither base_join in the second scenario nor base in the first has this column.

What's the difference, and why does it show up in the physical plan?

1 ACCEPTED SOLUTION


Anonymous
Not applicable

@Direo:

The difference between the two scenarios lies in how Spark analyzes and plans the query. Before executing anything, Spark first resolves the logical plan, checking that every referenced column can be found in the plan beneath it, and then applies a series of optimizations such as predicate pushdown, projection pruning, and join reordering to produce the physical plan. The error you are seeing is raised during that resolution step, not at execution time.
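To inspect these stages yourself, you can print the plans Spark generates (a minimal sketch, assuming the scenario 1 dataframes and final from above; the mode argument requires Spark 3.x):

# print the parsed, analyzed, and optimized logical plans plus the physical plan
final.explain(extended=True)

# a more readable view of just the physical plan
final.explain(mode="formatted")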

In the first scenario, the filter is applied directly on top of base's select. Although base only exposes the columns "a", "b", "e", and "g", the analyzer can resolve the missing reference "c" by looking into the child of that projection: the join output underneath still contains "c", so Spark temporarily adds the column back, applies the filter, and re-projects to the original columns. That is why the filter succeeds even though "c" is not in base's schema.
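You can verify that "c" really is absent from base yet still filterable (a sketch, assuming the scenario 1 dataframes are defined):

print(base.columns)    # ['a', 'b', 'e', 'g'] -- no 'c'
base.filter(col("c") == 1).show()   # still analyzes: 'c' is pulled back from the join underneath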

In the second scenario, the filter is applied to base_join, which is the result of joining base with df4 and df5 and then projecting down to "a" and "b". By that point "c" has already been projected away: it no longer appears in the output of the joins underneath the filter, and the analyzer's missing-reference resolution does not reach back through those extra joins to df1. Spark therefore cannot restore "c", and analysis fails with "Column 'c' does not exist".
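One straightforward option, if the filter logically belongs to the first set of joins, is to apply it while "c" is still reachable, before the extra joins and projections (a sketch using the scenario 2 dataframes):

# filter before 'c' is projected away, then continue joining as before
base = df1.join(df2, "a", "left").join(df3, "a", "left") \
    .filter(col("c") == 1) \
    .select("a", "b", "e", "g")
final = base.join(df4, "a").join(df5, "a", "left").select("a", "b")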

Alternatively, to keep the structure of the original query, carry the "c" column through the intermediate projections so the filter can resolve it, then drop it after the filter has been applied:

base = df1.join(df2, "a", "left").join(df3, "a", "left").select("a", "b", "c", "e", "g")
base_join = base.join(df4, "a").join(df5, "a", "left").select("a", "b", "c")
final = base_join.filter(col("c") == 1).drop("c")

This keeps "c" available in the plan until the filter has run and then removes it from the result, so the query analyzes and executes as expected.
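Putting it together, a quick sanity check (a sketch, assuming the scenario 2 dataframes from above; note that the sample data happens to contain no row with c == 1, so the result is empty, but the point is that analysis now succeeds):

final.printSchema()
# root
#  |-- a: long (nullable = true)
#  |-- b: long (nullable = true)

final.show()  # empty with this sample data, since no row has c == 1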


2 REPLIES


Anonymous
Not applicable

Hi @Direo,

Thank you for posting your question in our community! We are happy to assist you.

To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?

This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance! 
