Spark StreamingQuery not processing all data from source directory

RajaLakshmanan
New Contributor

Hi,

I have set up a streaming process that consumes files from an HDFS staging directory and writes them to a target location. The input directory continuously receives files from another process.

Let's say the file producer generates 5 million records and sends them to the HDFS staging directory as 50 files within a 5-minute window. The streaming process picks up the files (processing trigger set to 60 seconds) and processes them all, but the problem is that 2-3% of the records are missing after all the files from the input directory have been processed.

I checked the checkpoint source directory, and it shows that all the files were picked up and processed.

Not sure what else to check.

@Prakash Chockalingam Can you please help?

// Stream CSV files from the HDFS staging directory using the configured schema.
Dataset<Row> dataDF = spark.readStream()
        .option("sep", delimiter)
        .option("header", "false")
        .option("inferSchema", "false")
        .option("latestFirst", "false")
        .schema(appConfig.getSchema())
        .csv(appConfig.getHdfsSourceDir());

// Write the stream out as partitioned parquet on a 60-second trigger;
// the checkpoint directory tracks which source files have been processed.
StreamingQuery query = dataDF.writeStream()
        .trigger(ProcessingTime.create(60, TimeUnit.SECONDS))
        .format("parquet")
        .outputMode(OutputMode.Append())
        .partitionBy(appConfig.getPartitionedBy().split(","))
        .option("checkpointLocation", appConfig.getHdfsNNUri() + appConfig.getHdfsOutputDir() + "/checkpoint")
        .option("compression", appConfig.getSparkCompressType())
        .start(appConfig.getHdfsNNUri() + appConfig.getHdfsOutputDir());
query.awaitTermination();


3 REPLIES

Kaniz
Community Manager

Hi @RajaLakshmanan! My name is Kaniz, and I'm a technical moderator here. Great to meet you, and thanks for your question! Let's see if your peers on the forum have an answer to your question first; otherwise, I will follow up shortly with a response.

jose_gonzalez
Moderator
Accepted Solution

Hi,

Did you check the logs for the micro-batch metrics? These metrics will help you identify the number of records processed and the time it took to process the data. This article shows the metrics that are captured and how you can use them to understand how your streaming job is performing: https://www.waitingforcode.com/apache-spark-structured-streaming/query-metrics-apache-spark-structur...
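
For reference, here is a minimal sketch of logging those per-micro-batch metrics with a StreamingQueryListener registered on the SparkSession that runs the query; numInputRows and processedRowsPerSecond are standard StreamingQueryProgress fields:

import org.apache.spark.sql.streaming.StreamingQueryListener;
import org.apache.spark.sql.streaming.StreamingQueryProgress;

// Print per-batch metrics for every streaming query on this SparkSession.
spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent event) { }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        StreamingQueryProgress progress = event.progress();
        // numInputRows summed over all batches should match the producer's record count.
        System.out.println("batch " + progress.batchId()
                + ": numInputRows=" + progress.numInputRows()
                + ", processedRowsPerSecond=" + progress.processedRowsPerSecond());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) { }
});

You can also call query.lastProgress().json() between triggers to get the same metrics for the most recent batch as JSON.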

Thank you

User16763506586
Contributor

If it helps, you can try running a left-anti join between the source and the sink to identify the missing records, and then check whether those records actually match the provided schema.
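
For illustration, a minimal sketch of that comparison, reusing the appConfig paths from the original post and assuming a hypothetical unique key column "id":

// Read the staging CSVs and the parquet sink back as batch DataFrames.
Dataset<Row> source = spark.read()
        .option("sep", delimiter)
        .schema(appConfig.getSchema())
        .csv(appConfig.getHdfsSourceDir());

Dataset<Row> sink = spark.read()
        .parquet(appConfig.getHdfsNNUri() + appConfig.getHdfsOutputDir());

// Left-anti join keeps rows present in the source but absent from the sink.
// "id" is a hypothetical key; substitute your actual key column(s).
Dataset<Row> missing = source.join(sink,
        source.col("id").equalTo(sink.col("id")), "left_anti");

System.out.println("Missing record count: " + missing.count());
missing.show(20, false);

Run this while the producer is paused; otherwise in-flight files will show up as false positives.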
