We are implementing, for the first time, a streaming job that reads from partitioned Pulsar topics and writes to a Delta table managed by Unity Catalog (UC). We are unable to scale the job beyond roughly 40k msgs/sec; above that rate, the job fails. I would expect Databricks to be able to ingest and process far more than 40k msgs/sec. I did find this article, but it does not provide any benchmarking details:
https://www.databricks.com/blog/streamnative-and-databricks-unite-power-real-time-data-processing-pu...
 
Is there any benchmarking information available for Pulsar streaming?
Attached is the compute we are using for the job. A partial exception stack trace is below. The timeout occurs only when the rate of message production to Pulsar exceeds 40-45k msgs/sec, at which point the broker backlog grows rapidly.
 
 
24/04/09 21:42:12 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
24/04/09 21:42:15 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
24/04/09 21:42:18 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
24/04/09 21:42:18 WARN TaskSetManager: Lost task 14.2 in stage 2141.0 (TID 134213) (100.73.34.11 executor 1): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to abfss://raw@125874d1stadlsraw.dfs.core.windows.net/raw/tables/__unitystorage/catalogs/f6690e6b-ecab-4f0c-b019-7929d8048177/tables/2ba26f23-7b8b-4f3c-a847-4ee4e821a85d. SQLSTATE: 58030
at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:989)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:590)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$3(FileFormatWriter.scala:363)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:951)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:954)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:846)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: request timeout {'durationMs': '30000', 'reqId':'3868285672746214474', 'remote':'dcwidavmxt0031.epg.nam.gm.com/10.127.9.49:6651', 'local':'/100.73.34.11:37670'}
at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1008)
at org.apache.pulsar.client.impl.ConsumerBase.close(ConsumerBase.java:639)
at org.apache.pulsar.client.impl.ReaderImpl.close(ReaderImpl.java:181)
at org.apache.spark.sql.pulsar.PulsarSourceRDDBase$$anon$1.close(PulsarSourceRDD.scala:161)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:75)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:92)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$2(FileFormatWriter.scala:570)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1546)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:577)
... 28 more