Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Event hub streaming improve processing rate

Jreco
Contributor

Hi all,

I'm working with Event Hubs and Databricks to process and enrich data in real time.

Doing a "simple" test, I'm getting some odd values (input rate vs. processing rate) and I think I'm losing data:

[Screenshot: input rate vs. processing rate graph] As you can see, there is a peak of ~5k records, but it is never processed in the 5 minutes after.

The script that I'm using is:

 
from pyspark.sql.functions import from_json, col, sha2, concat
import json

conf = {}
conf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString_bb_stream)
conf['eventhubs.consumerGroup'] = 'adb_jr_tesst'
conf['maxEventsPerTrigger'] = '350000'
conf['maxRatePerPartition'] = '350000'
# The starting position must be passed as a JSON string, not a JVM object;
# offset "@latest" means "end of stream".
conf['eventhubs.startingPosition'] = json.dumps(
    {"offset": "@latest", "seqNo": -1, "enqueuedTime": None, "isInclusive": True})

df = (spark.readStream
          .format("eventhubs")
          .options(**conf)
          .load())

json_df = df.withColumn("body", from_json(col("body").cast("string"), jsonSchema))
final_df = json_df.select("sequenceNumber", "offset", "enqueuedTime", col("body.*"))
final_df = final_df.withColumn("Key", sha2(concat(col("EquipmentId"), col("TagId"), col("Timestamp")), 256))
final_df.display()

Can you help me understand why I'm "losing" data, or how I can improve the process?

The cluster that I'm using is:

[Screenshot: cluster configuration]

I think it is a cluster configuration issue, but I'm not sure how to tackle it.

Thanks for the help, guys!

14 REPLIES

Kaniz_Fatma
Community Manager

Hi @Jreco! My name is Kaniz, and I'm the technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers in the community have an answer to your question first. Otherwise I will get back to you soon. Thanks.

Hi Kaniz,
Thanks for your reply.

Hi @Kaniz Fatma, sorry for bothering you,

could you please take a look at this?

Thanks for your help!

-werners-
Esteemed Contributor III

Ok, the only thing I notice is that you have set a termination time, which is not necessary for streaming (if you are doing real-time).

I also notice you do not set a checkpoint location, something you might consider.

You can also try removing the maxEventsPerTrigger and maxRatePerPartition config.

A snippet from the docs:

Here are the details of the recommended job configuration.

  • Cluster: Set this always to use a new cluster and use the latest Spark version (or at least version 2.1). Queries started in Spark 2.1 and above are recoverable after query and Spark version upgrades.
  • Alerts: Set this if you want email notification on failures.
  • Schedule: Do not set a schedule.
  • Timeout: Do not set a timeout. Streaming queries run for an indefinitely long time.
  • Maximum concurrent runs: Set to 1. There must be only one instance of each query concurrently active.
  • Retries: Set to Unlimited.

https://docs.databricks.com/spark/latest/structured-streaming/production.html

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
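For reference, the checklist from the docs maps roughly onto a Jobs API payload like the following (a hedged sketch; the cluster fields and email address are placeholders, and `max_retries: -1` means unlimited retries):

```json
{
  "name": "eventhubs-streaming-job",
  "new_cluster": { "spark_version": "<latest>", "num_workers": 2 },
  "email_notifications": { "on_failure": ["you@example.com"] },
  "timeout_seconds": 0,
  "max_concurrent_runs": 1,
  "max_retries": -1
}
```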

Thanks for the answer, werners.

What do you mean when you say "I have set a termination time"? In which part of the script?

I'm not using a checkpoint because I just wanted to see the behavior of the process at the beginning and try to figure out why I'm losing information.

-werners-
Esteemed Contributor III

The termination time in the cluster settings

(Terminate after 60 minutes of inactivity)

Ah OK, I have that parameter only for the dev cluster.

Is it possible that the issue is related to this?

-werners-
Esteemed Contributor III

Maybe; if your cluster shuts down, your streaming will be interrupted.

But in your case that is probably not the issue, as it seems you are not running a long-running streaming query.

But what makes you think you have missing records?

Did you count the #records incoming and outgoing?

Yes, I've counted the records for a specific range of time (5 min) and there are about 4k records missing... And it is aligned with the streaming graph of processing vs. input rate... So, if I'm not losing data, I'm not processing the records in near real-time.
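One way to separate "losing" from "lagging" is to total `numInputRows` across the query's micro-batch progress reports (in PySpark, `query.recentProgress` is a list of dicts) and compare that with the row count landed in the sink; a gap that shrinks over time is backlog, not loss. A small helper along those lines (the sample report dicts below are made up, shaped like `recentProgress` entries):

```python
def total_input_rows(progress_reports):
    """Sum numInputRows over StreamingQueryProgress-like dicts,
    e.g. the entries of query.recentProgress."""
    return sum(int(p.get("numInputRows", 0)) for p in progress_reports)

# Hypothetical snapshots shaped like recentProgress entries:
reports = [{"batchId": 0, "numInputRows": 5000},
           {"batchId": 1, "numInputRows": 1200}]
print(total_input_rows(reports))  # 6200
```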

-werners-
Esteemed Contributor III

Hm, odd. You don't use spot instances, do you?

Sorry werners, I'm not sure what you mean by "spot instances"

-werners-
Esteemed Contributor III

[Screenshot: instance options in the cluster settings] These are so-called "spot" instances that you can borrow from other customers at a much cheaper price.

But when those customers need them, the instances get evicted from your account. In streaming that could be an issue, but I have never tested that.

Thanks for the explanation.

I don't have that option checked.

jose_gonzalez
Moderator

Hi @Jhonatan Reyes,

How many Event Hubs partitions are you reading from? Your micro-batch takes a few milliseconds to complete, which I think is a good time, but I would like to understand better what you are trying to improve here.

Also, in this case you are using the memory sink (display()); I would highly recommend testing with another type of sink.
