From 12329c1a3befa259e26f5c2d49a4a896c39a532b Mon Sep 17 00:00:00 2001 From: Ivan Trubach Date: Mon, 27 Dec 2021 02:11:50 +0300 Subject: [PATCH 1/6] fix: update ticker moment ID on tick MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stopping a ticker after the first tick before this change was a no-op since the Ticker’s moment ID was not updated. --- ticker.go | 18 ++++++++++++++---- time.go | 35 ++++++++++++++++++++--------------- time_test.go | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 19 deletions(-) diff --git a/ticker.go b/ticker.go index ca9a6e3..c5bf583 100644 --- a/ticker.go +++ b/ticker.go @@ -1,7 +1,6 @@ package neo import ( - "sync/atomic" "time" ) @@ -9,7 +8,7 @@ type ticker struct { time *Time ch chan time.Time id int - dur int64 + dur time.Duration } func (t *ticker) C() <-chan time.Time { @@ -21,6 +20,17 @@ func (t *ticker) Stop() { } func (t *ticker) Reset(d time.Duration) { - atomic.StoreInt64(&t.dur, int64(d)) - t.time.resetTimer(d, t.id, t.ch) + t.time.resetTicker(t, d) +} + +// do is the ticker’s moment callback. It sends the now time to the underlying +// channel and plans a new moment for the next tick. Note that do runs under +// Time’s lock. +func (t *ticker) do(now time.Time) { + t.ch <- now + + // It is safe to mutate ID without a lock since at most one + // moment exists for the given ticker and moments run under + // the Time’s lock. + t.id = t.time.planUnlocked(now.Add(t.dur), t.do) } diff --git a/time.go b/time.go index ca627c2..3386abf 100644 --- a/time.go +++ b/time.go @@ -3,7 +3,6 @@ package neo import ( "sort" "sync" - "sync/atomic" "time" ) @@ -54,23 +53,12 @@ func (t *Time) Timer(d time.Duration) Timer { } func (t *Time) Ticker(d time.Duration) Ticker { - done := make(chan time.Time, 1) - tick := &ticker{ time: t, - ch: done, - dur: int64(d), - } - - var cb func(now time.Time) - cb = func(now time.Time) { - done <- now - - dur := time.Duration(atomic.LoadInt64(&tick.dur)) - t.planUnlocked(now.Add(dur), cb) + ch: make(chan time.Time, 1), + dur: d, } - tick.id = t.plan(t.When(d), cb) - + tick.id = t.plan(t.When(d), tick.do) return tick } @@ -118,6 +106,23 @@ func (t *Time) resetTimer(d time.Duration, id int, ch chan time.Time) { t.moments[id] = m } +// resetTicker resets the moment of the given ticker to run after the duration d. +func (t *Time) resetTicker(tick *ticker, d time.Duration) { + t.mux.Lock() + defer t.mux.Unlock() + + tick.dur = d + id := tick.id + + m, ok := t.moments[id] + if !ok { + m = moment{do: tick.do} + } + + m.when = t.now.Add(d) + t.moments[id] = m +} + // tick applies all scheduled temporal effects. // // The mux lock is expected. diff --git a/time_test.go b/time_test.go index 6c88b5f..234ef5f 100644 --- a/time_test.go +++ b/time_test.go @@ -171,3 +171,50 @@ func TestTime_Ticker(t *testing.T) { } } } + +func TestTime_TickerStop(t *testing.T) { + const interval = time.Second + + now := time.Date(2049, 5, 6, 23, 55, 11, 1034, time.UTC) + sim := NewTime(now) + + ticker := sim.Ticker(interval) + defer ticker.Stop() + + // Tick once and stop ticker. + sim.Travel(interval) + select { + case <-ticker.C(): + default: + t.Error("unexpected state") + } + ticker.Stop() + + // Advance time by the tick interval and check that the tick was not + // sent on the channel. + sim.Travel(interval) + select { + case <-ticker.C(): + t.Error("unexpected state") + default: + } + + // Check that we can reset the ticker after stopping it and there are no + // erroneous ticks. + ticker.Reset(interval) + for range [3]struct{}{} { + select { + case <-ticker.C(): + t.Error("unexpected done") + default: + } + + sim.Travel(interval) + + select { + case <-ticker.C(): + default: + t.Error("unexpected state") + } + } +} From 797add8cc06c5f52b18897ed305caae9a9a2c3cb Mon Sep 17 00:00:00 2001 From: Ivan Trubach Date: Mon, 27 Dec 2021 03:43:06 +0300 Subject: [PATCH 2/6] refactor: implement do method for timer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change adds a do method for timer (similar to ticker’s do method). It also adds a test case to verifies that backwards compatibility is preserved for a quirk in the current Observe and ticker implementation. That is, a tick triggers the observeUnlocked method. I’m not aware of any piece of code actually relying on this behavior, but it’s better to change that separately if we want to. Additionally, we introduce a small convention for Time’s methods: if the method does not acquire the lock, it has Unlocked suffix. A notable exception is the do method on timer and ticker that is expected to run under Time’s lock when scheduled temporal effects are triggered. --- ticker.go | 17 +++++++--- time.go | 95 ++++++++++++++++++++++++---------------------------- time_test.go | 27 +++++++++++++++ timer.go | 16 ++++++--- 4 files changed, 94 insertions(+), 61 deletions(-) diff --git a/ticker.go b/ticker.go index c5bf583..32a4b76 100644 --- a/ticker.go +++ b/ticker.go @@ -16,11 +16,11 @@ func (t *ticker) C() <-chan time.Time { } func (t *ticker) Stop() { - t.time.stopTimer(t.id) + t.time.stop(t.id) } func (t *ticker) Reset(d time.Duration) { - t.time.resetTicker(t, d) + t.time.reset(d, t.id, t.do, &t.dur) } // do is the ticker’s moment callback. It sends the now time to the underlying @@ -29,8 +29,15 @@ func (t *ticker) Reset(d time.Duration) { func (t *ticker) do(now time.Time) { t.ch <- now - // It is safe to mutate ID without a lock since at most one - // moment exists for the given ticker and moments run under - // the Time’s lock. + // It is safe to mutate ID without a lock since at most one moment + // exists for the given ticker and moments run under the Time’s lock. + // + // Additionally, while we probably should be resetting the moment with + // the initial ticker’s ID, it is not possible since that would break + // backwards compatibility for users that rely on Time’s Observe method + // to observe ticks. + // + // t.time.resetUnlocked(t.dur, t.id, t.do, nil) + // t.id = t.time.planUnlocked(now.Add(t.dur), t.do) } diff --git a/time.go b/time.go index 3386abf..2baa057 100644 --- a/time.go +++ b/time.go @@ -32,6 +32,8 @@ func NewTime(now time.Time) *Time { // // All methods are goroutine-safe. type Time struct { + // mux guards internal state. Note that all methods without Unlocked + // suffix acquire mux. mux sync.Mutex now time.Time momentID int @@ -41,25 +43,22 @@ type Time struct { } func (t *Time) Timer(d time.Duration) Timer { - done := make(chan time.Time, 1) - - return timer{ + tt := &timer{ time: t, - ch: done, - id: t.plan(t.When(d), func(now time.Time) { - done <- now - }), + ch: make(chan time.Time, 1), } + tt.id = t.plan(t.When(d), tt.do) + return tt } func (t *Time) Ticker(d time.Duration) Ticker { - tick := &ticker{ + tt := &ticker{ time: t, ch: make(chan time.Time, 1), dur: d, } - tick.id = t.plan(t.When(d), tick.do) - return tick + tt.id = t.plan(t.When(d), tt.do) + return tt } func (t *Time) planUnlocked(when time.Time, do func(now time.Time)) int { @@ -69,7 +68,7 @@ func (t *Time) planUnlocked(when time.Time, do func(now time.Time)) int { when: when, do: do, } - t.observe() + t.observeUnlocked() return id } @@ -80,7 +79,9 @@ func (t *Time) plan(when time.Time, do func(now time.Time)) int { return t.planUnlocked(when, do) } -func (t *Time) stopTimer(id int) bool { +// stop removes the moment with the given ID from the list of scheduled moments. +// It returns true if a moment existed for the given ID, otherwise it is no-op. +func (t *Time) stop(id int) bool { t.mux.Lock() defer t.mux.Unlock() @@ -89,44 +90,32 @@ func (t *Time) stopTimer(id int) bool { return ok } -func (t *Time) resetTimer(d time.Duration, id int, ch chan time.Time) { +// reset adjusts the moment with the given ID to run after the d duration. It +// creates a new moment if the moment does not already exist. If durp pointer +// is not nil, it is updated with d value while reset is holding Time’s lock. +func (t *Time) reset(d time.Duration, id int, do func(now time.Time), durp *time.Duration) { t.mux.Lock() defer t.mux.Unlock() - - m, ok := t.moments[id] - if !ok { - m = moment{ - do: func(now time.Time) { - ch <- now - }, - } - } - - m.when = t.now.Add(d) - t.moments[id] = m + t.resetUnlocked(d, id, do, durp) } -// resetTicker resets the moment of the given ticker to run after the duration d. -func (t *Time) resetTicker(tick *ticker, d time.Duration) { - t.mux.Lock() - defer t.mux.Unlock() - - tick.dur = d - id := tick.id +// resetUnlocked is like reset but does not acquire the Time’s lock. +func (t *Time) resetUnlocked(d time.Duration, id int, do func(now time.Time), durp *time.Duration) { + if durp != nil { + *durp = d + } m, ok := t.moments[id] if !ok { - m = moment{do: tick.do} + m = moment{do: do} } m.when = t.now.Add(d) t.moments[id] = m } -// tick applies all scheduled temporal effects. -// -// The mux lock is expected. -func (t *Time) tick() moments { +// tickUnlocked applies all scheduled temporal effects. +func (t *Time) tickUnlocked() moments { var past moments for id, m := range t.moments { @@ -144,9 +133,8 @@ func (t *Time) tick() moments { // Now returns the current time. func (t *Time) Now() time.Time { t.mux.Lock() - now := t.now - t.mux.Unlock() - return now + defer t.mux.Unlock() + return t.now } // Set travels to specified time. @@ -154,9 +142,8 @@ func (t *Time) Now() time.Time { // Also triggers temporal effects. func (t *Time) Set(now time.Time) { t.mux.Lock() - t.now = now - t.mux.Unlock() - t.tick().do(now) + defer t.mux.Unlock() + t.setUnlocked(now) } // Travel adds duration to current time and returns result. @@ -164,10 +151,9 @@ func (t *Time) Set(now time.Time) { // Also triggers temporal effects. func (t *Time) Travel(d time.Duration) time.Time { t.mux.Lock() + defer t.mux.Unlock() now := t.now.Add(d) - t.now = now - t.tick().do(now) - t.mux.Unlock() + t.setUnlocked(now) return now } @@ -176,13 +162,19 @@ func (t *Time) Travel(d time.Duration) time.Time { // Also triggers temporal effects. func (t *Time) TravelDate(years, months, days int) time.Time { t.mux.Lock() + defer t.mux.Unlock() now := t.now.AddDate(years, months, days) - t.now = now - t.tick().do(now) - t.mux.Unlock() + t.setUnlocked(now) return now } +// setUnlocked sets the current time to the given now time and triggers temporal +// effects. +func (t *Time) setUnlocked(now time.Time) { + t.now = now + t.tickUnlocked().do(now) +} + // Sleep blocks until duration is elapsed. func (t *Time) Sleep(d time.Duration) { <-t.After(d) } @@ -201,7 +193,8 @@ func (t *Time) After(d time.Duration) <-chan time.Time { return done } -// Observe return channel that closes on clock calls. +// Observe return channel that closes on clock calls. The current implementation +// also closes the channel on Ticker’s ticks. func (t *Time) Observe() <-chan struct{} { observer := make(chan struct{}) t.mux.Lock() @@ -211,7 +204,7 @@ func (t *Time) Observe() <-chan struct{} { return observer } -func (t *Time) observe() { +func (t *Time) observeUnlocked() { for _, observer := range t.observers { close(observer) } diff --git a/time_test.go b/time_test.go index 234ef5f..4f61581 100644 --- a/time_test.go +++ b/time_test.go @@ -218,3 +218,30 @@ func TestTime_TickerStop(t *testing.T) { } } } + +func TestTime_ObserveTick(t *testing.T) { + const interval = time.Second + + now := time.Date(2049, 5, 6, 23, 55, 11, 1034, time.UTC) + sim := NewTime(now) + + ticker := sim.Ticker(interval) + defer ticker.Stop() + + // Check that we do not break existing users of the Time implementation: + // the observe channel must be closed on each tick. + for range [3]struct{}{} { + observe := sim.Observe() + sim.Travel(interval) + select { + case <-ticker.C(): + default: + t.Error("unexpected state") + } + select { + case <-observe: + default: + t.Error("missing observation on tick") + } + } +} diff --git a/timer.go b/timer.go index d6edcc5..6ce4fee 100644 --- a/timer.go +++ b/timer.go @@ -8,14 +8,20 @@ type timer struct { id int } -func (t timer) C() <-chan time.Time { +func (t *timer) C() <-chan time.Time { return t.ch } -func (t timer) Stop() bool { - return t.time.stopTimer(t.id) +func (t *timer) Stop() bool { + return t.time.stop(t.id) } -func (t timer) Reset(d time.Duration) { - t.time.resetTimer(d, t.id, t.ch) +func (t *timer) Reset(d time.Duration) { + t.time.reset(d, t.id, t.do, nil) +} + +// do is the timer’s moment callback. It sends the now time to the underlying +// channel. Note that do runs under Time’s lock. +func (t *timer) do(now time.Time) { + t.ch <- now } From edf377ba201fb9b4d711b09d3c6bb06e9f71e086 Mon Sep 17 00:00:00 2001 From: Ivan Trubach Date: Mon, 27 Dec 2021 04:56:10 +0300 Subject: [PATCH 3/6] chore: skip flaky TestNetPingDeadline test --- net_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/net_test.go b/net_test.go index 91ee926..fddc0af 100644 --- a/net_test.go +++ b/net_test.go @@ -126,6 +126,8 @@ func TestNetPing(t *testing.T) { } func TestNetPingDeadline(t *testing.T) { + t.Skip("skipping flaky test, see https://github.com/gotd/neo/pull/13#issuecomment-1001285136") + nt := &Net{ peers: make(map[string]*PacketConn), } From 36f739957595b05defb6d9dbd4270beb613b59f0 Mon Sep 17 00:00:00 2001 From: Ivan Trubach Date: Mon, 27 Dec 2021 05:08:18 +0300 Subject: [PATCH 4/6] chore: added test for Sleep method --- time_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/time_test.go b/time_test.go index 4f61581..69b6853 100644 --- a/time_test.go +++ b/time_test.go @@ -1,6 +1,7 @@ package neo import ( + "sync" "testing" "time" @@ -245,3 +246,22 @@ func TestTime_ObserveTick(t *testing.T) { } } } + +func TestTime_Sleep(t *testing.T) { + const interval = time.Second + + now := time.Date(2049, 5, 6, 23, 55, 11, 1034, time.UTC) + sim := NewTime(now) + + observe := sim.Observe() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + sim.Sleep(interval) + }() + <-observe + sim.Travel(interval) + wg.Wait() +} From 6047b6d28fe0215a36c21c3ac9c58bd840e3813f Mon Sep 17 00:00:00 2001 From: Ivan Trubach Date: Mon, 27 Dec 2021 05:13:48 +0300 Subject: [PATCH 5/6] chore: added test for Travel with steps --- time_test.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/time_test.go b/time_test.go index 69b6853..7e18fc1 100644 --- a/time_test.go +++ b/time_test.go @@ -265,3 +265,34 @@ func TestTime_Sleep(t *testing.T) { sim.Travel(interval) wg.Wait() } + +func TestTime_TravelSteps(t *testing.T) { + const ( + step = time.Second + n = 5 + ) + + now := time.Date(2049, 5, 6, 23, 55, 11, 1034, time.UTC) + sim := NewTime(now) + + timer := sim.Timer(n * step) + defer timer.Stop() + + // Make all steps except the last one. + for i := 1; i < n; i++ { + sim.Travel(step) + select { + case <-timer.C(): + t.Fatal("unexpected state") + default: + } + } + + // Make the last step. + sim.Travel(step) + select { + case <-timer.C(): + default: + t.Fatal("unexpected state") + } +} From ac808570f28667a15bb98b0fcfb17c7c8bde3075 Mon Sep 17 00:00:00 2001 From: Ivan Trubach Date: Mon, 27 Dec 2021 05:29:50 +0300 Subject: [PATCH 6/6] fix: do not create a new moment for each tick MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change avoid creating a new moment for each Ticker’s tick but still maintains backwards compatibility for tick observeration. It also fixes an accidentally introduced data race for id field between Reset and do methods in ticker. --- ticker.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/ticker.go b/ticker.go index 32a4b76..66cd062 100644 --- a/ticker.go +++ b/ticker.go @@ -31,13 +31,10 @@ func (t *ticker) do(now time.Time) { // It is safe to mutate ID without a lock since at most one moment // exists for the given ticker and moments run under the Time’s lock. - // - // Additionally, while we probably should be resetting the moment with - // the initial ticker’s ID, it is not possible since that would break - // backwards compatibility for users that rely on Time’s Observe method - // to observe ticks. - // - // t.time.resetUnlocked(t.dur, t.id, t.do, nil) - // - t.id = t.time.planUnlocked(now.Add(t.dur), t.do) + t.time.resetUnlocked(t.dur, t.id, t.do, nil) + + // Ticker used to create a new moment for each tick and that would close + // the observe channel. Maintain backwards compatibility for users that + // may rely on this behavior. + t.time.observeUnlocked() }