This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Outputhost: Fix offset computations for ack/read levels #193

Merged
merged 3 commits into from
May 8, 2017

Changes from 2 commits
150 changes: 61 additions & 89 deletions services/outputhost/ackmanager.go
@@ -39,23 +39,28 @@ const replicatorCallTimeout = 20 * time.Second
type storeHostAddress int64

type (
// internalMsg is the message which is stored locally on the ackMgr
internalMsg struct {
addr storeHostAddress
acked bool
ackIndex uint32

// msgCtx is the message context which is stored locally on the ackMgr
msgCtx struct {
addr storeHostAddress
seqnum common.SequenceNumber
acked bool
}

levels struct {
asOf common.UnixNanoTime // Timestamp when this level was calculated
readLevel common.SequenceNumber // -1 = nothing received, 0 = 1 message (#0) received, etc.
ackLevel common.SequenceNumber // -1 = nothing acked
readLevel ackIndex // -1 = nothing received, 0 = 1 message (#0) received, etc.
Contributor

Can you update this comment? I don't think we initialize to -1 now..
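
A possible rewording for these field comments — a suggestion only, not part of the diff — assuming the levels are now seeded from cge.GetAckLevelSeqNo() in newAckManager rather than starting at -1:

```go
readLevel    ackIndex              // internal index of the last message handed out; seeded from cge.GetAckLevelSeqNo()
readLevelSeq common.SequenceNumber // store msg seqnum corresponding to readLevel
ackLevel     ackIndex              // internal index up to which messages have been acked; seeded the same way
ackLevelSeq  common.SequenceNumber // store msg seqnum corresponding to the ackLevel
```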

readLevelSeq common.SequenceNumber // store msg seqnum corresponding to readLevel
ackLevel ackIndex // -1 = nothing acked
ackLevelSeq common.SequenceNumber // store msg seqnum corresponding to the ackLevel
lastAckedSeq common.SequenceNumber // the latest sequence which is acked
}

// ackManager is held per CG extent and it holds the addresses that we get from the store.
ackManager struct {
addrs map[common.SequenceNumber]*internalMsg // ‡
sealed bool // ‡
addrs map[ackIndex]*msgCtx // ‡
sealed bool // ‡
outputHostUUID string
cgUUID string
extUUID string
@@ -87,7 +92,7 @@ func newAckManager(
committer Committer,
logger bark.Logger) *ackManager {
ackMgr := &ackManager{
addrs: make(map[common.SequenceNumber]*internalMsg),
addrs: make(map[ackIndex]*msgCtx),
cgCache: cgCache,
outputHostUUID: outputHostUUID,
cgUUID: cgUUID,
@@ -102,8 +107,10 @@ func newAckManager(
}

ackMgr.levels = &levels{
readLevel: common.SequenceNumber(cge.GetAckLevelSeqNo()),
ackLevel: common.SequenceNumber(cge.GetAckLevelSeqNo()),
readLevel: ackIndex(cge.GetAckLevelSeqNo()),
readLevelSeq: common.SequenceNumber(cge.GetAckLevelSeqNo()),
ackLevel: ackIndex(cge.GetAckLevelSeqNo()),
ackLevelSeq: common.SequenceNumber(cge.GetAckLevelSeqNo()),
}

return ackMgr
@@ -112,75 +119,40 @@ func newAckManager(
// ackID is a string which is a base64 encoded string
// First we get the ackID and store the address locally in our data structure
// for maintaining the ack level
func (ackMgr *ackManager) getNextAckID(address int64, sequence common.SequenceNumber) (ackID string) {
func (ackMgr *ackManager) getNextAckID(address storeHostAddress, sequence common.SequenceNumber) (ackID string) {
ackMgr.lk.Lock()
defer ackMgr.lk.Unlock()

ackMgr.readLevel++ // This means that the first ID is '1'
ackMgr.readLevelSeq++

expectedReadLevel := ackMgr.levelOffset + ackMgr.readLevel
if sequence != ackMgr.readLevelSeq {

// NOTE: sequence should be zero for timer destinations, since they usually have discontinuous sequence numbers
if sequence != 0 && expectedReadLevel != sequence {
skippedMessages := sequence - expectedReadLevel
ackMgr.logger.WithFields(bark.Fields{
`old-readLevelSeq`: ackMgr.readLevelSeq,
`new-readLevelSeq`: sequence,
}).Error(`adjusting read-level sequence`)
Contributor

Let's make this a Warn level log and let's also record a metric here for easy tracking..
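
A sketch of what that could look like — the Warn call and the guarded gauge update below are suggestions, not part of this diff; OutputhostCGSkippedMessages is the gauge the existing skipped-message path already uses:

```go
ackMgr.logger.WithFields(bark.Fields{
	`old-readLevelSeq`: ackMgr.readLevelSeq,
	`new-readLevelSeq`: sequence,
}).Warn(`adjusting read-level sequence`) // Warn instead of Error, per this review

// Record how far the read-level sequence jumped; skip negative deltas,
// since M3 gauges don't support negative values.
if delta := int64(sequence - ackMgr.readLevelSeq); delta > 0 {
	ackMgr.cgCache.consumerM3Client.UpdateGauge(
		metrics.ConsConnectionScope, metrics.OutputhostCGSkippedMessages, delta)
}
```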


if skippedMessages < 0 {
ackMgr.logger.WithFields(bark.Fields{
`address`: address,
`sequence`: sequence,
`readLevel`: ackMgr.readLevel,
`levelOffset`: ackMgr.levelOffset,
`expectedReadLevel`: expectedReadLevel,
`skippedMessages`: skippedMessages,
}).Error(`negative discontinuity detected (rollback)`)
// Don't update gauge, since negative numbers aren't supported for M3 gauges
} else {
// update gauge here to say we skipped messages (potentially due to retention?)
ackMgr.cgCache.consumerM3Client.UpdateGauge(metrics.ConsConnectionScope, metrics.OutputhostCGSkippedMessages, int64(skippedMessages))
}
// TODO: Support message auditing by creating discontinuity events, rather than just updating the offset here
// TODO: Consumption rates will be spuriously high
// NOTE: Increasing the offset here also affects the AckLevelSeqNo, which will tend to push it into the discontinuity,
// which makes it appear that a retained message was received. This is OK, and it will cause queue depth to react
// to retention events a bit faster.
// NOTE: The same skipped messages can be reported twice if the outputhost is restarted before the ack level
// passes the discontinuity
ackMgr.addLevelOffsetImpl(skippedMessages)
ackMgr.readLevelSeq = sequence
}

ackID = common.ConstructAckID(ackMgr.sessionID, ackMgr.ackMgrID, uint32(ackMgr.readLevel), address)
ackID = common.ConstructAckID(ackMgr.sessionID, ackMgr.ackMgrID, uint32(ackMgr.readLevel), int64(address))

// now store the message in the data structure internally
ackMgr.addrs[ackMgr.readLevel] = &internalMsg{
addr: storeHostAddress(address),
ackMgr.addrs[ackMgr.readLevel] = &msgCtx{
addr: address,
seqnum: sequence,
}

// Let the committer know about the new read level
ackMgr.committer.SetReadLevel(CommitterLevel{
seqNo: sequence,
address: storeHostAddress(address),
address: address,
})

ackMgr.lk.Unlock()

return
}

// addLevelOffset adds an offset to the sequence number stored in cassandra. This ensures that our internal records always have
// contiguous sequence numbers, but we can adjust the offset when/if the replica connection gets loaded (asynchronously)
func (ackMgr *ackManager) addLevelOffset(offset common.SequenceNumber) {
ackMgr.lk.Lock()
ackMgr.addLevelOffsetImpl(offset)
ackMgr.lk.Unlock()
}

func (ackMgr *ackManager) addLevelOffsetImpl(offset common.SequenceNumber) {
ackMgr.logger.WithFields(bark.Fields{
`oldOffset`: ackMgr.levelOffset,
`newOffset`: ackMgr.levelOffset + offset,
`change`: offset,
}).Info(`adjusting sequence number offset`)
ackMgr.levelOffset += offset
}

func (ackMgr *ackManager) stop() {
close(ackMgr.closeChannel)
ackMgr.doneWG.Wait()
@@ -194,7 +166,7 @@ func (ackMgr *ackManager) start() {

// getCurrentReadLevel returns the current read-level address and seqnum. this is called
// by extcache when it connects to a new replica, when one stream is disconnected.
func (ackMgr *ackManager) getCurrentReadLevel() (addr int64, seqNo common.SequenceNumber) {
func (ackMgr *ackManager) getCurrentReadLevel() (addr int64, seqnum common.SequenceNumber) {

ackMgr.lk.RLock()
defer ackMgr.lk.RUnlock()
@@ -206,7 +178,7 @@ func (ackMgr *ackManager) getCurrentReadLevel() (addr int64, seqNo common.Sequen
addr = int64(msg.addr)
}

seqNo = ackMgr.levelOffset + ackMgr.readLevel
seqnum = ackMgr.readLevelSeq

return
}
@@ -251,30 +223,30 @@ func (ackMgr *ackManager) updateAckLevel() {
ackMgr.lk.Lock()

count := 0
stop := ackMgr.ackLevel + common.SequenceNumber(int64(len(ackMgr.addrs)))
stop := ackMgr.ackLevel + ackIndex(len(ackMgr.addrs))

// We go through the map here and see if the messages are acked,
// moving the acklevel as we go forward.
for curr := ackMgr.ackLevel + 1; curr <= stop; curr++ {
if addrs, ok := ackMgr.addrs[curr]; ok {
if addrs.acked {
update = true
ackMgr.ackLevel = curr
ackLevelAddress = int64(addrs.addr)

// We need to commit every message we see here, since we may have an interleaved stream,
// and only the committer knows how to report the level(s). This is true, e.g. for Kafka.
ackMgr.committer.SetCommitLevel(CommitterLevel{
seqNo: ackMgr.ackLevel + ackMgr.levelOffset,
address: addrs.addr,
})

// getCurrentAckLevelOffset needs addr[ackMgr.ackLevel], so delete the previous one if it exists
count++
delete(ackMgr.addrs, curr-1)
} else {
if !addrs.acked {
break
}

update = true
ackMgr.ackLevel = curr
ackLevelAddress = int64(addrs.addr)

// We need to commit every message we see here, since we may have an interleved stream,
// and only the committer knows how to report the level(s). This is true, e.g. for Kafka.
ackMgr.committer.SetCommitLevel(CommitterLevel{
seqNo: addrs.seqnum,
address: addrs.addr,
})

// getCurrentAckLevelOffset needs addr[ackMgr.ackLevel], so delete the previous one if it exists
count++
delete(ackMgr.addrs, curr-1)
}
}

@@ -333,31 +305,31 @@ func (ackMgr *ackManager) updateAckLevel() {
}
}

func (ackMgr *ackManager) acknowledgeMessage(ackID AckID, seqNum uint32, address int64, isNack bool) error {
func (ackMgr *ackManager) acknowledgeMessage(ackID AckID, seqNum ackIndex, address int64, isNack bool) error {
var err error
notifyCg := true
ackMgr.lk.Lock() // Read lock would be OK in this case (except for a benign race with two simultaneous acks for the same ackID), see below
// check if this id is present
if addrs, ok := ackMgr.addrs[common.SequenceNumber(seqNum)]; ok {
if msg, ok := ackMgr.addrs[seqNum]; ok {
// validate the address from the ackID
if addrs.addr != storeHostAddress(address) {
if msg.addr != storeHostAddress(address) {
ackMgr.logger.WithFields(bark.Fields{
`address`: address,
`expected`: addrs.addr,
`expected`: msg.addr,
}).Error(`ack address does not match!`)
err = errors.New("address of the ackID doesn't match with ackMgr")
notifyCg = false
} else {
if ackMgr.cgCache.cachedCGDesc.GetOwnerEmail() == SmartRetryDisableString {
ackMgr.logger.WithFields(bark.Fields{
`Address`: address,
`addr`: addrs.addr,
`addr`: msg.addr,
common.TagSeq: seqNum,
`isNack`: isNack,
}).Info(`msg ack`)
}
if !isNack {
addrs.acked = true // This is the only place that this field of addrs is changed. It was initially set under a write lock elsewhere, hence we can have a read lock
msg.acked = true // This is the only place that this field of msg is changed. It was initially set under a write lock elsewhere, hence we can have a read lock
// update the last acked sequence, if this is the most recent ack
if ackMgr.lastAckedSeq < common.SequenceNumber(seqNum) {
ackMgr.lastAckedSeq = common.SequenceNumber(seqNum)
@@ -402,14 +374,14 @@ func (ackMgr *ackManager) manageAckLevel() {

// get the number of acked and unacked messages from the last ack level
func (ackMgr *ackManager) getNumAckedAndUnackedMessages() (*int64, *int64) {
stop := ackMgr.ackLevel + common.SequenceNumber(int64(len(ackMgr.addrs)))
stop := ackMgr.ackLevel + ackIndex(len(ackMgr.addrs))

var acked int64
var unacked int64
// We go through the map here and see if the messages are acked,
for curr := ackMgr.ackLevel + 1; curr <= stop; curr++ {
if addrs, ok := ackMgr.addrs[curr]; ok {
if addrs.acked {
if msg, ok := ackMgr.addrs[curr]; ok {
if msg.acked {
acked++
} else {
unacked++
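
Taken together, the ackmanager.go changes separate the ackMgr's contiguous internal index (ackIndex, the key into addrs) from the store sequence number (common.SequenceNumber, carried per message in msgCtx), so discontinuous store seqnums no longer skew the ack/read levels. A minimal, self-contained sketch of that relationship — simplified types and illustrative values only, not code from this PR:

```go
package main

import "fmt"

type (
	ackIndex         uint32
	sequenceNumber   int64
	storeHostAddress int64
)

// msgCtx mirrors the shape used in ackmanager.go: the store address and the
// store seqnum travel together, keyed by a contiguous internal index.
type msgCtx struct {
	addr   storeHostAddress
	seqnum sequenceNumber
	acked  bool
}

func main() {
	addrs := make(map[ackIndex]*msgCtx)
	var readLevel ackIndex

	// Store seqnums may be discontinuous (timer destinations, retention),
	// but the internal index advances by exactly one per delivered message.
	for _, seq := range []sequenceNumber{5, 9, 12} {
		readLevel++
		addrs[readLevel] = &msgCtx{addr: storeHostAddress(seq * 100), seqnum: seq}
	}

	for idx := ackIndex(1); idx <= readLevel; idx++ {
		fmt.Printf("index=%d seqnum=%d addr=%d\n", idx, addrs[idx].seqnum, addrs[idx].addr)
	}
}
```
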
5 changes: 0 additions & 5 deletions services/outputhost/extcache.go
@@ -307,11 +307,6 @@ func (extCache *extentCache) loadReplicaStream(startAddress int64, startSequence
return
}

// successfully opened read stream on the replica; save this index
if startSequence != 0 {
extCache.ackMgr.addLevelOffset(startSequence) // Let ack manager know that the first message received is not sequence zero
}

logger.WithField(`startIndex`, startIndex).Debug(`opened read stream`)
pickedIndex = startIndex
replicaConnectionName := fmt.Sprintf(`replicaConnection{Extent: %s, Store: %s}`, extUUID, storeUUID)
2 changes: 1 addition & 1 deletion services/outputhost/outputhost.go
@@ -384,7 +384,7 @@ func (h *OutputHost) processAcks(ackIds []string, isNack bool) (invalidIDs []str
}

// let the ackMgr know; from the perspective of the ackManager, ack == nack
if err = ackMgr.acknowledgeMessage(ackID, seqNum, ackIDObj.Address, isNack); err == nil {
if err = ackMgr.acknowledgeMessage(ackID, ackIndex(seqNum), ackIDObj.Address, isNack); err == nil {
continue
} else {
h.logger.WithFields(bark.Fields{
4 changes: 2 additions & 2 deletions services/outputhost/outputhost_test.go
@@ -968,7 +968,7 @@ func (s *OutputHostSuite) TestOutputAckMgrReset() {
s.mockCons.On("Write", mock.Anything).Return(fmt.Errorf("breaking write pipe"))

// 8. get the ackMgr
var readLevel common.SequenceNumber
var readLevel ackIndex
var ackMgr *ackManager
outputHost.cgMutex.RLock()
if cg, ok := outputHost.cgCache[cgUUID]; ok {
@@ -994,7 +994,7 @@
outputHost.Shutdown()

// 9. Make sure we reset the readLevel
var newReadLevel common.SequenceNumber
var newReadLevel ackIndex
ackMgr.lk.RLock()
newReadLevel = ackMgr.readLevel
ackMgr.lk.RUnlock()
16 changes: 8 additions & 8 deletions services/outputhost/replicaconnection.go
@@ -225,28 +225,28 @@ func (conn *replicaConnection) readMessagesPump() {
case store.ReadMessageContentType_MESSAGE:

msg := rmc.GetMessage()
correctSequenceNumber := common.SequenceNumber(msg.Message.GetSequenceNumber())
msgSeqNum := common.SequenceNumber(msg.Message.GetSequenceNumber())

// XXX: Sequence number check to make sure we get monotonically increasing
// sequence number.
// We just log and move forward
// XXX: Note we skip the first message check here because we can start from
// a bigger sequence number in case of restarts
if conn.extCache.destType != shared.DestinationType_TIMER {
if lastSeqNum+1 != int64(correctSequenceNumber) {
if lastSeqNum+1 != int64(msgSeqNum) {
// FIXME: add metric to help alert this case
expectedSeqNum := 1 + lastSeqNum
skippedMessages := int64(correctSequenceNumber) - lastSeqNum
skippedMessages := int64(msgSeqNum) - lastSeqNum

conn.logger.WithFields(bark.Fields{
"correctSequenceNumber": correctSequenceNumber,
"expectedSeqNum": expectedSeqNum,
"skippedMessages": skippedMessages,
"msgSeqNum": msgSeqNum,
"expectedSeqNum": expectedSeqNum,
"skippedMessages": skippedMessages,
}).Error("sequence number out of order")
}
} else {
// T471157 For timers, do not signal discontinuities to ack manager, since discontinuities are frequent
correctSequenceNumber = 0
msgSeqNum = 0
}

// update the lastSeqNum to this value
Expand All @@ -257,7 +257,7 @@ func (conn *replicaConnection) readMessagesPump() {
cMsg.EnqueueTimeUtc = msg.Message.EnqueueTimeUtc
cMsg.Payload = msg.Message.Payload

cMsg.AckId = common.StringPtr(conn.extCache.ackMgr.getNextAckID(msg.GetAddress(), correctSequenceNumber))
cMsg.AckId = common.StringPtr(conn.extCache.ackMgr.getNextAckID(storeHostAddress(msg.GetAddress()), msgSeqNum))
// write the message to the msgsCh so that it can be delivered
// after being stored on the cache.
// 1. either there are no listeners