databricks-connect 13.1.0 limitations

thibault
Contributor

Hi,

Quite excited by the new release of databricks-connect, I started writing unit tests that run pyspark on a Databricks cluster via databricks-connect.

After some successful basic unit tests, I tried slightly longer chains of transformations on a dataframe, including forward fills, simple arithmetic, and linear regression slope calculations via a pandas UDF. Nothing fancy. Then, when running a test, I got the following error:

E           pyspark.errors.exceptions.connect.SparkConnectGrpcException: <_MultiThreadedRendezvous of RPC that terminated with:
E           	status = StatusCode.UNKNOWN
E           	details = ""
E           	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"", grpc_status:2, created_time:"2023-07-06T13:29:00.033340701+00:00"}"

I do not get this error when I remove one simple column (a constant literal), and I do not get this error either if I run the same code on Databricks directly.

The error seems to point to gRPC and a limitation of databricks-connect. Has anyone encountered this, and is there a place where we can check what the current limitations of databricks-connect are?

11 REPLIES

thibault
Contributor

Oftentimes just writing the question helps resolve it. For anyone facing issues with databricks-connect that don't show up when using Databricks directly, here is the list of limitations (RTFM to me):

https://learn.microsoft.com/en-us/azure/databricks/dev-tools/databricks-connect#limitations

In particular, watch the size of the dataframe: databricks-connect doesn't support dataframes larger than 128 MB, which is not much. Hopefully future releases will allow larger dataframes.

Hope this helps!

Well, my bad, I thought this was the issue. But I eventually reduced the number of rows so that the size was under 1 MB, and it still failed with the same error. So I still don't know why this fails with databricks-connect, even though I have checked that all the Spark functions I use support Spark Connect.

So if anyone has any idea, thanks for sharing.

jrand
New Contributor III

To add a bit more here: I don't even think that the 128 MB limit is really a hard limit. You can set "spark.connect.grpc.maxInboundMessageSize" to a larger value, and you can also override the default limit on the client side by using a custom gRPC ChannelBuilder.
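
For anyone wondering what that would look like on the client side, here is a minimal sketch. It assumes the open-source pyspark Spark Connect client, where ChannelBuilder takes a channelOptions list that is forwarded to gRPC; the connection URL is a placeholder, and whether databricks-connect actually lets you inject such a channel is a separate question (see the replies below).

```python
# Minimal sketch (not verified against databricks-connect): raising the
# client-side gRPC message limits with the OSS Spark Connect ChannelBuilder.
# Assumes ChannelBuilder(url, channelOptions=...) and toChannel() as in
# open-source pyspark; databricks-connect wraps the client in its own
# builder, so there may be no supported way to plug this channel in.
from pyspark.sql.connect.client import ChannelBuilder

LIMIT = 256 * 1024 * 1024  # 256 MB instead of the 128 MB default

builder = ChannelBuilder(
    "sc://<workspace-host>:443/;token=<personal-access-token>",  # placeholder URL
    channelOptions=[
        ("grpc.max_send_message_length", LIMIT),
        ("grpc.max_receive_message_length", LIMIT),
    ],
)
channel = builder.toChannel()  # a grpc.Channel carrying the overridden options
```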

dsa
New Contributor II

Hey @jrand, can you shed some light on where and how you set this?
I'm hitting the same issue, right when we were starting to get excited about databricks-connect.
I can see the setting in the Spark Connect documentation but not in the databricks-connect one, so I'm unsure where I can override it.

I'm on 13.3.0.

Thanks in advance,
dsa

jrand
New Contributor III

This is the PR that introduced the configurable limit: https://github.com/apache/spark/pull/40447/files
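
On the server side, the config from that PR would normally go into the cluster's Spark config (Compute > Advanced options > Spark). Whether the Databricks runtime honours the OSS key is something I haven't verified:

```
# Hypothetical cluster Spark config entry (not verified on Databricks):
spark.connect.grpc.maxInboundMessageSize 268435456
```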

safonov
New Contributor II

I have the same issue, but with databricks-connect==13.2.1
Code to reproduce:

# `spark` here is the session created by databricks-connect
col_num = 49
data = [tuple([f'val_{n}' for n in range(1, col_num + 1)])]
df = spark.createDataFrame(data=data)  # default column names are _1, _2, ...
for i in range(1, len(data[0]) + 1):
    df = df.withColumnRenamed(f'_{i}', f'col_{i}')
df.printSchema()

The error is a bit different though:

pyspark.errors.exceptions.connect.SparkConnectGrpcException: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNKNOWN
details = ""
debug_error_string = "UNKNOWN:Error received from peer {created_time:"2023-08-26T12:06:04.41660107+02:00", grpc_status:2, grpc_message:""}"
>

And if col_num == 48, everything is fine. If col_num > 49, I get a segfault without any error message.

safonov
New Contributor II

Looks like it's hitting some limit on nested messages in protobuf, and not gracefully.

I say that because this script, which generates a flat plan (as opposed to my example in the previous message), works fine:

from pyspark.sql import functions as f

col_num = 1000
data = [tuple([f'val_{n}' for n in range(1, col_num + 1)])]
df = spark.createDataFrame(data=data)
columns_renaming = []
for i in range(1, len(data[0]) + 1):
    columns_renaming.append(
        f.col(f'_{i}').alias(f'col_{i}')
    )
# a single select produces one flat projection instead of nested plan nodes
df = df.select(*columns_renaming)
df.printSchema()


But databricks-connect could presumably handle this somehow, or at least fail with an error instead of a segfault. Documented limitations would also be nice.

jackson-nline
New Contributor III

We are hitting what appears to be the same segfault issue when running some of our larger chained functions (likely with quite large plans). We can reliably trigger it by looping over `withColumns` to increase the plan size; our case is also a segfault that ends up in the protobuf library. Has anyone had success with increasing `spark.connect.grpc.maxInboundMessageSize`, other than refactoring to flatten the plan as much as possible?
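
For what it's worth, here is a small sketch (a toy dataframe with made-up column names, not our actual pipeline) of the flattening safonov showed above: every withColumn call in the loop wraps the plan in another node, while a single select keeps everything in one projection.

```python
# Toy illustration of nested vs. flat Connect plans (hypothetical columns).
from databricks.connect import DatabricksSession
from pyspark.sql import functions as F

spark = DatabricksSession.builder.getOrCreate()
df = spark.range(10).withColumnRenamed("id", "value")

n_cols = 500

# Nested plan: each withColumn wraps the previous plan in another node.
df_nested = df
for i in range(n_cols):
    df_nested = df_nested.withColumn(f"feature_{i}", F.col("value") * F.lit(i))

# Flat plan: a single select builds one projection with all expressions.
df_flat = df.select(
    "*", *[(F.col("value") * F.lit(i)).alias(f"feature_{i}") for i in range(n_cols)]
)
```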

jackson-nline
New Contributor III

I doubled the `spark.connect.grpc.maxInboundMessageSize` parameter to 256 MB, but that didn't appear to resolve anything.

dsa
New Contributor II

We got the following info from Databricks support, which might be of interest to you:

1. “Received message larger than max” error for files with row size smaller than 128 MB (for example, the parquet file you provided)

  • We were able to mitigate this issue by adjusting maxRecordsPerBatch.
  • The engineering team has prepared and merged a fix which should resolve this issue.
  • After the fix is deployed, any file containing a row size smaller than 128 MB will not receive this error.
  • This fix will be part of our next maintenance release (the next tentative maintenance is scheduled between the 23rd and 29th of October, in multiple stages). Meanwhile, you can use the maxRecordsPerBatch config as a mitigation.

2. “Received message larger than max” error for files with row size larger than 128 MB. (For example, the binary file where rows are > 128 MB)

  • We do not officially support rows larger than 128 MB.
  • This request has been taken as a feature request and has been added to our engineering team's backlog for the next quarter to decide if we can allow the user to change GRPC_DEFAULT_OPTIONS

 

So as far as I understand it, you currently can't change the gRPC options when using Databricks Connect.
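
In case it helps, the mitigation they mention appears to be the Arrow batching config. A minimal sketch, assuming they mean spark.sql.execution.arrow.maxRecordsPerBatch and that it can be set per session over Databricks Connect (I haven't confirmed either with support):

```python
# Sketch of the maxRecordsPerBatch mitigation mentioned by support.
# Assumes the Arrow batching config key and that it is settable at runtime
# over Databricks Connect; neither is confirmed in the support reply.
from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()

# Fewer rows per Arrow batch should mean smaller individual gRPC messages.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1000")
```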

jackson-nline
New Contributor III

Seemingly our issue doesn't involve a "larger than max" error. Out of curiosity I tried doubling maxRecordsPerBatch to 20,000 and also reducing it to 200, and neither appeared to help.

```
Fatal Python error: Segmentation fault

Thread 0x0000ffff7bfff120 (most recent call first):
File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/connect/client/core.py", line 1537 in _ping_handler
File "/usr/lib/python3.10/threading.py", line 953 in run
File "/usr/lib/python3.10/threading.py", line 1016 in _bootstrap_inner
File "/usr/lib/python3.10/threading.py", line 973 in _bootstrap

Current thread 0x0000ffffb5415420 (most recent call first):
File "/usr/local/lib/python3.10/dist-packages/google/protobuf/message.py", line 126 in CopyFrom
File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/connect/plan.py", line 524 in plan
File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/connect/plan.py", line 608 in plan
File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/connect/plan.py", line 799 in plan
File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/connect/plan.py", line 702 in plan
File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/connect/plan.py", line 118 in to_proto
File "/usr/local/lib/python3.10/dist-packages/pyspark/sql/connect/dataframe.py", line 1654 in toPandas
File "/home/powerwatch-data-analysis/pwdata/analysis/clustering.py", line 138 in run
File "/home/powerwatch-data-analysis/pwdata/analysis/clustering.py", line 195 in _run_stdbscan
File "/home/powerwatch-data-analysis/pwdata/analysis/clustering.py", line 243 in process
File "/home/powerwatch-data-analysis/pwdata/pipeline/core/pipeline.py", line 184 in __new__
File "/home/powerwatch-data-analysis/pwdata/analysis/clustering.py", line 292 in process
File "/home/powerwatch-data-analysis/pwdata/pipeline/core/pipeline.py", line 184 in __new__
File "/home/powerwatch-data-analysis/pwdata/analysis/outages/outage_estimation.py", line 74 in process
File "/home/powerwatch-data-analysis/pwdata/pipeline/core/pipeline.py", line 184 in __new__
File "/home/powerwatch-data-analysis/pwdata/overrides/sierra_leone/outage_estimation.py", line 23 in process
File "/home/powerwatch-data-analysis/pwdata/pipeline/core/pipeline.py", line 184 in __new__
File "/home/powerwatch-data-analysis/pwdata/kpis/outage.py", line 551 in process
File "/home/powerwatch-data-analysis/pwdata/pipeline/core/pipeline.py", line 184 in __new__
File "/home/powerwatch-data-analysis/pwdata/kpis/outage.py", line 168 in process
File "/home/powerwatch-data-analysis/pwdata/pipeline/core/pipeline.py", line 184 in __new__
File "<stdin>", line 1 in <module>

Extension modules: numpy.core._multiarray_umath, numpy.core._multiarray_tests, numpy.linalg._umath_linalg, numpy.fft._pocketfft_internal, numpy.random._common, numpy.random.bit_generator, numpy.random._bounded_integers, numpy.random._mt19937, numpy.random.mtrand, numpy.random._philox, numpy.random._pcg64, numpy.random._sfc64, numpy.random._generator, psutil._psutil_linux, psutil._psutil_posix, charset_normalizer.md, grpc._cython.cygrpc, pyarrow.lib, pyarrow._hdfsio, pandas._libs.tslibs.np_datetime, pandas._libs.tslibs.dtypes, pandas._libs.tslibs.base, pandas._libs.tslibs.nattype, pandas._libs.tslibs.timezones, pandas._libs.tslibs.ccalendar, pandas._libs.tslibs.fields, pandas._libs.tslibs.timedeltas, pandas._libs.tslibs.tzconversion, pandas._libs.tslibs.timestamps, pandas._libs.properties, pandas._libs.tslibs.offsets, pandas._libs.tslibs.strptime, pandas._libs.tslibs.parsing, pandas._libs.tslibs.conversion, pandas._libs.tslibs.period, pandas._libs.tslibs.vectorized, pandas._libs.ops_dispatch, pandas._libs.missing, pandas._libs.hashtable, pandas._libs.algos, pandas._libs.interval, pandas._libs.lib, pandas._libs.ops, pyarrow._compute, pandas._libs.arrays, pandas._libs.tslib, pandas._libs.sparse, pandas._libs.indexing, pandas._libs.index, pandas._libs.internals, pandas._libs.join, pandas._libs.writers, pandas._libs.window.aggregations, pandas._libs.window.indexers, pandas._libs.reshape, pandas._libs.groupby, pandas._libs.json, pandas._libs.parsers, pandas._libs.testing, google._upb._message, yaml._yaml, shapely.lib, shapely._geos, shapely._geometry_helpers, pyproj._compat, pyproj._datadir, pyproj._network, pyproj._geod, pyproj.list, pyproj._crs, pyproj.database, pyproj._transformer, pyproj._sync, scipy._lib._ccallback_c, scipy.sparse._sparsetools, _csparsetools, scipy.sparse._csparsetools, scipy.sparse.linalg._isolve._iterative, scipy.linalg._fblas, scipy.linalg._flapack, scipy.linalg.cython_lapack, scipy.linalg._cythonized_array_utils, scipy.linalg._solve_toeplitz, scipy.linalg._decomp_lu_cython, scipy.linalg._matfuncs_sqrtm_triu, scipy.linalg.cython_blas, scipy.linalg._matfuncs_expm, scipy.linalg._decomp_update, scipy.linalg._flinalg, scipy.sparse.linalg._dsolve._superlu, scipy.sparse.linalg._eigen.arpack._arpack, scipy.sparse.csgraph._tools, scipy.sparse.csgraph._shortest_path, scipy.sparse.csgraph._traversal, scipy.sparse.csgraph._min_spanning_tree, scipy.sparse.csgraph._flow, scipy.sparse.csgraph._matching, scipy.sparse.csgraph._reordering, scipy.spatial._ckdtree, scipy._lib.messagestream, scipy.spatial._qhull, scipy.spatial._voronoi, scipy.spatial._distance_wrap, scipy.spatial._hausdorff, scipy.special._ufuncs_cxx, scipy.special._ufuncs, scipy.special._specfun, scipy.special._comb, scipy.special._ellip_harm_2, scipy.spatial.transform._rotation (total: 110)
/usr/local/bin/pyspark: line 60: 232 Segmentation fault $PYSPARK_DRIVER_PYTHON
```