โ08-03-2023 04:15 AM
Hi All,
I am facing an issue of data getting missed.
I am reading the data from azure event hub and after flattening the json data I am storing it in a parquet file and then using another databricks notebook to perform the merge operations on my delta table by adding some etl columns to it.
However in between somewhere the records are getting missed.
I have scheduled the job to run every hour.
Can someone please help me out with this.
โ08-03-2023 04:27 AM
do you land the event hub data unprocessed on a data lake? if so, you can check if everything is there.
If so: check the next step and so on.
If you do not save the raw data, with some luck you have it still in event hub.
โ08-03-2023 04:36 AM
No the data is processed before landing on a datalake
โ08-03-2023 04:38 AM
I could not find those missing records in data lake as well
โ08-03-2023 04:43 AM
Using below code :
โ08-03-2023 04:48 AM
without having the actual raw data, it is hard to figure out where the issue resides. It could be the code or it could be event hub.
I'd store the raw event hub data in a data lake and use autoloader for further processing.
One thing in your code that caught my attention is the use of dropduplicates and distinct.
Are you sure you are not dropping too much?
โ08-03-2023 04:54 AM
I am dropping duplicates based on my primary key as multiple records are coming from event hub for a single primary key and I want to take the latest one also i am doing order by on op_ts column so that I get the latest record only.
Distinct can create any issue in the code I am not sure.
โ08-03-2023 05:05 AM
Ideally you can still read the event hub events, and see if the missing ones are there. If so: it must be spark related.
If they are already gone, hard to tell. I'd store the events in a raw table so you can do validity checks.}
โ08-03-2023 05:09 AM
in event hub i can see the missing records. should I be using forEachBatch somewhere in my code. ?or any other suggestion.
โ08-03-2023 05:22 AM
Basically you want to do some cleanup of duplicates. And you want to do this for each microbatch coming in I assume.
So indeed, create a function which does de dedup etc and call it in foreachbatch.
https://docs.databricks.com/en/delta/merge.html#data-deduplication-when-writing-into-delta-tables
โ08-03-2023 05:35 AM
As of now I am not having any foreachbatch in my code.
I am performing dedup on entire data coming from event hub
โ08-03-2023 05:00 AM
- In the EventHub, you can preview the event hub job using Azure Analitycs, so please first check are all records there
- Please set in Databricks that it is saved directly to the bronze delta table without performing any aggregation, just 1 to 1, and check if all records are there.
- Please consider using Delta Live Table for ingestion from Event Hub. It will make your live easier regarding monitoring stream, data quality, and performing full refresh when needed.
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโt want to miss the chance to attend and share knowledge.
If there isnโt a group near you, start one and help create a community that brings people together.
Request a New Group