cancel
Showing results for 
Search instead for 
Did you mean: 
Administration & Architecture
Explore discussions on Databricks administration, deployment strategies, and architectural best practices. Connect with administrators and architects to optimize your Databricks environment for performance, scalability, and security.
cancel
Showing results for 
Search instead for 
Did you mean: 

Extreme RocksDB memory usage

PetePP
New Contributor II

During migration to production workload, I switched some queries to use RocksDB. I am concerned with its memory usage though. 

Here is sample output from my streaming query:

 

 

  "stateOperators" : [ {
    "operatorName" : "dedupeWithinWatermark",
    "numRowsTotal" : 611788,
    "numRowsUpdated" : 610009,
    "allUpdatesTimeMs" : 7303,
    "numRowsRemoved" : 633148,
    "allRemovalsTimeMs" : 6082,
    "commitTimeMs" : 10363,
    "memoryUsedBytes" : 32142263729,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 4,
    "numStateStoreInstances" : 4,
    "customMetrics" : {
      "numDroppedDuplicateRows" : 0,
      "rocksdbBytesCopied" : 61561365,
      "rocksdbCommitCheckpointLatency" : 198,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 3856,
      "rocksdbCommitFlushLatency" : 6302,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 4,
      "rocksdbFilesReused" : 11,
      "rocksdbGetCount" : 1853166,
      "rocksdbGetLatency" : 5490,
      "rocksdbPinnedBlocksMemoryUsage" : 198117968,
      "rocksdbPutCount" : 1243157,
      "rocksdbPutLatency" : 2073,
      "rocksdbReadBlockCacheHitCount" : 1928135,
      "rocksdbReadBlockCacheMissCount" : 21340,
      "rocksdbSstFileSize" : 201411521,
      "rocksdbTotalBytesRead" : 10763516,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 139455496,
      "rocksdbTotalBytesWritten" : 146504893,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalBytesWrittenByFlush" : 61562581,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 2969,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 41221
    }
  } ]

 

 

If I understand this correctly, 611788 keys are stored in the database. The key is defined as:

 

 

.withWatermark('kafka_timestamp', '5 minutes')
[...]
.dropDuplicatesWithinWatermark(['brand', 'transaction_id', 'status'])

 

 

where kafka_timestamp is of type Timestamp, and other keys are all Strings with 16 characters maximum.

It gets even worse after the query is running for some time, over 40GB is used for just 20k entries.

Am I reading this incorrectly? Can I control this somehow? Or is this expected behaviour, as this is using 50 to 100x more than I would expect in the most extreme scenario. 

 

Any insight would be highly appreciated, thank you!

 

 

2 REPLIES 2

Tharun-Kumar
Databricks Employee
Databricks Employee

@PetePP 

memoryUsedBytes: 32142263729

32 gb is the current usage. This is the memory used to store the 611k records. This is the storage memory required to store all the columns in the dataset and not just the 4 columns mentioned in the watermark.

PetePP
New Contributor II

Thank you for the input. Is there any particular reason why deduplication watermark makes it store everything and not just the key needed for deduplication? The 1st record has to be written to the table anyway, and its content is irrelevant as it just drops later records that get a hit.

Is there any way to control this behavior?  I know I could do a constraint on write, but that seems really excessive as the table has millions of rows  and I really need to look just a few minutes past.

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group