
newAPIHadoopRDD Spark API doesn't retrieve unflushed data written to HBase table

mshettar
New Contributor II

Records written to an HBase table (a few hundred of them) that haven't been persisted (flushed) to HDFS don't show up when the table is read from Spark. However, the records become visible after a forced flush via the HBase shell or a system-triggered flush (when the MemStore size crosses the configured threshold), and anything written after the initial flush is immediately visible in Spark. Additionally, records in the MemStore before a flush are immediately visible to the HBase client's scan API.

Note: We cannot use the spark-hbase connector because the schema is unknown beforehand, so we resort to the newAPIHadoopRDD API.

Spark read API 

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

@transient val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM)
conf.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT)
conf.set(TableInputFormat.INPUT_TABLE, "test")
spark.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

We have tried passing a custom Scan object and setting/unsetting several flags, but none of them work. Below, we try to force the HBase Scan API to read from replica 0, but to no avail.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64

// Function to convert a Scan object to the Base64 string expected by TableInputFormat
def convertScanToString(scan: Scan): String = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
}

@transient val scan = new Scan()
scan.setReplicaId(0)
@transient val scanStr = convertScanToString(scan)

@transient val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", ZOOKEEPER_QUORUM)
conf.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT)
conf.set(TableInputFormat.INPUT_TABLE, "test")
conf.set(TableInputFormat.SCAN, scanStr) // pass the custom Scan to TableInputFormat

spark.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

The following HBase client read API retrieves records stored in the MemStore before a flush, as expected.

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

val table = connection.getTable(TableName.valueOf(Bytes.toBytes("test")))
val scanner = table.getScanner(new Scan())
scanner.asScala.foreach(result => println(result))

2 REPLIES

Anonymous
Not applicable

@Manjunath Shettar:

It seems the issue is that the records in the HBase table have not been flushed to HDFS and are still stored in the MemStore. Spark's newAPIHadoopRDD API reads from the HBase table through HBase's TableInputFormat, which only reads data that has been persisted (flushed) to HDFS.

The records become visible after a forced flush via the HBase shell or a system-triggered flush because the data is then persisted to HDFS and can be read by TableInputFormat. Similarly, any data written after the initial flush is immediately visible in Spark because it is persisted to HDFS and can be read by TableInputFormat.
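
For reference, a minimal sketch (assuming an existing HBase Connection and an HBase 1.x+ client) of triggering that flush programmatically instead of from the HBase shell:

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Connection

// Force the "test" table's MemStore to be flushed to HFiles on HDFS,
// after which TableInputFormat-based reads should see the rows.
def flushTable(connection: Connection): Unit = {
  val admin = connection.getAdmin
  try admin.flush(TableName.valueOf("test"))
  finally admin.close()
}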

The HBase client's scan API can read data from the MemStore before a flush because it accesses the region server's MemStore directly, bypassing the need for the data to be persisted to HDFS first.

Regarding the attempt to force the HBase Scan API to read from replica 0: it is not clear how region replicas are configured in your HBase cluster, or whether they are properly configured and running. If replica 0 is not properly configured, setting the replicaId to 0 on the Scan object may have no effect. If it is properly configured, it may also be necessary to configure the HBase client to read from the replica, as described in the HBase documentation.
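
For reference, a minimal sketch (assuming an HBase 1.x+ client with region replicas enabled) of how a client-side Scan is typically configured for replica reads; note that replica 0 is the primary region, so pinning a scan to it only reproduces the default read path:

import org.apache.hadoop.hbase.client.{Consistency, Scan}

// TIMELINE consistency is what allows reads to be served by secondary region
// replicas; setReplicaId then targets a specific replica (0 is the primary).
val replicaScan = new Scan()
replicaScan.setConsistency(Consistency.TIMELINE)
replicaScan.setReplicaId(0)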

As for not being able to use the Spark-HBase connector because the schema is unknown beforehand, one option is to dynamically infer the schema from the first few rows of data using Spark's DataFrame API; once inferred, the schema can be used to read the remaining data from the HBase table. Another option is to manually specify the schema with Spark's StructType API, based on the known structure of the HBase table. A sketch of the second option follows.
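
A minimal sketch of building a DataFrame from the newAPIHadoopRDD output with a manually specified schema. The hbaseRdd value, the cf column family, and the name qualifier are hypothetical placeholders for whatever the table actually contains:

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Manually declared schema for the known parts of the table layout.
val schema = StructType(Seq(
  StructField("rowKey", StringType, nullable = false),
  StructField("name", StringType, nullable = true)
))

// hbaseRdd: the (ImmutableBytesWritable, Result) RDD returned by newAPIHadoopRDD.
val rowRdd = hbaseRdd.map { case (_, result: Result) =>
  val key  = Bytes.toString(result.getRow)
  val name = Option(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("name")))
    .map(Bytes.toString)
    .orNull
  Row(key, name)
}

val df = spark.createDataFrame(rowRdd, schema)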

mshettar
New Contributor II

Hi @suteja,

Thank you for the response. I didn't make myself clear about unflushed writes being visible after the initial flush; I have explained it below. Our concern is with step 3: per our assumption, the ten unflushed records shouldn't be visible to Spark reads, but in our case they are.

  1. Write 100 records to HBase. (Not visible to Spark read.)
  2. Flush the table. (The 100 records become visible to Spark read.)
  3. Write 10 more records to HBase. The data is still in the MemStore but is visible to Spark read.
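
For reference, a condensed sketch of those three steps (writeRows is a hypothetical helper; it assumes an existing HBase Connection, the cf/name layout is a placeholder, and the read check is the newAPIHadoopRDD snippet from the original post):

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Put}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical helper: write n rows to the "test" table without flushing.
def writeRows(connection: Connection, start: Int, n: Int): Unit = {
  val table = connection.getTable(TableName.valueOf("test"))
  try {
    (start until start + n).foreach { i =>
      val put = new Put(Bytes.toBytes(s"row-$i"))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(s"value-$i"))
      table.put(put)
    }
  } finally table.close()
}

val admin = connection.getAdmin
writeRows(connection, 0, 100)            // step 1: 100 rows, not yet flushed
admin.flush(TableName.valueOf("test"))   // step 2: flush the table
writeRows(connection, 100, 10)           // step 3: 10 more rows, still in the MemStore
admin.close()
// Re-run the newAPIHadoopRDD read from the original post; per the behaviour above,
// the 10 rows from step 3 show up even though they are still in the MemStore.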
