cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Pulsar Streaming (Read) - Benchmarking Information

surband
New Contributor III

We are doing a first time implementation of data streaming reading from a partitioned pulsar topics to a delta table managed by UC. We are unable to scale the job beyond about ~ 40k msgs/sec. Beyond 40k msgs/sec , the job fails.  I'd imagine Databricks to be able to ingest and process much more than 40k msgs/sec. I did find this article but it does not provide any benchmarking details.

https://www.databricks.com/blog/streamnative-and-databricks-unite-power-real-time-data-processing-pu...

 

Are there any benchmarking information that is available for Pulsar Streaming?

Attached is the compute we are using for the job. Partial Exception stack trace below. The timeout occurs only when rate of message production to pulsar is above 40 - 45 k msgs/sec at which point broker backlog raises up causing huge backlogs.

 

 

24/04/09 21:42:12 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
24/04/09 21:42:15 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
24/04/09 21:42:18 INFO ClusterLoadAvgHelper: Current cluster load: 1, Old Ema: 1.0, New Ema: 1.0 
24/04/09 21:42:18 WARN TaskSetManager: Lost task 14.2 in stage 2141.0 (TID 134213) (100.73.34.11 executor 1): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to abfss://raw@125874d1stadlsraw.dfs.core.windows.net/raw/tables/__unitystorage/catalogs/f6690e6b-ecab-4f0c-b019-7929d8048177/tables/2ba26f23-7b8b-4f3c-a847-4ee4e821a85d. SQLSTATE: 58030
at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:989)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:590)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$3(FileFormatWriter.scala:363)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:45)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:951)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:954)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:846)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: request timeout {'durationMs': '30000', 'reqId':'3868285672746214474', 'remote':'dcwidavmxt0031.epg.nam.gm.com/10.127.9.49:6651', 'local':'/100.73.34.11:37670'}
at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1008)
at org.apache.pulsar.client.impl.ConsumerBase.close(ConsumerBase.java:639)
at org.apache.pulsar.client.impl.ReaderImpl.close(ReaderImpl.java:181)
at org.apache.spark.sql.pulsar.PulsarSourceRDDBase$$anon$1.close(PulsarSourceRDD.scala:161)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:75)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:92)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$2(FileFormatWriter.scala:570)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1546)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:577)
... 28 more
 
9 REPLIES 9

surband
New Contributor III

Attached Grafana screenshots

surband
New Contributor III

@shan_chandra any suggestions?

shan_chandra
Esteemed Contributor
Esteemed Contributor

@surband  - can you please add this library to the cluster and try and let us know -  io.streamnative.connectors:pulsar-spark-connector_2.12:3.4.0.3 

Reference: https://github.com/streamnative/pulsar-spark

surband
New Contributor III

But that jar already seems to be available in the cluster class path, I checked in the Spark UI -->Environment -->Classpath Entries. Do you still suggest I go ahead and install? @shan_chandra  

shan_chandra
Esteemed Contributor
Esteemed Contributor

yes. Install from the maven library and see if it works. Per Open source Pulsar Spark connector documentation, Write to Pulsar sink is supported. However, within DBR only read from Pulsar source is supported as of now (as the feature is in public preview). 

surband
New Contributor III

@shan_chandra I was able to get the jar from https://mvnrepository.com/artifact/io.streamnative.connectors/pulsar-spark-connector_2.12/3.4.0.3

But unable to install from as the wizard does not allow me to select the file - see attached. Is there an alternative way of installing. Please let me know. Thank You !

shan_chandra
Esteemed Contributor
Esteemed Contributor

@surband - can you please use the maven option and install the connector. (I was able to attach it successfully in my local).

surband
New Contributor III

@shan_chandra I was able to install and rerun the tests and I am able to see 70k avg process/sec - and job is not crashing any longer. My goal is to see if I can achieve 500k msgs/sec. I will continue testing next week. 

Will databricks runtime update it's package with the library we needed to install manually - what will be ETA?

Thanks so much for your help. 

surband
New Contributor III

@shan_chandra Is it Databrick's official recommendation that customers manually install the following to achieve higher throughputs ? 

https://mvnrepository.com/artifact/io.streamnative.connectors/pulsar-spark-connector_2.12/3.4.0.3