Skip to content

Commit

Permalink
Move to new cache lib
Browse files Browse the repository at this point in the history
  • Loading branch information
weeco committed Aug 25, 2021
1 parent 02dbc3e commit c4e5e82
Showing 1 changed file with 60 additions and 30 deletions.
90 changes: 60 additions & 30 deletions e2e/message_tracker.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package e2e

import (
"fmt"
"github.com/ReneKroon/ttlcache/v2"
"strconv"
"time"

goCache "github.com/patrickmn/go-cache"
"go.uber.org/zap"
)

Expand All @@ -20,52 +21,82 @@ import (
type messageTracker struct {
svc *Service
logger *zap.Logger
cache *goCache.Cache
cache *ttlcache.Cache
}

func newMessageTracker(svc *Service) *messageTracker {
defaultExpirationTime := svc.config.Consumer.RoundtripSla
cleanupInterval := 1 * time.Second
defaultExpirationDuration := svc.config.Consumer.RoundtripSla
cache := ttlcache.NewCache()
cache.SetTTL(defaultExpirationDuration)

t := &messageTracker{
svc: svc,
logger: svc.logger.Named("message_tracker"),
cache: goCache.New(defaultExpirationTime, cleanupInterval),
cache: cache,
}

t.cache.OnEvicted(func(key string, item interface{}) {
t.onMessageExpired(key, item.(*EndToEndMessage))
t.cache.SetExpirationReasonCallback(func(key string, reason ttlcache.EvictionReason, value interface{}) {
t.onMessageExpired(key, reason, value.(*EndToEndMessage))
})

return t
}

func (t *messageTracker) addToTracker(msg *EndToEndMessage) {
t.cache.SetDefault(msg.MessageID, msg)
t.cache.Set(msg.MessageID, msg)
}

func (t *messageTracker) removeFromTracker(messageID string) {
t.cache.Delete(messageID)
// updateItemIfExists only updates a message if it still exists in the cache. The remaining time to live will not
// be refreshed.
// If it doesn't exist an ttlcache.ErrNotFound error will be returned.
func (t *messageTracker) updateItemIfExists(msg *EndToEndMessage) error {
_, ttl, err := t.cache.GetWithTTL(msg.MessageID)
if err != nil {
if err == ttlcache.ErrNotFound {
return err
}
panic(err)
}

// Because the returned TTL is set to the original TTL duration (and not the remaining TTL) we have to calculate
// the remaining TTL now as we want to updat the existing cache item without changing the remaining time to live.
expiryTimestamp := msg.creationTime().Add(ttl)
remainingTTL := expiryTimestamp.Sub(time.Now())
if remainingTTL < 0 {
// This entry should have been deleted already. Race condition.
return ttlcache.ErrNotFound
}

t.cache.SetWithTTL(msg.MessageID, msg, remainingTTL)
return nil
}

// removeFromTracker removes an entry from the cache. If the key does not exist it will return an ttlcache.ErrNotFound error.
func (t *messageTracker) removeFromTracker(messageID string) error {
return t.cache.Remove(messageID)
}

func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
cm, found := t.cache.Get(arrivedMessage.MessageID)
if !found {
// message expired and was removed from the cache
// it arrived too late, nothing to do here...
return
cm, err := t.cache.Get(arrivedMessage.MessageID)
if err != nil {
if err == ttlcache.ErrNotFound {
// message expired and was removed from the cache
// it arrived too late, nothing to do here...
return
} else {
panic(fmt.Errorf("failed to get message from cache: %w", err))
}
}

msg := cm.(*EndToEndMessage)

expireTime := arrivedMessage.creationTime().Add(t.svc.config.Consumer.RoundtripSla)
isOnTime := time.Now().Before(expireTime)
expireTime := msg.creationTime().Add(t.svc.config.Consumer.RoundtripSla)
isExpired := time.Now().Before(expireTime)
latency := time.Now().Sub(msg.creationTime())

if !isOnTime {
if !isExpired {
// Message arrived late, but was still in cache. We don't increment the lost counter here because eventually
// it will be evicted from the cache. This case should only pop up if the sla time is exceeded, but if the
// item has not been evicted from the cache yet (because we clean it only every second).
// item has not been evicted from the cache yet.
t.logger.Info("message arrived late, will be marked as a lost message",
zap.Int64("delay_ms", latency.Milliseconds()),
zap.String("id", msg.MessageID))
Expand All @@ -77,27 +108,26 @@ func (t *messageTracker) onMessageArrived(arrivedMessage *EndToEndMessage) {
t.svc.messagesReceived.WithLabelValues(pID).Inc()
t.svc.roundtripLatency.WithLabelValues(pID).Observe(latency.Seconds())

// We mark the message as arrived so that we won't mark the message as lost and overwrite that modified message
// into the cache.
msg.hasArrived = true
t.cache.Set(msg.MessageID, msg, 0)
t.cache.Delete(msg.MessageID)
// Remove message from cache, so that we don't track it any longer and won't mark it as lost when the entry expires.
t.cache.Remove(msg.MessageID)
}

func (t *messageTracker) onMessageExpired(_ string, msg *EndToEndMessage) {
// Because `t.cache.Delete` will invoke the onEvicted method we have to expect some calls to this function
// even though messages have arrived. Thus, we quit early if we receive such a message.
if msg.hasArrived || msg.failedToProduce {
func (t *messageTracker) onMessageExpired(_ string, reason ttlcache.EvictionReason, value interface{}) {
if reason == ttlcache.Removed {
// We are not interested in messages that have been removed by us!
return
}

msg := value.(*EndToEndMessage)

created := msg.creationTime()
age := time.Since(created)
t.svc.lostMessages.WithLabelValues(strconv.Itoa(msg.partition)).Inc()

t.logger.Info("message lost/expired",
t.logger.Info("message expired/lost",
zap.Int64("age_ms", age.Milliseconds()),
zap.Int("partition", msg.partition),
zap.String("message_id", msg.MessageID),
zap.Bool("successfully_produced", msg.state == EndToEndeMessageStateProducedSuccessfully),
)
}

0 comments on commit c4e5e82

Please sign in to comment.