a week ago
As part of a function I create df1 and df2, and I aim to stack them and output the result. But the result does not display within the function, nor when I return it and display it afterwards.
results = df1.unionByName(df2, allowMissingColumns=False)
display(results)
This is the error:
SparkConnectGrpcException: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.RESOURCE_EXHAUSTED
details = "CLIENT: Sent message larger than max (200529144 vs. 134217728)"
df1 is 350 rows and df2 is 1500 rows. They share the same seven ordered columns, and I have checked that they share the same schema (a sketch of that check is below the schema printout), though df1 has only nulls in c3 and c4.
|-- c1: long (nullable = true)
|-- c2: string (nullable = true)
|-- c3: string (nullable = true)
|-- c4: string (nullable = true)
|-- c5: string (nullable = true)
|-- c6: string (nullable = true)
|-- c7: double (nullable = true)
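For reference, the schema check was along these lines (a minimal sketch; the exact comparison may differ):
# True only if column names, types, nullability and order all match
print(df1.schema == df2.schema)
# compare names and types only, ignoring nullability
print([(f.name, f.dataType) for f in df1.schema] == [(f.name, f.dataType) for f in df2.schema])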
The input df into the function is 3800 rows and 8 columns, and the function is not complicated, yet resources are exhausted trying to union two tiny outputs. I can display df1 and df2 before the union; it is the union that crashes Databricks.
I tried manually inputting the data and creating two dataframes, and they union in less than 1s. In the same project I have used unionByName to union larger dataframes, as part of more complicated functions. Plus, this function used to work when I used test data smaller than 350 and 1500 rows.
What solutions could I try to fix this? Repartitioning doesn't help. Thank you.
a week ago
Hi @CEH ,
Increase the message size limit from the default of 64 MB by changing the value of the spark.sql.session.localRelationCacheThreshold configuration. You can start by trying twice the default, 128 MB, and experiment with increasing it further if the issue persists.
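For example (a sketch; the value is in bytes, and whether the setting can be changed depends on your workspace):
# raise the local relation cache threshold from the 64 MB default to 128 MB
spark.conf.set("spark.sql.session.localRelationCacheThreshold", 128 * 1024 * 1024)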
Alternatively, take advantage of temporary (temp) views. Using temp views in the intermediary steps caches the table and uses a cached_relation instead of a local_relation. Cached relations do not have a maximum message size, allowing you to avoid a message size limit.
The following code demonstrates how to use temp views. Write df_union as a temp view and then read it on every step of the loop to ensure the message being sent uses a cached relation.
# dfs is a list of dataframes to aggregate with union commands. df_union stores the aggregation, and starts with the first dataframe in the list.
df_union = dfs[0]
#loop through all other dataframes in the list, performing unions
for df in dfs[1:]:
    df_union = df_union.union(df)
    #create the temp view
    df_union.createOrReplaceTempView("df_union")
    #re-read df_union from the temp view so the next union uses the cached_relation instead of a local_relation
    df_union = spark.sql("SELECT * FROM df_union")
Source: RESOURCES_EXHAUSTED error message when trying to perform self-joins with Spark Connect - Databricks
a week ago
Hi @szymon_dybczak,
Thank you for your reply, but unfortunately your advice doesn't work for me. I am unable to change the message size limit because I have to use Serverless compute. I also tried your temp view code, but that doesn't work either. It gives this error:
SparkConnectGrpcException: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.RESOURCE_EXHAUSTED
details = "CLIENT: Sent message larger than max (200529416 vs. 134217728)"
yesterday
Hey @CEH,
What you're running into looks like a Spark Connect gRPC message-size limit, not a computational failure in the union itself. Even with smallish row counts, the serialized payload (either the inlined query plan or the Arrow batch results) can blow past the default 128 MB gRPC cap and trigger a RESOURCE_EXHAUSTED error: the classic "Sent message larger than max (… vs. 134217728)".
Local-relation inlining explodes plan size.
If df1/df2 are created from local Python data, Spark inlines that data into the plan as a local_relation. When you union (or self-join) them, that data is duplicated in the plan, and the serialized plan can easily exceed 128 MB even if the row count looks tiny (a quick check for this is sketched right after this list).
Arrow batches can exceed the limit.
display() or collect() sends Arrow batches back to the client. A single batch with wide string columns or many rows can tip the scale.
Limit is hard-coded at 128 MB.
Some configs exist to raise it, but Databricks environments may not honor them yet.
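A quick way to check whether a local relation is what is inflating the plan (a sketch; look for LocalRelation nodes in the printed plans):
# print the parsed/analyzed/optimized logical plans and the physical plan;
# a LocalRelation node means the data is inlined into the plan itself
df1.explain(True)
df2.explain(True)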
1️⃣ Materialize first, then union via SQL.
This swaps local_relation for a cached relation.
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
results = spark.sql("""
SELECT c1, c2, c3, c4, c5, c6, c7 FROM df1
UNION ALL
SELECT c1, c2, c3, c4, c5, c6, c7 FROM df2
""")
display(results.limit(1000))
2️⃣ Persist and read back (Delta or managed tables).
Catalog-backed relations avoid the message-size constraint.
df1.write.mode("overwrite").saveAsTable("tmp.df1")
df2.write.mode("overwrite").saveAsTable("tmp.df2")
results = spark.sql("""
SELECT * FROM tmp.df1
UNION ALL
SELECT * FROM tmp.df2
""")
display(results.limit(1000))
3️⃣ Reduce Arrow batch size.
Smaller batches mean smaller gRPC messages.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "2000")
display(results)
4️⃣ Truncate wide string columns before display.
from pyspark.sql.functions import col, substring
small = results.select(
    col("c1"),
    *[substring(col(c), 1, 2000).alias(c) for c in ["c2", "c3", "c4", "c5", "c6"]],
    col("c7")
).limit(1000)
display(small)
5️⃣ Avoid big local Python objects.
Parallelize before creating DataFrames:
rdd = spark.sparkContext.parallelize(local_rows)
df1 = spark.createDataFrame(rdd, schema=my_schema)
6️⃣ (Advanced) Try raising the limit, if allowed:
spark.conf.set("spark.connect.grpc.maxInboundMessageSize", 268435456) # 256 MB
May or may not be honored depending on your workspace setup.
Repartitioning changes distribution, not the size of the serialized Arrow batch or query plan. The failure happens during serialization, when the message crosses the 128 MB threshold, not during computation.
Convert df1/df2 to temp views and UNION ALL via SQL.
Lower spark.sql.execution.arrow.maxRecordsPerBatch.
Truncate long string columns before display.
If data comes from local Python, use parallelize() or persist & read back.
Hope this helps, Lou.
yesterday
Hi @Louis_Frolio ,
1. I've suggested this approach already in my first reply - unfortunately it didn't help
3, 5, 6 - won't work here because @CEH is using serverless compute
2, 4 - worth a try