
Data getting missed while reading from Azure Event Hub using Spark Structured Streaming

Rishi045
New Contributor III

Hi All,

I am facing an issue where data is getting missed.

I am reading data from Azure Event Hub, flattening the JSON, and storing it in a parquet file. Another Databricks notebook then adds some ETL columns and performs the merge operations on my Delta table.

However, somewhere in between, records are getting missed.

I have scheduled the job to run every hour.

Can someone please help me out with this?

11 REPLIES

-werners-
Esteemed Contributor III

Do you land the Event Hub data unprocessed on a data lake? If so, you can check whether everything is there.
If it is, check the next step, and so on.
If you do not save the raw data, with some luck it is still in Event Hub.

Rishi045
New Contributor III

No, the data is processed before landing on the data lake.

Rishi045
New Contributor III

I could not find those missing records in the data lake either.

 

Rishi045
New Contributor III

Using the code below:

from pyspark.sql.functions import col, json_tuple, from_json

# Event Hub connection options (connection string, consumer group, ...) go here.
conf = {}

# Read the raw stream and extract the JSON body as a string.
df = spark.readStream.format("eventhubs").options(**conf).load()
dataDF = df.select(col("body").cast("STRING"))
data = dataDF.select(json_tuple(col("body"), "table", "op_type", "records", "op_ts")) \
             .toDF("table", "op_type", "records", "op_ts")

# Flatten the "records" payload using reqSchema (defined elsewhere).
final_data = data.withColumn("records_json", from_json(col("records"), reqSchema))
final_data = final_data.select(
    *[col("records_json." + field).alias(field) for field in reqSchema.fieldNames()],
    col("op_type"),
    col("op_ts"))

# Note: the result of this orderBy is not assigned, so it has no effect on final_data.
final_data.orderBy(col("op_ts").desc())
final_data = final_data.dropDuplicates([primaryKey])
final_data = final_data.distinct()
final_data = final_data.drop(final_data.op_ts)
final_data = final_data.drop(final_data.op_type)

# checkPoint_url and rawFilePath_url are defined elsewhere.
final_data.coalesce(1).writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("checkpointLocation", checkPoint_url) \
    .trigger(once=True) \
    .start(rawFilePath_url) \
    .awaitTermination()

-werners-
Esteemed Contributor III

Without having the actual raw data, it is hard to figure out where the issue resides. It could be the code, or it could be Event Hub.
I'd store the raw Event Hub data in a data lake and use Auto Loader for further processing.

One thing in your code that caught my attention is the use of dropDuplicates and distinct.
Are you sure you are not dropping too much?
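For reference, a minimal sketch of what landing the Event Hub payload unprocessed and then picking it up with Auto Loader could look like. The paths raw_landing_path and raw_checkpoint are placeholders, and conf is the (elided) Event Hub configuration from the code above:

from pyspark.sql.functions import col

# Placeholder locations: adjust to your storage layout.
raw_landing_path = "abfss://raw@<storage-account>.dfs.core.windows.net/eventhub/"
raw_checkpoint = "abfss://raw@<storage-account>.dfs.core.windows.net/_checkpoints/eventhub_raw/"

# 1) Land the payload as-is (no parsing, no dedup) so every event is preserved.
raw_df = spark.readStream.format("eventhubs").options(**conf).load()

(raw_df
    .withColumn("body", col("body").cast("string"))
    .writeStream
    .format("parquet")
    .option("checkpointLocation", raw_checkpoint)
    .trigger(once=True)
    .start(raw_landing_path))

# 2) Process the landed files incrementally with Auto Loader.
landed = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", raw_checkpoint + "schema/")
    .load(raw_landing_path))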

Rishi045
New Contributor III

I am dropping duplicates based on my primary key because multiple records arrive from Event Hub for a single primary key and I want to keep only the latest one; I am also ordering by the op_ts column so that I get the latest record only.

Whether distinct can create any issue in the code, I am not sure.
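A more deterministic way to keep only the latest record per primary key is a window function rather than orderBy followed by dropDuplicates (dropDuplicates does not guarantee which duplicate survives). A minimal sketch, reusing final_data, primaryKey and op_ts from the code above; note that this only works on a static DataFrame, for example inside foreachBatch:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Rank records per primary key, newest op_ts first, and keep only the top row.
# Supported on a static DataFrame (e.g. a foreachBatch micro-batch),
# not directly on a streaming DataFrame.
w = Window.partitionBy(primaryKey).orderBy(F.col("op_ts").desc())

latest_per_key = (final_data
    .withColumn("_rn", F.row_number().over(w))
    .filter(F.col("_rn") == 1)
    .drop("_rn"))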

-werners-
Esteemed Contributor III

Ideally you can still read the Event Hub events and see whether the missing ones are there. If so, it must be Spark related.
If they are already gone, it is hard to tell. I'd store the events in a raw table so you can do validity checks.

Rishi045
New Contributor III

In Event Hub I can see the missing records. Should I be using foreachBatch somewhere in my code, or do you have any other suggestion?

-werners-
Esteemed Contributor III

Basically you want to do some cleanup of duplicates, and I assume you want to do this for each micro-batch coming in.
So indeed, create a function which does the dedup etc. and call it in foreachBatch.
https://docs.databricks.com/en/delta/merge.html#data-deduplication-when-writing-into-delta-tables
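A minimal sketch of what that could look like, assuming a target Delta table at a placeholder delta_table_path, a dedicated placeholder checkpoint merge_checkpoint_url, and assuming the primary key and op_ts columns are still present on the streaming DataFrame when it reaches foreachBatch:

from delta.tables import DeltaTable
from pyspark.sql import Window
from pyspark.sql import functions as F

def upsert_to_delta(micro_batch_df, batch_id):
    # Keep only the newest record per primary key within this micro-batch.
    w = Window.partitionBy(primaryKey).orderBy(F.col("op_ts").desc())
    deduped = (micro_batch_df
        .withColumn("_rn", F.row_number().over(w))
        .filter(F.col("_rn") == 1)
        .drop("_rn"))

    # Merge the deduplicated batch into the target Delta table (placeholder path).
    target = DeltaTable.forPath(spark, delta_table_path)
    (target.alias("t")
        .merge(deduped.alias("s"), "t.{0} = s.{0}".format(primaryKey))
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(final_data.writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", merge_checkpoint_url)  # dedicated checkpoint for this sink
    .trigger(once=True)
    .start()
    .awaitTermination())

With the merge done inside foreachBatch, the intermediate parquet step and the separate merge notebook could eventually be collapsed into a single job.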

Rishi045
New Contributor III

As of now I do not have any foreachBatch in my code.

I am performing dedup on the entire data coming from Event Hub.

Hubert-Dudek
Esteemed Contributor III

- In Event Hub you can preview the events using Azure Analytics, so please first check whether all the records are there.

- In Databricks, save the data directly to a bronze Delta table, 1 to 1, without performing any aggregation, and check whether all the records are there (a minimal sketch follows after this list).

- Please consider using Delta Live Tables for ingestion from Event Hub. It will make your life easier regarding stream monitoring, data quality, and performing a full refresh when needed.
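A minimal sketch of the 1-to-1 bronze landing mentioned in the second point, with placeholder bronze_path and bronze_checkpoint locations; the raw body plus a few Event Hub metadata columns are stored unchanged so counts can be compared against the source:

from pyspark.sql.functions import col

# Placeholder locations for the bronze table and its checkpoint.
bronze_path = "abfss://bronze@<storage-account>.dfs.core.windows.net/eventhub_raw/"
bronze_checkpoint = bronze_path + "_checkpoint/"

raw = spark.readStream.format("eventhubs").options(**conf).load()

# No parsing, no dedup: just the raw body and Event Hub metadata, 1 to 1.
(raw.select(
        col("body").cast("string").alias("body"),
        col("enqueuedTime"),
        col("sequenceNumber"),
        col("partition"))
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", bronze_checkpoint)
    .trigger(once=True)
    .start(bronze_path))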
