Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Fix DLQ backlog computation (#279)
Browse files Browse the repository at this point in the history
* Fix DLQ backlog computation

* fix non-dlq backlog
  • Loading branch information
Kiran RG authored Aug 19, 2017
1 parent d5b7d1f commit 2a2575a
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions services/controllerhost/queueDepth.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,19 @@ func (qdc *queueDepthCalculator) computeBacklog(cgDesc *shared.ConsumerGroupDesc
var iter = &qdc.iter

if qdc.iter.isDLQ {
// update backog, only if the begin/first seqnums are available
if storeMetadata.lastSequence != math.MaxInt64 && storeMetadata.beginSequence != math.MaxInt64 {
// update backog, only if the begin/first seqnums are available.
// {begin,last}Sequence of:
// 0 -> extent created, but store has not reported metrics yet (no updates on store).
// MaxInt64 -> extent loaded on store, but no msgs actually written yet; so store
// reported begin/last seq as 'unknown'.
if storeMetadata.lastSequence != 0 && storeMetadata.lastSequence != math.MaxInt64 &&
storeMetadata.beginSequence != 0 && storeMetadata.beginSequence != math.MaxInt64 {
backlog = storeMetadata.lastSequence - storeMetadata.beginSequence + 1
}
} else {
// update backlog, only if there is an available seqnum
if storeMetadata.availableSequence != math.MaxInt64 {
// update backlog, only if there is an available seqnum. see comment above for interpretation
// of "0" and "MaxInt64".
if storeMetadata.availableSequence != 0 && storeMetadata.availableSequence != math.MaxInt64 {
backlog = storeMetadata.availableSequence - cgExtent.GetAckLevelSeqNo()
}
}
Expand All @@ -351,6 +357,7 @@ func (qdc *queueDepthCalculator) computeBacklog(cgDesc *shared.ConsumerGroupDesc
common.TagStor: storeID,
`rsStore`: storeMetadata.storeID,
`rsAvailSeq`: storeMetadata.availableSequence,
`rsBeginSeq`: storeMetadata.beginSequence,
`rsLastSeq`: storeMetadata.lastSequence,
`rsLastSeqRate`: storeMetadata.lastSequenceRate,
`cgeAckLvlSeq`: cgExtent.GetAckLevelSeqNo(),
Expand Down

0 comments on commit 2a2575a

Please sign in to comment.