
Spark StreamingQuery not processing all data from source directory

RajaLakshmanan
New Contributor

Hi,

I have set up a streaming process that consumes files from an HDFS staging directory and writes them to a target location. The input directory continuously receives files from another process.

Let's say the file producer sends 5 million records to the HDFS staging directory as 50 files within a 5-minute window. The streaming process (processing trigger set to 60 seconds) picks up the files and processes them all, but 2-3% of the records are missing after every file from the input directory has been processed.

I checked the checkpoint source directory, and it shows that all the files were picked up and processed.
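For reference, this is roughly how I inspected it (a minimal sketch, assuming the file-stream source logs its committed input files under <checkpoint>/sources/0, one JSON-lines file per micro-batch):

// Minimal sketch: list the input files the stream has committed so far.
// Assumes the file source's commit log lives under <checkpoint>/sources/0;
// each batch file is JSON lines, with a version marker ("v1") as its first line.
Dataset<Row> committed = spark.read().text(
        appConfig.getHdfsNNUri() + appConfig.getHdfsOutputDir() + "/checkpoint/sources/0/*");
committed.show(100, false);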

Not sure what else to check.

@Prakash Chockalingam Can you please help?

Dataset<Row> dataDF = spark.readStream()
        .option("sep", delimiter)
        .option("header", "false")
        .option("inferSchema", "false")
        .option("latestFirst", "false")
        .schema(appConfig.getSchema())
        .csv(appConfig.getHdfsSourceDir());

StreamingQuery query = dataDF.writeStream()
        .trigger(ProcessingTime.create(60, TimeUnit.SECONDS))
        .format("parquet")
        .outputMode(OutputMode.Append())
        .partitionBy(appConfig.getPartitionedBy().split(","))
        .option("checkpointLocation", appConfig.getHdfsNNUri() + appConfig.getHdfsOutputDir() + "/checkpoint")
        .option("compression", appConfig.getSparkCompressType())
        .start(appConfig.getHdfsNNUri() + appConfig.getHdfsOutputDir());
query.awaitTermination();

1 ACCEPTED SOLUTION


jose_gonzalez
Databricks Employee

Hi,

Did you check the logs for the micro-batch metrics? These metrics will help you identify the number of records processed and the time it took to process the data. This article shows the metrics that are captured and how you can use them to understand how your streaming job is performing: https://www.waitingforcode.com/apache-spark-structured-streaming/query-metrics-apache-spark-structur...
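As a minimal sketch, one way to surface these per-batch metrics is to register a StreamingQueryListener before starting the query; numInputRows and durationMs arrive on each progress event (the println calls here are just placeholders for your logging):

import org.apache.spark.sql.streaming.StreamingQueryListener;

// Minimal sketch: print per-micro-batch metrics such as numInputRows,
// inputRowsPerSecond, and durationMs as each batch completes.
spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(StreamingQueryListener.QueryStartedEvent event) {
        System.out.println("Query started: " + event.id());
    }

    @Override
    public void onQueryProgress(StreamingQueryListener.QueryProgressEvent event) {
        // progress() is the same object returned by query.lastProgress()
        System.out.println(event.progress().prettyJson());
    }

    @Override
    public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent event) {
        System.out.println("Query terminated: " + event.id());
    }
});

Summing numInputRows across batches and comparing it with the producer's record count should tell you whether the rows go missing on read or on write.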

Thank you


2 REPLIES

User16763506586
Contributor

If it helps, you can try running a left-anti join between the source and the sink to identify the missing records, and then check whether those records match the provided schema.
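A minimal sketch of that check, assuming a hypothetical unique key column "id" (substitute whatever uniquely identifies your records) and the same appConfig paths as in the question:

// Hedged sketch: batch-read both sides and keep the source rows that never
// made it to the sink. "id" is a hypothetical unique key column.
Dataset<Row> source = spark.read()
        .option("sep", delimiter)
        .schema(appConfig.getSchema())
        .csv(appConfig.getHdfsSourceDir());

Dataset<Row> sink = spark.read()
        .parquet(appConfig.getHdfsNNUri() + appConfig.getHdfsOutputDir());

Dataset<Row> missing = source.join(
        sink,
        source.col("id").equalTo(sink.col("id")),
        "left_anti");

missing.show(false);

If missing comes back non-empty, checking those rows against the schema may reveal parse-time drops (for example, CSV rows discarded when the parse mode is DROPMALFORMED).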
