
Extreme RocksDB memory usage

PetePP
New Contributor II

During migration to a production workload, I switched some queries to use RocksDB. I am concerned about its memory usage, though.

Here is sample output from my streaming query:

  "stateOperators" : [ {
    "operatorName" : "dedupeWithinWatermark",
    "numRowsTotal" : 611788,
    "numRowsUpdated" : 610009,
    "allUpdatesTimeMs" : 7303,
    "numRowsRemoved" : 633148,
    "allRemovalsTimeMs" : 6082,
    "commitTimeMs" : 10363,
    "memoryUsedBytes" : 32142263729,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 4,
    "numStateStoreInstances" : 4,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 0,
      "rocksdbBytesCopied" : 61561365,
      "rocksdbCommitCheckpointLatency" : 198,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 3856,
      "rocksdbCommitFlushLatency" : 6302,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 4,
      "rocksdbFilesReused" : 11,
      "rocksdbGetCount" : 1853166,
      "rocksdbGetLatency" : 5490,
      "rocksdbPinnedBlocksMemoryUsage" : 198117968,
      "rocksdbPutCount" : 1243157,
      "rocksdbPutLatency" : 2073,
      "rocksdbReadBlockCacheHitCount" : 1928135,
      "rocksdbReadBlockCacheMissCount" : 21340,
      "rocksdbSstFileSize" : 201411521,
      "rocksdbTotalBytesRead" : 10763516,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 139455496,
      "rocksdbTotalBytesWritten" : 146504893,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalBytesWrittenByFlush" : 61562581,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 2969,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 41221
    }
  } ]
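
For reference, the same numbers can be read programmatically from the query handle; a minimal sketch, assuming `query` is the StreamingQuery returned by writeStream.start():

# Sketch: inspect per-operator state metrics of a running query.
# Assumes `query` is the StreamingQuery handle returned by writeStream.start().
progress = query.lastProgress  # most recent micro-batch progress as a dict, or None before the first batch
if progress:
    for op in progress["stateOperators"]:
        print(op["operatorName"], op["numRowsTotal"], op["memoryUsedBytes"])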

If I understand this correctly, 611788 keys are stored in the database. The key is defined as:

.withWatermark('kafka_timestamp', '5 minutes')
[...]
.dropDuplicatesWithinWatermark(['brand', 'transaction_id', 'status'])

where kafka_timestamp is of type Timestamp and the other key columns are all Strings of at most 16 characters.

It gets even worse after the query has been running for some time: over 40 GB is used for just 20k entries.
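
Worked out per entry (plain arithmetic on the figures above; the 40 GB / 20k numbers are approximate):

# Back-of-the-envelope from the numbers quoted above.
snapshot_bytes_per_key = 32_142_263_729 / 611_788   # memoryUsedBytes / numRowsTotal
later_bytes_per_key = 40 * 1024**3 / 20_000         # ~40 GB spread over ~20k entries
print(round(snapshot_bytes_per_key / 1024))          # ~51 KiB per key in the snapshot above
print(round(later_bytes_per_key / 1024**2, 1))       # ~2.0 MiB per key later on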

Am I reading this incorrectly? Can I control this somehow? Or is this expected behaviour? It is using 50 to 100x more memory than I would expect in the most extreme scenario.


Any insight would be highly appreciated, thank you!


2 REPLIES

Tharun-Kumar
Honored Contributor II

@PetePP 

memoryUsedBytes: 32142263729

32 GB is the current usage. This is the memory used to store the 611k records, and it is the storage memory required to hold all the columns in the dataset, not just the 4 columns involved in the watermark and deduplication key.
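
If that is the case, one way to shrink each entry (a sketch only, assuming the sink does not actually need every Kafka column; `events` and `payload` are placeholders, not names from this pipeline) is to drop unneeded columns before the dedup:

# Sketch: `events` stands for the parsed Kafka stream and `payload` for the columns
# the sink really needs. If the state holds the full input row as described above,
# narrower rows going into the dedup mean smaller state entries.
deduped = (
    events
        .select("kafka_timestamp", "brand", "transaction_id", "status", "payload")
        .withWatermark("kafka_timestamp", "5 minutes")
        .dropDuplicatesWithinWatermark(["brand", "transaction_id", "status"])
)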

PetePP
New Contributor II

Thank you for the input. Is there any particular reason why the deduplication watermark makes it store everything and not just the key needed for deduplication? The first record has to be written to the table anyway, and its content is irrelevant, since later records that match are simply dropped.

Is there any way to control this behavior? I know I could enforce a constraint on write, but that seems excessive, as the table has millions of rows and I only need to look a few minutes into the past.
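
For what it's worth, recent Spark releases (3.5+) and the matching Databricks runtimes document settings to bound RocksDB memory for the state store; a minimal sketch, with names and defaults to be verified against the runtime actually in use:

# Sketch: cap RocksDB state store memory (configuration names as documented for
# Spark 3.5+ / recent DBR; verify exact names, scope and defaults for your runtime).
# These must be set before the streaming query starts.
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage", "true")
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB", "2048")      # overall cap
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio", "0.5")  # share for write buffers
spark.conf.set("spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio", "0.1")  # share for index/filter blocks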
