10-21-2021 02:50 PM
Hi all,
I'm working with Event Hubs and Databricks to process and enrich data in real-time.
Doing a "simple" test, I'm getting some weird values (input rate vs processing rate) and I think I'm losing data:
As you can see, there is a peak with 5k records, but it is never processed in the 5 minutes after.
The script that I'm using is:
from pyspark.sql.functions import col, from_json, sha2, concat
import json

conf = {}
conf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString_bb_stream)
conf['eventhubs.consumerGroup'] = 'adb_jr_tesst'
conf['maxEventsPerTrigger'] = '350000'    # cap on events per micro-batch
conf['maxRatePerPartition'] = '350000'    # cap on events per partition per batch
# In PySpark the starting position must be passed as a JSON string under
# 'eventhubs.startingPosition' (offset "@latest" = end of stream):
conf['eventhubs.startingPosition'] = json.dumps({"offset": "@latest", "seqNo": -1, "enqueuedTime": None, "isInclusive": True})

df = (spark.readStream
      .format("eventhubs")
      .options(**conf)
      .load())

# Parse the binary body as JSON with the schema defined earlier (jsonSchema)
json_df = df.withColumn("body", from_json(col("body").cast('string'), jsonSchema))
Final_df = json_df.select("sequenceNumber", "offset", "enqueuedTime", col("body.*"))
# Deterministic key derived from the payload fields
Final_df = Final_df.withColumn("Key", sha2(concat(col('EquipmentId'), col('TagId'), col('Timestamp')), 256))
Final_df.display()
Can you help me understand why I'm "losing" data, or how I can improve the process?
The cluster that I'm using is: (cluster configuration screenshot)
I think it is a cluster configuration issue, but I'm not sure how to tackle it.
Thanks for the help, guys!
10-22-2021 11:42 AM
Hi @Kaniz Fatma, sorry for bothering you,
could you please take a look at this?
Thanks for your help!
10-21-2021 11:12 PM
Ok, the only thing I notice is that you have set a termination time, which is not necessary for streaming (if you are doing real-time).
I also notice you do not set a checkpoint location, something you might consider (see the sketch below).
You can also try to remove the maxEventsPerTrigger and maxRatePerPartition config.
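For example, a minimal sketch of writing to a durable sink with a checkpoint (the Delta and checkpoint paths here are placeholders, not from your script):

# Sketch: replace display() with a durable sink plus a checkpoint location,
# so the query tracks its Event Hubs offsets and can recover after a restart.
(Final_df.writeStream
    .format("delta")                                           # any durable sink works
    .option("checkpointLocation", "/mnt/checkpoints/eh_test")  # placeholder path
    .outputMode("append")
    .start("/mnt/delta/eh_test"))                              # placeholder output path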
A snippet from the docs:
Here are the details of the recommended job configuration.
https://docs.databricks.com/spark/latest/structured-streaming/production.html
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
10-22-2021 04:27 AM
Thanks for the answer, werners.
What do you mean when you say 'I have set a termination time'? In which part of the script?
I'm not using a checkpoint because I just wanted to see the behavior of the process at the beginning and try to figure out why I'm losing information.
10-22-2021 04:28 AM
The termination time in the cluster settings
(Terminate after 60 minutes of inactivity)
10-22-2021 04:33 AM
Ah OK, I have that parameter only for the dev cluster.
Is it possible that the issue is related to this?
10-22-2021 04:36 AM
Maybe; if your cluster shuts down, your streaming will be interrupted.
But in your case that is probably not the issue, as it seems you are not running a long-running streaming query.
But what makes you think you have missing records?
Did you count the #records incoming and outgoing?
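For example, something along these lines once the stream lands in a durable sink (the sink path and window bounds below are placeholders):

from pyspark.sql.functions import col

# Count what actually landed in the sink for a fixed 5-minute window and
# compare it with the Event Hubs "incoming messages" metric for the same window.
out_count = (spark.read.format("delta")
    .load("/mnt/delta/eh_test")  # placeholder sink path
    .where(col("enqueuedTime").between("2021-10-21 14:00:00", "2021-10-21 14:05:00"))
    .count())
print(out_count)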
10-22-2021 04:44 AM
Yes, I've counted the records for a specific range of time (5 min) and there are around 4k records missing... And it is aligned with the streaming graph of processing vs input rate... So, if I'm not losing data, I'm at least not processing the records in near real-time.
10-22-2021 05:20 AM
Hm, odd. You don't use spot instances, do you?
10-22-2021 05:54 AM
Sorry Werners, I'm not sure what you mean by "spot instances".
10-22-2021 06:28 AM
10-22-2021 06:28 AM
(screenshot showing the spot instances option in the cluster configuration)
Thanks for the explanation.
I don't have that option checked.
10-22-2021 03:24 PM
Hi @Jhonatan Reyes,
How many Event Hubs partitions are you reading from? Your micro-batch takes a few milliseconds to complete, which I think is a good time, but I would like to understand better what you are trying to improve here.
Also, in this case you are using a memory sink (display); I would highly recommend testing it with another type of sink (see the sketch below).
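If the goal is just to measure raw throughput, one option (assuming a Spark 3.x runtime) is the built-in noop sink, which fully processes every record but writes nothing:

# Sketch: benchmark the processing rate without sink overhead or the memory
# sink's result buffering; the checkpoint path is a placeholder.
query = (Final_df.writeStream
    .format("noop")
    .option("checkpointLocation", "/mnt/checkpoints/eh_noop")
    .start())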