10-21-2021 02:50 PM
Hi all,
I'm working with Event Hubs and Databricks to process and enrich data in real time.
Doing a "simple" test, I'm getting some weird values (input rate vs processing rate) and I think I'm losing data:
As you can see, there is a peak of 5k records, but it is never processed in the 5 minutes after.
The script that I'm using is:
from pyspark.sql.functions import col, concat, from_json, sha2
import json

conf = {}
# The connector requires the connection string to be encrypted
conf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString_bb_stream)
conf['eventhubs.consumerGroup'] = 'adb_jr_tesst'
conf['maxEventsPerTrigger'] = '350000'
conf['maxRatePerPartition'] = '350000'
# From Python, the starting position is passed as a JSON string
# ('@latest' offset = end of stream), not as a JVM EventPosition object
conf['eventhubs.startingPosition'] = json.dumps({"offset": "@latest", "seqNo": -1, "enqueuedTime": None, "isInclusive": True})

df = (spark.readStream
      .format("eventhubs")
      .options(**conf)
      .load()
     )

# Parse the binary body as JSON and flatten its fields
json_df = df.withColumn("body", from_json(col("body").cast("string"), jsonSchema))
Final_df = json_df.select("sequenceNumber", "offset", "enqueuedTime", col("body.*"))
# Deduplication key over equipment, tag and timestamp
Final_df = Final_df.withColumn("Key", sha2(concat(col("EquipmentId"), col("TagId"), col("Timestamp")), 256))
Final_df.display()
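For what it's worth, the "Key" column built with sha2(concat(...), 256) is just a SHA-256 hex digest of the concatenated column values. A pure-Python sketch (the values below are hypothetical) shows what it computes, including one caveat: without a separator between fields, different value splits can produce the same key.

```python
import hashlib

def make_key(equipment_id: str, tag_id: str, timestamp: str) -> str:
    # Equivalent of Spark's sha2(concat(EquipmentId, TagId, Timestamp), 256):
    # SHA-256 hex digest over the UTF-8 bytes of the concatenated values.
    # Note: concat has no separator, so ("AB","C") and ("A","BC") collide;
    # also, in Spark, concat returns NULL if any input column is NULL.
    return hashlib.sha256((equipment_id + tag_id + timestamp).encode("utf-8")).hexdigest()

key = make_key("EQ-001", "TAG-42", "2021-10-21T14:50:00Z")
print(key)  # 64-character lowercase hex string
```

If collisions matter for your dedup logic, consider concatenating with a separator (e.g. concat_ws) before hashing.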
Can you help me understand why I'm "losing" data, or how I can improve the process?
The cluster that I'm using is:
I think it is a cluster configuration issue, but I'm not sure how to tackle it.
Thanks for the help, guys!
10-21-2021 09:11 PM
10-22-2021 11:42 AM
Hi @Kaniz Fatma, sorry to bother you,
could you please take a look at this?
Thanks for your help!
10-21-2021 11:12 PM
OK, the only thing I notice is that you have set a termination time, which is not necessary for streaming (if you are doing real-time).
I also notice you do not set a checkpoint location, something you might want to consider.
You can also try removing the maxEventsPerTrigger and maxRatePerPartition configs.
A snippet from the docs:
Here are the details of the recommended job configuration.
https://docs.databricks.com/spark/latest/structured-streaming/production.html
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
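Concretely, dropping the rate caps as suggested above could look like the sketch below. This is a hypothetical fragment, not the full job: the encrypted connection string line is omitted, and the starting-position JSON is the form the Python connector docs describe ('@latest' offset = end of stream), as far as I recall them.

```python
import json

# Hypothetical reader options with the rate caps removed.
conf = {
    "eventhubs.consumerGroup": "adb_jr_tesst",
    "eventhubs.startingPosition": json.dumps({
        "offset": "@latest",   # end of stream
        "seqNo": -1,
        "enqueuedTime": None,
        "isInclusive": True,
    }),
    # "maxEventsPerTrigger" and "maxRatePerPartition" intentionally removed
}
print(conf["eventhubs.startingPosition"])
```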
10-22-2021 04:27 AM
Thanks for the answer werners.
What do you mean when you say 'I have set a termination time'? In which part of the script?
I'm not using a checkpoint because I just wanted to see the behavior of the process at the beginning and try to figure out why I'm losing information.
10-22-2021 04:28 AM
The termination time in the cluster settings
(Terminate after 60 minutes of inactivity)
10-22-2021 04:33 AM
Ah OK, I have that parameter only for the dev cluster.
Is it possible that the issue is related to this?
10-22-2021 04:36 AM
Maybe; if your cluster shuts down, your streaming will be interrupted.
But in your case that is probably not the issue, as it seems you are not running a long-running streaming query.
But what makes you think you have missing records?
Did you count the number of incoming and outgoing records?
10-22-2021 04:44 AM
Yes, I've counted the records for a specific range of time (5 min) and there are about 4k records missing... and that is aligned with the streaming graph of processing vs. input rate. So, if I'm not losing data, then I'm not processing the records in near real-time.
10-22-2021 05:20 AM
Hm, odd. You don't use spot instances, do you?
10-22-2021 05:54 AM
Sorry Werners, I'm not sure what you mean by "spot instances".
10-22-2021 06:28 AM
10-22-2021 06:50 AM
Thanks for the explanation.
I don't have that option checked.
10-22-2021 03:24 PM
Hi @Jhonatan Reyes,
How many Event Hubs partitions are you reading from? Your micro-batch takes a few milliseconds to complete, which I think is a good time, but I would like to understand better what you are trying to improve here.
Also, in this case you are using a memory sink (display); I would highly recommend testing it with another type of sink.
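To illustrate that suggestion, here is a hedged sketch (not the definitive fix) of writing the stream to a Delta sink with a checkpoint instead of display()'s memory sink, so micro-batch progress is persisted across restarts and the comparison of input vs. processed counts is not distorted by what the notebook UI renders. Both paths are hypothetical placeholders.

```python
# Hypothetical sketch: Final_df is the streaming DataFrame from the post above.
query = (Final_df.writeStream
         .format("delta")
         .outputMode("append")
         .option("checkpointLocation", "/mnt/checkpoints/eventhubs_test")  # hypothetical path
         .start("/mnt/delta/eventhubs_test"))                              # hypothetical path
```

You can then count the written records with a plain batch query on the Delta path and compare against the Event Hubs incoming-message metrics.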