Get rid of mutex in matching/liveness and reduce test duration
taylanisikdemir committed Apr 18, 2024
1 parent e98ce90 commit 5a2e133
Showing 3 changed files with 66 additions and 89 deletions.
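The heart of the change, as the diffs below show, is swapping a mutex-guarded time.Time for an int64 of Unix nanoseconds updated via sync/atomic. A minimal standalone sketch of that pattern (the tracker type and its names are illustrative, not from the commit):

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    // tracker records the last-event time as Unix nanos in an int64 so that
    // readers and writers use atomic loads/stores instead of taking a mutex.
    type tracker struct {
        ttl               time.Duration
        lastEventTimeNano int64
    }

    func (t *tracker) markAlive(now time.Time) {
        // unconditional store: unlike the old mutex version there is no
        // "only move forward" check, which is fine while time is monotonic
        atomic.StoreInt64(&t.lastEventTimeNano, now.UnixNano())
    }

    func (t *tracker) isAlive(now time.Time) bool {
        last := atomic.LoadInt64(&t.lastEventTimeNano)
        return now.UnixNano()-last < int64(t.ttl)
    }

    func main() {
        tr := &tracker{ttl: time.Second}
        tr.markAlive(time.Now())
        fmt.Println(tr.isAlive(time.Now())) // true: well within the 1s TTL
    }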
70 changes: 28 additions & 42 deletions service/matching/liveness.go
@@ -36,90 +36,76 @@ type (
 		status     int32
 		timeSource clock.TimeSource
 		ttl        time.Duration
-		// internal shutdown channel
-		shutdownChan chan struct{}
+
+		// stopCh is used to signal the liveness to stop
+		stopCh chan struct{}
+		// wg is used to wait for the liveness to stop
+		wg sync.WaitGroup
 
 		// broadcast shutdown functions
 		broadcastShutdownFn func()
 
-		sync.Mutex
-		lastEventTime time.Time
+		lastEventTimeNano int64
 	}
 )
 
 var _ common.Daemon = (*liveness)(nil)
 
-func newLiveness(
-	timeSource clock.TimeSource,
-	ttl time.Duration,
-	broadcastShutdownFn func(),
-) *liveness {
+func newLiveness(timeSource clock.TimeSource, ttl time.Duration, broadcastShutdownFn func()) *liveness {
 	return &liveness{
-		status:       common.DaemonStatusInitialized,
-		timeSource:   timeSource,
-		ttl:          ttl,
-		shutdownChan: make(chan struct{}),
-
+		status:              common.DaemonStatusInitialized,
+		timeSource:          timeSource,
+		ttl:                 ttl,
+		stopCh:              make(chan struct{}),
 		broadcastShutdownFn: broadcastShutdownFn,
-
-		lastEventTime: timeSource.Now().UTC(),
+		lastEventTimeNano:   timeSource.Now().UnixNano(),
 	}
 }
 
 func (l *liveness) Start() {
-	if !atomic.CompareAndSwapInt32(
-		&l.status,
-		common.DaemonStatusInitialized,
-		common.DaemonStatusStarted,
-	) {
+	if !atomic.CompareAndSwapInt32(&l.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
 		return
 	}
 
+	l.wg.Add(1)
 	go l.eventLoop()
 }
 
 func (l *liveness) Stop() {
-	if !atomic.CompareAndSwapInt32(
-		&l.status,
-		common.DaemonStatusStarted,
-		common.DaemonStatusStopped,
-	) {
+	if !atomic.CompareAndSwapInt32(&l.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
 		return
 	}
 
-	close(l.shutdownChan)
+	close(l.stopCh)
 	l.broadcastShutdownFn()
+	l.wg.Wait()
 }
 
 func (l *liveness) eventLoop() {
-	ttlTimer := time.NewTicker(l.ttl)
-	defer ttlTimer.Stop()
+	defer l.wg.Done()
+	checkTimer := time.NewTicker(l.ttl / 2)
+	defer checkTimer.Stop()
 
 	for {
 		select {
-		case <-ttlTimer.C:
+		case <-checkTimer.C:
 			if !l.isAlive() {
 				l.Stop()
 			}
 
-		case <-l.shutdownChan:
+		case <-l.stopCh:
 			return
 		}
 	}
 }
 
 func (l *liveness) isAlive() bool {
-	l.Lock()
-	defer l.Unlock()
-	return l.lastEventTime.Add(l.ttl).After(l.timeSource.Now())
+	now := l.timeSource.Now().UnixNano()
+	lastUpdate := atomic.LoadInt64(&l.lastEventTimeNano)
+	return now-lastUpdate < int64(l.ttl)
 }
 
-func (l *liveness) markAlive(
-	now time.Time,
-) {
-	l.Lock()
-	defer l.Unlock()
-	if l.lastEventTime.Before(now) {
-		l.lastEventTime = now.UTC()
-	}
+func (l *liveness) markAlive() {
+	now := l.timeSource.Now().UnixNano()
+	atomic.StoreInt64(&l.lastEventTimeNano, now)
 }
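Two details in the new Stop/eventLoop are easy to miss: wg.Add(1) runs before the goroutine starts so a racing Stop cannot Wait on a zero counter, and close(stopCh) acts as a broadcast that unblocks every receiver at once. A self-contained sketch of this lifecycle pattern (the daemon type and the numeric statuses are illustrative stand-ins for common.DaemonStatus*):

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
        "time"
    )

    // daemon mirrors the lifecycle above: a CAS on status makes Start/Stop
    // idempotent, and Stop blocks until the background loop has exited.
    type daemon struct {
        status int32 // 0 = initialized, 1 = started, 2 = stopped
        stopCh chan struct{}
        wg     sync.WaitGroup
    }

    func (d *daemon) Start() {
        if !atomic.CompareAndSwapInt32(&d.status, 0, 1) {
            return
        }
        d.wg.Add(1) // before the goroutine, so Stop never Waits on zero
        go d.loop()
    }

    func (d *daemon) Stop() {
        if !atomic.CompareAndSwapInt32(&d.status, 1, 2) {
            return
        }
        close(d.stopCh) // broadcast: every <-d.stopCh unblocks
        d.wg.Wait()     // caution: code inside loop() must not call Stop
        // synchronously, or it would Wait on itself; use `go d.Stop()`
    }

    func (d *daemon) loop() {
        defer d.wg.Done()
        ticker := time.NewTicker(10 * time.Millisecond)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                // periodic work goes here
            case <-d.stopCh:
                return
            }
        }
    }

    func main() {
        d := &daemon{stopCh: make(chan struct{})}
        d.Start()
        d.Stop()
        fmt.Println("stopped cleanly")
    }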
74 changes: 30 additions & 44 deletions service/matching/liveness_test.go
@@ -41,6 +41,7 @@ type (
 		timeSource   clock.MockedTimeSource
 		ttl          time.Duration
 		shutdownFlag int32
+		liveness     *liveness
 	}
 )
 
@@ -49,75 +50,60 @@ func TestLivenessSuite(t *testing.T) {
 	suite.Run(t, s)
 }
 
-func (s *livenessSuite) SetupSuite() {
-}
-
-func (s *livenessSuite) TearDownSuite() {
-}
-
 func (s *livenessSuite) SetupTest() {
 	s.Assertions = require.New(s.T())
-
-	s.ttl = 2 * time.Second
+	s.ttl = 500 * time.Millisecond
 	s.timeSource = clock.NewMockedTimeSource()
-
 	s.shutdownFlag = 0
-}
-
-func (s *livenessSuite) TearDownTest() {
-
+	s.liveness = newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
 }
 
 func (s *livenessSuite) TestIsAlive_No() {
-	liveness := newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
-	s.timeSource.Advance(s.ttl * 2)
-	alive := liveness.isAlive()
+	s.timeSource.Advance(s.ttl)
+	alive := s.liveness.isAlive()
 	s.False(alive)
 }
 
 func (s *livenessSuite) TestIsAlive_Yes() {
-	liveness := newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
 	s.timeSource.Advance(s.ttl / 2)
-	alive := liveness.isAlive()
+	alive := s.liveness.isAlive()
 	s.True(alive)
 }
 
 func (s *livenessSuite) TestMarkAlive_Noop() {
-	liveness := newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
-	lastEventTime := liveness.lastEventTime
-	newEventTime := s.timeSource.Now().Add(-1)
-	liveness.markAlive(newEventTime)
-	s.True(lastEventTime.Equal(liveness.lastEventTime))
+	lastEventTime := s.liveness.lastEventTimeNano
+	// not advancing time, so markAlive is a no-op
+	s.liveness.markAlive()
+	s.Equal(lastEventTime, s.liveness.lastEventTimeNano)
}
 
 func (s *livenessSuite) TestMarkAlive_Updated() {
-	liveness := newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
-	newEventTime := s.timeSource.Now().Add(1)
-	liveness.markAlive(newEventTime)
-	s.True(newEventTime.Equal(liveness.lastEventTime))
+	s.timeSource.Advance(time.Duration(1))
+	s.liveness.markAlive()
+	s.Equal(s.timeSource.Now().UnixNano(), s.liveness.lastEventTimeNano)
 }
 
 func (s *livenessSuite) TestEventLoop_Noop() {
-	liveness := newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
-	liveness.Start()
-
-	s.timeSource.Advance(s.ttl * 4)
-	liveness.markAlive(s.timeSource.Now())
-
-	timer := time.NewTimer(s.ttl * 2)
-	select {
-	case <-liveness.shutdownChan:
-		s.Fail("should not shutdown")
-	case <-timer.C:
-		s.Equal(int32(0), atomic.LoadInt32(&s.shutdownFlag))
-	}
+	s.liveness.Start()
+	defer s.liveness.Stop()
+
+	// advance time ttl/2 and mark alive
+	s.timeSource.Advance(s.ttl / 2)
+	s.liveness.markAlive()
+	s.True(s.liveness.isAlive())
+
+	// advance time ttl/2 more and validate still alive
+	s.timeSource.Advance(s.ttl / 2)
+	time.Sleep(100 * time.Millisecond) // give event loop time to run
+	s.True(s.liveness.isAlive())
+	s.Equal(int32(0), atomic.LoadInt32(&s.shutdownFlag))
 }
 
 func (s *livenessSuite) TestEventLoop_Shutdown() {
-	liveness := newLiveness(s.timeSource, s.ttl, func() { atomic.CompareAndSwapInt32(&s.shutdownFlag, 0, 1) })
-	liveness.Start()
+	s.liveness.Start()
+	defer s.liveness.Stop()
 
-	s.timeSource.Advance(s.ttl * 4)
-	<-liveness.shutdownChan
+	s.timeSource.Advance(s.ttl)
+	<-s.liveness.stopCh
 	s.Equal(int32(1), atomic.LoadInt32(&s.shutdownFlag))
 }
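The test-duration win comes from combining a shorter TTL (500ms instead of 2s) with the mocked time source: Advance moves virtual time instantly, so a TTL expiry never costs real wall-clock time. A hand-rolled equivalent of that idea (fakeClock is illustrative, not the cadence clock.MockedTimeSource API):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // fakeClock stands in for a mocked time source: Now returns virtual
    // time and Advance moves it forward without sleeping.
    type fakeClock struct {
        mu  sync.Mutex
        now time.Time
    }

    func (c *fakeClock) Now() time.Time {
        c.mu.Lock()
        defer c.mu.Unlock()
        return c.now
    }

    func (c *fakeClock) Advance(d time.Duration) {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.now = c.now.Add(d)
    }

    func main() {
        c := &fakeClock{now: time.Unix(0, 0)}
        start := c.Now()
        c.Advance(500 * time.Millisecond) // instant, no real waiting
        fmt.Println(c.Now().Sub(start))   // 500ms
    }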
11 changes: 8 additions & 3 deletions service/matching/taskListManager.go
@@ -192,7 +192,12 @@ func newTaskListManager(
 		taskListTypeMetricScope.UpdateGauge(metrics.PollerPerTaskListCounter,
 			float64(len(tlMgr.pollerHistory.getPollerInfo(time.Time{}))))
 	})
-	tlMgr.liveness = newLiveness(clock.NewRealTimeSource(), taskListConfig.IdleTasklistCheckInterval(), tlMgr.Stop)
+
+	livenessInterval := taskListConfig.IdleTasklistCheckInterval()
+	tlMgr.liveness = newLiveness(clock.NewRealTimeSource(), livenessInterval, func() {
+		tlMgr.logger.Info("Task list manager stopping because no recent events", tag.Dynamic("interval", livenessInterval))
+		tlMgr.Stop()
+	})
 	var isolationGroups []string
 	if tlMgr.isIsolationMatcherEnabled() {
 		isolationGroups = config.AllIsolationGroups
 
@@ -261,7 +266,7 @@ func (c *taskListManagerImpl) AddTask(ctx context.Context, params addTaskParams)
 	}
 	if params.forwardedFrom == "" {
 		// request sent by history service
-		c.liveness.markAlive(time.Now())
+		c.liveness.markAlive()
 	}
 	var syncMatch bool
 	_, err := c.executeWithRetry(func() (interface{}, error) {
 
@@ -347,7 +352,7 @@ func (c *taskListManagerImpl) GetTask(
 		c.Stop()
 		return nil, ErrNoTasks
 	}
-	c.liveness.markAlive(time.Now())
+	c.liveness.markAlive()
 	task, err := c.getTask(ctx, maxDispatchPerSecond)
 	if err != nil {
 		return nil, fmt.Errorf("couldn't get task: %w", err)
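The newTaskListManager hunk also wraps the bare tlMgr.Stop callback in a closure so an idle shutdown leaves a log line recording the interval. The same decoration expressed generically (withShutdownLog and the stdlib logger are illustrative; cadence uses its own logger and tag package):

    package main

    import (
        "log"
        "time"
    )

    // withShutdownLog decorates a stop callback so the shutdown reason and
    // the configured interval are logged before stopping, matching the
    // shape of the closure passed to newLiveness above.
    func withShutdownLog(interval time.Duration, stop func()) func() {
        return func() {
            log.Printf("stopping: no events within %v", interval)
            stop()
        }
    }

    func main() {
        stop := withShutdownLog(5*time.Minute, func() { log.Println("stopped") })
        stop()
    }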
