When reading an HBase table from Spark, a few hundred records that have not yet been persisted (flushed) to HDFS do not show up. The records become visible after a forced flush via the HBase shell or a system-triggered flush (when the MemStore size crosses the configured threshold), and anything written after the initial flush is immediately visible in Spark. Moreover, records sitting in the MemStore before a flush are immediately visible to the HBase client's Scan API.
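For reference, the forced flush mentioned above can be triggered from the HBase shell (flush 'test') or programmatically through the Admin API; a minimal sketch of the latter, assuming an existing HBase Connection named connection:

import org.apache.hadoop.hbase.TableName

// Force a MemStore flush so the pending rows are persisted to HFiles on HDFS;
// after this, the Spark read below starts returning them.
val admin = connection.getAdmin
admin.flush(TableName.valueOf("test"))
admin.close()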
Note: We cannot use the spark-hbase connector because the schema is not known beforehand, so we resort to the newAPIHadoopRDD API instead.
Spark read API
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

@transient val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM)
conf.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT)
conf.set(TableInputFormat.INPUT_TABLE, "test")
spark.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
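Because the schema is unknown, we consume the resulting RDD generically by walking the raw cells of each Result. A minimal sketch of that consumption (the mapping to qualifier/value strings is purely illustrative):

import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.hbase.util.Bytes

val rdd = spark.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])

// Walk every cell of every row without assuming column families or qualifiers.
val rows = rdd.map { case (_, result) =>
  val rowKey = Bytes.toString(result.getRow)
  val cells = result.rawCells().map { cell =>
    Bytes.toString(CellUtil.cloneQualifier(cell)) -> Bytes.toString(CellUtil.cloneValue(cell))
  }.toMap
  rowKey -> cells
}
// rows.count() here does not include the un-flushed MemStore records described above.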
We have tried passing a custom Scan object and setting/unsetting several flags, but none of them work. Below, we try to force the HBase Scan API to read from replica 0, but to no avail.
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64

// Function to convert a Scan object to a base64 string
def convertScanToString(scan: Scan): String = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}

@transient val scan = new Scan()
scan.setReplicaId(0)
@transient val scanStr = convertScanToString(scan)

@transient val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM)
conf.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT)
conf.set(TableInputFormat.INPUT_TABLE, "test")
conf.set(TableInputFormat.SCAN, scanStr) // pass the serialized Scan to TableInputFormat
spark.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
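For completeness, the manual serialization above is intended to do the same thing as HBase's own helper; a sketch of the same step using that helper (assuming an HBase version in which TableMapReduceUtil.convertScanToString is public):

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil

// Same serialization, delegated to the HBase helper instead of ProtobufUtil/Base64.
conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan))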
The following HBase client read API retrieves records stored in the MemStore before a flush, as expected.
import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes

// `connection` is an existing org.apache.hadoop.hbase.client.Connection
val table = connection.getTable(TableName.valueOf(Bytes.toBytes("test")))
val scanner = table.getScanner(new Scan())
scanner.asScala.foreach(result => println(result))