Skip to content

Commit

Permalink
perf(card): Adding config support for DS card flushCount and perf log…
Browse files Browse the repository at this point in the history
…s for cardinality calculation time (filodb#1666)

* Adding config support for DS Card flushCount and perf logs for cardinality calculation time
  • Loading branch information
sandeep6189 committed Sep 11, 2023
1 parent 7adc382 commit bf8ead0
Showing 1 changed file with 57 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ class CardinalityManager(datasetRef: DatasetRef,
// physical resources for duplicate calculation
private var isCardTriggered: Boolean = false

// number of partKeys to aggregate in memory at any given time, before we write the cardinality information to
// rocksDB. This is done to reduce the time taken to calculate cardinality of partitions with high number of
// TS by reducing the number writes issued to RocksDB ( From our profiling, high number of RocksDB writes lead to
// performance bottlenecks)
// For cardFlushCount = 500,000, we conservatively expect to consume - 128 bytes * 500,000 = ~64 MB
private val cardFlushCount: Option[Int] =
if (filodbConfig.hasPath("card-flush-count")) {
Some(filodbConfig.getInt("card-flush-count"))
} else None

/**
* `dataset-configs` is an string array where each string is a file path for a dataset config. This function reads
Expand Down Expand Up @@ -132,9 +141,6 @@ class CardinalityManager(datasetRef: DatasetRef,

/**
* Triggers cardinalityCount if metering is enabled and the required criteria matches.
* It creates a new instance of CardinalityTracker and uses the PartKeyLuceneIndex to calculate cardinality count
* and store data in a local CardinalityStore. We then close the previous instance of CardinalityTracker and switch
* it with the new one we created in this call.
* @param indexRefreshCount The number of time the indexRefresh has already happened. This is used in the logic of
* shouldTriggerCardinalityCount
*/
Expand All @@ -143,34 +149,7 @@ class CardinalityManager(datasetRef: DatasetRef,
try {
if (shouldTriggerCardinalityCount(shardNum, numShardsPerNode, indexRefreshCount)) {
isCardTriggered = true
val newCardTracker = getNewCardTracker()
var cardCalculationComplete = false
try {
partKeyIndex.calculateCardinality(partSchema, newCardTracker)
cardCalculationComplete = true
} catch {
case ex: Exception =>
logger.error(s"[CardinalityManager]Error while calculating cardinality using" +
s" PartKeyLuceneIndex! shardNum=$shardNum indexRefreshCount=$indexRefreshCount", ex)
// cleanup resources used by the newCardTracker tracker to avoid leaking of resources
newCardTracker.close()
}
if (cardCalculationComplete) {
try {
// close the cardinality store and release the physical resources of the current cardinality store
close()
cardTracker = Some(newCardTracker)
logger.info(s"[CardinalityManager] Triggered cardinality count successfully for" +
s" shardNum=$shardNum indexRefreshCount=$indexRefreshCount")
} catch {
case ex: Exception =>
// Very unlikely scenario, but can happen if the disk call fails.
logger.error(s"[CardinalityManager]Error closing card tracker! shardNum=$shardNum", ex)
// setting cardTracker to None in this case, since the exception happened on the close. We
// can't rely on the card store. The next trigger should re-build the card store and card tracker
cardTracker = None
}
}
createNewCardinalityTrackerAndCalculate(indexRefreshCount)
isCardTriggered = false
}
else {
Expand All @@ -189,6 +168,50 @@ class CardinalityManager(datasetRef: DatasetRef,
}
}

/**
* Creates a new instance of CardinalityTracker and uses the PartKeyLuceneIndex to calculate cardinality count
* and store data in a local CardinalityStore. We then close the previous instance of CardinalityTracker and switch
* it with the new one we created in this call.
*
* @param indexRefreshCount The number of time the indexRefresh has already happened. This is used in the logic of
* shouldTriggerCardinalityCount
*/
private def createNewCardinalityTrackerAndCalculate(indexRefreshCount: Int): Unit = {
val newCardTracker = getNewCardTracker()
var cardCalculationComplete = false
val startTimeMs = System.currentTimeMillis()
try {
logger.info(s"[CardinalityManager]Triggering cardinality count for shardNum=$shardNum " +
s"indexRefreshCount=$indexRefreshCount")
partKeyIndex.calculateCardinality(partSchema, newCardTracker)
cardCalculationComplete = true
} catch {
case ex: Exception =>
logger.error(s"[CardinalityManager]Error while calculating cardinality using" +
s" PartKeyLuceneIndex! shardNum=$shardNum indexRefreshCount=$indexRefreshCount", ex)
// cleanup resources used by the newCardTracker tracker to avoid leaking of resources
newCardTracker.close()
}
if (cardCalculationComplete) {
try {
// close and release the physical resources of the outdated/previous cardinality store
close()
// reassign the cardTracker with the newly created CardinalityTracker object
cardTracker = Some(newCardTracker)
val timeTakenInSeconds = ((System.currentTimeMillis() - startTimeMs)/1000.0)
logger.info(s"[CardinalityManager] Triggered cardinality count successfully for" +
s" shardNum=$shardNum indexRefreshCount=$indexRefreshCount timeTakenInSeconds=$timeTakenInSeconds")
} catch {
case ex: Exception =>
// Very unlikely scenario, but can happen if the disk call fails.
logger.error(s"[CardinalityManager]Error closing card tracker! shardNum=$shardNum", ex)
// setting cardTracker to None in this case, since the exception happened on the close. We
// can't rely on the card store. The next trigger should re-build the card store and card tracker
cardTracker = None
}
}
}

/**
* Helper method to create a CardinalityTracker instance
*
Expand All @@ -197,7 +220,9 @@ class CardinalityManager(datasetRef: DatasetRef,
private def getNewCardTracker(): CardinalityTracker = {
val cardStore = new RocksDbCardinalityStore(datasetRef, shardNum)
val defaultQuota = quotaSource.getDefaults(datasetRef)
val tracker = new CardinalityTracker(datasetRef, shardNum, shardKeyLen, defaultQuota, cardStore)
logger.info(s"[CardinalityManager] Creating new CardinalityTracker with flushCount=$cardFlushCount")
val tracker = new CardinalityTracker(datasetRef, shardNum, shardKeyLen, defaultQuota, cardStore,
flushCount = cardFlushCount)
quotaSource.getQuotas(datasetRef).foreach { q =>
tracker.setQuota(q.shardKeyPrefix, q.quota)
}
Expand Down

0 comments on commit bf8ead0

Please sign in to comment.