07-06-2023 07:24 AM
Hi,
Quite excited by the new release of databricks-connect, I started writing unit tests that run PySpark on a Databricks cluster via databricks-connect.
After some successful basic unit tests, I tested a longer chain of transformations on a dataframe, including some forward fills, simple arithmetic, and linear regression slope calculations via a pandas UDF. Nothing fancy. Then, when running a test, I got the following error:
```
E pyspark.errors.exceptions.connect.SparkConnectGrpcException: <_MultiThreadedRendezvous of RPC that terminated with:
E   status = StatusCode.UNKNOWN
E   details = ""
E   debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"", grpc_status:2, created_time:"2023-07-06T13:29:00.033340701+00:00"}"
```
I do not get this error when I remove one simple column (a constant literal), and I do not get this error either if I run the same code on Databricks directly.
The error seems to point to gRPC and a limitation of databricks-connect. Has anyone encountered this, and is there a place where we can check the current limitations of databricks-connect?
07-06-2023 07:56 AM
Oftentimes just writing the question helps resolve it. For anyone facing issues with databricks-connect that don't show up when using Databricks directly, here is the list of limitations (RTFM to me):
https://learn.microsoft.com/en-us/azure/databricks/dev-tools/databricks-connect#limitations
In particular, watch the size of the dataframe: databricks-connect doesn't support dataframes larger than 128 MB, which is not much. Hopefully future releases will allow larger dataframes.
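For what it's worth, a minimal sketch of how one might stay under such a limit: do the heavy work on the cluster and cap what is actually pulled back to the client. The names `df` and `group_col` below are illustrative placeholders, not from this thread.

```python
from pyspark.sql import functions as F

# Hypothetical sketch: `df` is a placeholder for a large Spark DataFrame.
summary = df.groupBy("group_col").count()   # aggregate server-side
preview = df.limit(1000).toPandas()         # only a bounded sample crosses gRPC
```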
Hope this helps!
07-06-2023 08:56 AM
Well, my bad, I thought this was the issue, but I then reduced the number of rows so that the size was less than 1 MB, and it still failed with the same error. So I still don't know why this fails with databricks-connect, and I have checked that all the Spark functions I use support Spark Connect.
So if anyone has any idea, thanks for sharing.
08-14-2023 03:30 AM
To add a bit more here: I don't even think the 128 MB limit is really a hard limit. You can set "spark.connect.grpc.maxInboundMessageSize" to a larger value on the server, and you can also override the default limit on the client side by using a custom gRPC ChannelBuilder.
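A minimal sketch of what that might look like; the exact wiring into databricks-connect is an assumption and may differ by version. The server-side setting is a static Spark config, so it would go into the cluster's Spark config rather than being set at runtime.

```python
# Server side (set in the cluster's Spark config, not at runtime):
#   spark.connect.grpc.maxInboundMessageSize 268435456   # 256 MB

# Client side: plain gRPC channels accept an analogous option. Whether
# databricks-connect lets you inject these channel options depends on the
# version; the channel below is a generic gRPC illustration, not the
# databricks-connect API.
import grpc

GRPC_OPTIONS = [
    ("grpc.max_receive_message_length", 256 * 1024 * 1024),
    ("grpc.max_send_message_length", 256 * 1024 * 1024),
]
channel = grpc.insecure_channel("localhost:15002", options=GRPC_OPTIONS)
```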
09-14-2023 12:27 AM
Hey @jrand, can you shed some light on where and how you set this setting?
I'm hitting the same issue, right when we were starting to get excited about databricks-connect.
I can see the setting in the Spark Connect documentation but not in the databricks-connect one, so I'm unsure where I can override it.
I'm on 13.3.0.
Thanks in advance,
dsa
01-22-2024 03:10 AM
This is the PR that introduced the configurable limit: https://github.com/apache/spark/pull/40447/files
08-26-2023 03:26 AM - edited 08-26-2023 03:28 AM
I have the same issue, but with databricks-connect==13.2.1
Code to reproduce:
```python
col_num = 49
data = [tuple([f'val_{n}' for n in range(1, col_num + 1)])]  # one row, col_num columns
df = spark.createDataFrame(data=data)  # default column names: _1, _2, ...
for i in range(1, len(data[0]) + 1):
    df = df.withColumnRenamed(f'_{i}', f'col_{i}')  # each rename nests the plan one level deeper
df.printSchema()
```
The error is a bit different though:
```
pyspark.errors.exceptions.connect.SparkConnectGrpcException: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = ""
    debug_error_string = "UNKNOWN:Error received from peer {created_time:"2023-08-26T12:06:04.41660107+02:00", grpc_status:2, grpc_message:""}"
>
```
And if col_num == 48, everything is fine. If col_num > 49, I get a segfault without any error message.
08-26-2023 03:52 AM - edited 08-26-2023 04:51 AM
Looks like it hits some ugly limit on nested messages in protobuf: protobuf parsers typically cap nested-message recursion at around 100 levels, which would roughly match the ~49-rename threshold, since each rename adds a couple of nesting levels to the plan.
The script below generates a flat plan (as opposed to my example in the previous message) and works fine:
```python
from pyspark.sql import functions as f

col_num = 1000
data = [tuple([f'val_{n}' for n in range(1, col_num + 1)])]
df = spark.createDataFrame(data=data)
columns_renaming = []
for i in range(1, len(data[0]) + 1):
    columns_renaming.append(
        f.col(f'_{i}').alias(f'col_{i}')
    )
df = df.select(*columns_renaming)  # one select node instead of 1000 nested renames
df.printSchema()
```
But databricks-connect should handle this somehow, or at least prevent segfaults, I suppose. And having this limitation documented would be nice.
10-17-2023 12:52 PM - edited 10-17-2023 12:53 PM
We are hitting what appears to be the same segfault issue when running some of our larger chained functions (likely with quite large plans), and we can reliably trigger it by looping over `withColumns` to increase the plan size. Our case is also a segfault and also ends up in the protobuf library. Has anyone found success with increasing `spark.connect.grpc.maxInboundMessageSize`, aside from refactoring to flatten the plan as much as possible (a rough sketch of that flattening is below)?
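For reference, a minimal sketch of the flattening idea under discussion; `df` and the generated column expressions are illustrative placeholders, not taken from the thread.

```python
from pyspark.sql import functions as F

# Looping withColumn nests one plan node per call; building the expressions
# first and applying them in a single withColumns/select keeps the plan shallow.
new_cols = {f"col_{i}": F.lit(i) for i in range(1, 50)}  # placeholder expressions

df_flat = df.withColumns(new_cols)  # Spark >= 3.3: one plan node for all columns
# equivalent with a single select:
df_flat = df.select("*", *[expr.alias(name) for name, expr in new_cols.items()])
```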
10-20-2023 11:12 AM
I doubled the `spark.connect.grpc.maxInboundMessageSize` parameter to 256 MB, but that didn't appear to resolve anything.
10-23-2023 12:08 AM
We got the following info from Databricks support, which might be of interest to you:
1. "Received message larger than max" error for files with row size smaller than 128 MB (for example, the Parquet file you provided).
2. "Received message larger than max" error for files with row size larger than 128 MB (for example, the binary file where rows are > 128 MB).
So as far as I understand it, you currently can't change the gRPC options when using databricks-connect.
10-23-2023 12:35 PM - edited 10-23-2023 12:46 PM
Our issue seemingly doesn't contain a "larger than max" error. Out of curiosity, I tried doubling maxRecordsPerBatch to 20000 and also reducing it to 200, and neither appeared to help.
```
Fatal Python error: Segmentation fault
Thread 0x0000ffff7bfff120 (most recent call first):
File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/connect/client/core.py", line 1537 in _ping_handler
File "/usr/lib/python3.10/threading.py", line 953 in run
File "/usr/lib/python3.10/threading.py", line 1016 in _bootstrap_inner
File "/usr/lib/python3.10/threading.py", line 973 in _bootstrap
Current thread 0x0000ffffb5415420 (most recent call first):
File "/usr/local/lib/python3.10/dist-packages/google/protobuf/message.py", line 126 in CopyFrom
File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/connect/plan.py", line 524 in plan
File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/connect/plan.py", line 608 in plan
File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/connect/plan.py", line 799 in plan
File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/connect/plan.py", line 702 in plan
File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/connect/plan.py", line 118 in to_proto
File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/connect/dataframe.py", line 1654 in toPandas
File "/home/powerwatch-data-analysis/pwdata/analysis/clustering.py", line 138 in run
File "/home/powerwatch-data-analysis/pwdata/analysis/clustering.py", line 195 in _run_stdbscan
File "/home/powerwatch-data-analysis/pwdata/analysis/clustering.py", line 243 in process
File "/home/powerwatch-data-analysis/pwdata/pipeline/core/pipeline.py", line 184 in __new__
File "/home/powerwatch-data-analysis/pwdata/analysis/clustering.py", line 292 in process
File "/home/powerwatch-data-analysis/pwdata/pipeline/core/pipeline.py", line 184 in __new__
File "/home/powerwatch-data-analysis/pwdata/analysis/outages/outage_estimation.py", line 74 in process
File "/home/powerwatch-data-analysis/pwdata/pipeline/core/pipeline.py", line 184 in __new__
File "/home/powerwatch-data-analysis/pwdata/overrides/sierra_leone/outage_estimation.py", line 23 in process
File "/home/powerwatch-data-analysis/pwdata/pipeline/core/pipeline.py", line 184 in __new__
File "/home/powerwatch-data-analysis/pwdata/kpis/outage.py", line 551 in process
File "/home/powerwatch-data-analysis/pwdata/pipeline/core/pipeline.py", line 184 in __new__
File "/home/powerwatch-data-analysis/pwdata/kpis/outage.py", line 168 in process
File "/home/powerwatch-data-analysis/pwdata/pipeline/core/pipeline.py", line 184 in __new__
File "<stdin>", line 1 in <module>
Extension modules: numpy.core._multiarray_umath, numpy.core._multiarray_tests, numpy.linalg._umath_linalg, numpy.fft._pocketfft_internal, numpy.random._common, numpy.random.bit_generator, numpy.random._bounded_integers, numpy.random._mt19937, numpy.random.mtrand, numpy.random._philox, numpy.random._pcg64, numpy.random._sfc64, numpy.random._generator, psutil._psutil_linux, psutil._psutil_posix, charset_normalizer.md, grpc._cython.cygrpc, pyarrow.lib, pyarrow._hdfsio, pandas._libs.tslibs.np_datetime, pandas._libs.tslibs.dtypes, pandas._libs.tslibs.base, pandas._libs.tslibs.nattype, pandas._libs.tslibs.timezones, pandas._libs.tslibs.ccalendar, pandas._libs.tslibs.fields, pandas._libs.tslibs.timedeltas, pandas._libs.tslibs.tzconversion, pandas._libs.tslibs.timestamps, pandas._libs.properties, pandas._libs.tslibs.offsets, pandas._libs.tslibs.strptime, pandas._libs.tslibs.parsing, pandas._libs.tslibs.conversion, pandas._libs.tslibs.period, pandas._libs.tslibs.vectorized, pandas._libs.ops_dispatch, pandas._libs.missing, pandas._libs.hashtable, pandas._libs.algos, pandas._libs.interval, pandas._libs.lib, pandas._libs.ops, pyarrow._compute, pandas._libs.arrays, pandas._libs.tslib, pandas._libs.sparse, pandas._libs.indexing, pandas._libs.index, pandas._libs.internals, pandas._libs.join, pandas._libs.writers, pandas._libs.window.aggregations, pandas._libs.window.indexers, pandas._libs.reshape, pandas._libs.groupby, pandas._libs.json, pandas._libs.parsers, pandas._libs.testing, google._upb._message, yaml._yaml, shapely.lib, shapely._geos, shapely._geometry_helpers, pyproj._compat, pyproj._datadir, pyproj._network, pyproj._geod, pyproj.list, pyproj._crs, pyproj.database, pyproj._transformer, pyproj._sync, scipy._lib._ccallback_c, scipy.sparse._sparsetools, _csparsetools, scipy.sparse._csparsetools, scipy.sparse.linalg._isolve._iterative, scipy.linalg._fblas, scipy.linalg._flapack, scipy.linalg.cython_lapack, scipy.linalg._cythonized_array_utils, scipy.linalg._solve_toeplitz, scipy.linalg._decomp_lu_cython, scipy.linalg._matfuncs_sqrtm_triu, scipy.linalg.cython_blas, scipy.linalg._matfuncs_expm, scipy.linalg._decomp_update, scipy.linalg._flinalg, scipy.sparse.linalg._dsolve._superlu, scipy.sparse.linalg._eigen.arpack._arpack, scipy.sparse.csgraph._tools, scipy.sparse.csgraph._shortest_path, scipy.sparse.csgraph._traversal, scipy.sparse.csgraph._min_spanning_tree, scipy.sparse.csgraph._flow, scipy.sparse.csgraph._matching, scipy.sparse.csgraph._reordering, scipy.spatial._ckdtree, scipy._lib.messagestream, scipy.spatial._qhull, scipy.spatial._voronoi, scipy.spatial._distance_wrap, scipy.spatial._hausdorff, scipy.special._ufuncs_cxx, scipy.special._ufuncs, scipy.special._specfun, scipy.special._comb, scipy.special._ellip_harm_2, scipy.spatial.transform._rotation (total: 110)
/usr/local/bin/pyspark: line 60: 232 Segmentation fault $PYSPARK_DRIVER_PYTHON
```