Hi,
I have set up a streaming process that consumes files from an HDFS staging directory and writes them to a target location. The input directory continuously receives files from another process.
Let's say the file producer generates 5 million records and sends them to the HDFS staging directory as 50 different files within a 5-minute window. The streaming process picks up the files (processing trigger set to 60 secs) and processes them all, but the problem is that 2-3% of the records are missing after all the files from the input directory have been processed.
I checked the source directory under the checkpoint, and it shows that all the files were picked up and processed.
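For reference, this is roughly how I checked it (a minimal sketch; the DumpSourceLog class name and the checkpoint path argument are just for illustration). As far as I can tell, each micro-batch writes a log file under <checkpoint>/sources/0 whose JSON lines record the input files it committed:

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DumpSourceLog {
    public static void main(String[] args) throws Exception {
        // args[0] is the checkpoint location, e.g. hdfs://.../checkpoint (illustrative).
        Path sourceLog = new Path(args[0] + "/sources/0");
        FileSystem fs = FileSystem.get(sourceLog.toUri(), new Configuration());
        // One log file per micro-batch; print every entry so it can be
        // diffed against a listing of the staging directory.
        for (FileStatus batch : fs.listStatus(sourceLog)) {
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(fs.open(batch.getPath())))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line); // "v1" header or a JSON entry with path/batchId
                }
            }
        }
    }
}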
Not sure what else to check.
@Prakash Chockalingam Can you please help?
import java.util.concurrent.TimeUnit;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.ProcessingTime;
import org.apache.spark.sql.streaming.StreamingQuery;

// Read CSV files as they arrive in the HDFS staging directory.
Dataset<Row> dataDF = spark.readStream()
        .option("sep", delimiter)
        .option("header", "false")
        .option("inferSchema", "false")
        .option("latestFirst", "false")
        .schema(appConfig.getSchema())
        .csv(appConfig.getHdfsSourceDir());
// Write the stream out as partitioned parquet, checkpointing under the output dir.
StreamingQuery query = dataDF.writeStream()
        .trigger(ProcessingTime.create(60, TimeUnit.SECONDS))
        .format("parquet")
        .outputMode(OutputMode.Append())
        .partitionBy(appConfig.getPartitionedBy().split(","))
        .option("checkpointLocation", appConfig.getHdfsNNUri() + appConfig.getHdfsOutputDir() + "/checkpoint")
        .option("compression", appConfig.getSparkCompressType())
        .start(appConfig.getHdfsNNUri() + appConfig.getHdfsOutputDir());
query.awaitTermination();
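In case it helps, this is roughly how I measure the 2-3% gap, using a separate one-off batch job (simplified sketch; the literal paths and the "|" delimiter stand in for my actual appConfig values):

import org.apache.spark.sql.SparkSession;

public class CountCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("count-check").getOrCreate();

        // Batch-read the same staging files the stream consumed ...
        long sourceCount = spark.read()
                .option("sep", "|")              // placeholder for my real delimiter
                .csv("hdfs:///staging/dir")      // placeholder staging path
                .count();

        // ... and count what actually landed in the parquet target.
        long targetCount = spark.read()
                .parquet("hdfs:///target/dir")   // placeholder target path
                .count();

        System.out.println("source=" + sourceCount
                + " target=" + targetCount
                + " missing=" + (sourceCount - targetCount));
        spark.stop();
    }
}

This is what consistently shows the target short by 2-3%, even though the checkpoint says every file was committed.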