Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

fix: do not tick after stop #13

Merged
merged 6 commits into from
Dec 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ func TestNetPing(t *testing.T) {
}

func TestNetPingDeadline(t *testing.T) {
t.Skip("skipping flaky test, see https://github.com/gotd/neo/pull/13#issuecomment-1001285136")

nt := &Net{
peers: make(map[string]*PacketConn),
}
Expand Down
24 changes: 19 additions & 5 deletions ticker.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,40 @@
package neo

import (
"sync/atomic"
"time"
)

type ticker struct {
time *Time
ch chan time.Time
id int
dur int64
dur time.Duration
}

func (t *ticker) C() <-chan time.Time {
return t.ch
}

func (t *ticker) Stop() {
t.time.stopTimer(t.id)
t.time.stop(t.id)
}

func (t *ticker) Reset(d time.Duration) {
atomic.StoreInt64(&t.dur, int64(d))
t.time.resetTimer(d, t.id, t.ch)
t.time.reset(d, t.id, t.do, &t.dur)
}

// do is the ticker’s moment callback. It sends the now time to the underlying
// channel and plans a new moment for the next tick. Note that do runs under
// Time’s lock.
func (t *ticker) do(now time.Time) {
t.ch <- now

// It is safe to mutate ID without a lock since at most one moment
// exists for the given ticker and moments run under the Time’s lock.
t.time.resetUnlocked(t.dur, t.id, t.do, nil)

// Ticker used to create a new moment for each tick and that would close
// the observe channel. Maintain backwards compatibility for users that
// may rely on this behavior.
t.time.observeUnlocked()
}
98 changes: 48 additions & 50 deletions time.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package neo
import (
"sort"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -33,6 +32,8 @@ func NewTime(now time.Time) *Time {
//
// All methods are goroutine-safe.
type Time struct {
// mux guards internal state. Note that all methods without Unlocked
// suffix acquire mux.
mux sync.Mutex
now time.Time
momentID int
Expand All @@ -42,36 +43,22 @@ type Time struct {
}

func (t *Time) Timer(d time.Duration) Timer {
done := make(chan time.Time, 1)

return timer{
tt := &timer{
time: t,
ch: done,
id: t.plan(t.When(d), func(now time.Time) {
done <- now
}),
ch: make(chan time.Time, 1),
}
tt.id = t.plan(t.When(d), tt.do)
return tt
}

func (t *Time) Ticker(d time.Duration) Ticker {
done := make(chan time.Time, 1)

tick := &ticker{
tt := &ticker{
time: t,
ch: done,
dur: int64(d),
}

var cb func(now time.Time)
cb = func(now time.Time) {
done <- now

dur := time.Duration(atomic.LoadInt64(&tick.dur))
t.planUnlocked(now.Add(dur), cb)
ch: make(chan time.Time, 1),
dur: d,
}
tick.id = t.plan(t.When(d), cb)

return tick
tt.id = t.plan(t.When(d), tt.do)
return tt
}

func (t *Time) planUnlocked(when time.Time, do func(now time.Time)) int {
Expand All @@ -81,7 +68,7 @@ func (t *Time) planUnlocked(when time.Time, do func(now time.Time)) int {
when: when,
do: do,
}
t.observe()
t.observeUnlocked()
return id
}

Expand All @@ -92,7 +79,9 @@ func (t *Time) plan(when time.Time, do func(now time.Time)) int {
return t.planUnlocked(when, do)
}

func (t *Time) stopTimer(id int) bool {
// stop removes the moment with the given ID from the list of scheduled moments.
// It returns true if a moment existed for the given ID, otherwise it is no-op.
func (t *Time) stop(id int) bool {
t.mux.Lock()
defer t.mux.Unlock()

Expand All @@ -101,27 +90,32 @@ func (t *Time) stopTimer(id int) bool {
return ok
}

func (t *Time) resetTimer(d time.Duration, id int, ch chan time.Time) {
// reset adjusts the moment with the given ID to run after the d duration. It
// creates a new moment if the moment does not already exist. If durp pointer
// is not nil, it is updated with d value while reset is holding Time’s lock.
func (t *Time) reset(d time.Duration, id int, do func(now time.Time), durp *time.Duration) {
t.mux.Lock()
defer t.mux.Unlock()
t.resetUnlocked(d, id, do, durp)
}

// resetUnlocked is like reset but does not acquire the Time’s lock.
func (t *Time) resetUnlocked(d time.Duration, id int, do func(now time.Time), durp *time.Duration) {
if durp != nil {
*durp = d
}

m, ok := t.moments[id]
if !ok {
m = moment{
do: func(now time.Time) {
ch <- now
},
}
m = moment{do: do}
}

m.when = t.now.Add(d)
t.moments[id] = m
}

// tick applies all scheduled temporal effects.
//
// The mux lock is expected.
func (t *Time) tick() moments {
// tickUnlocked applies all scheduled temporal effects.
func (t *Time) tickUnlocked() moments {
var past moments

for id, m := range t.moments {
Expand All @@ -139,30 +133,27 @@ func (t *Time) tick() moments {
// Now returns the current time.
func (t *Time) Now() time.Time {
t.mux.Lock()
now := t.now
t.mux.Unlock()
return now
defer t.mux.Unlock()
return t.now
}

// Set travels to specified time.
//
// Also triggers temporal effects.
func (t *Time) Set(now time.Time) {
t.mux.Lock()
t.now = now
t.mux.Unlock()
t.tick().do(now)
defer t.mux.Unlock()
t.setUnlocked(now)
}

// Travel adds duration to current time and returns result.
//
// Also triggers temporal effects.
func (t *Time) Travel(d time.Duration) time.Time {
t.mux.Lock()
defer t.mux.Unlock()
now := t.now.Add(d)
t.now = now
t.tick().do(now)
t.mux.Unlock()
t.setUnlocked(now)
return now
}

Expand All @@ -171,13 +162,19 @@ func (t *Time) Travel(d time.Duration) time.Time {
// Also triggers temporal effects.
func (t *Time) TravelDate(years, months, days int) time.Time {
t.mux.Lock()
defer t.mux.Unlock()
now := t.now.AddDate(years, months, days)
t.now = now
t.tick().do(now)
t.mux.Unlock()
t.setUnlocked(now)
return now
}

// setUnlocked sets the current time to the given now time and triggers temporal
// effects.
func (t *Time) setUnlocked(now time.Time) {
t.now = now
t.tickUnlocked().do(now)
}

// Sleep blocks until duration is elapsed.
func (t *Time) Sleep(d time.Duration) { <-t.After(d) }

Expand All @@ -196,7 +193,8 @@ func (t *Time) After(d time.Duration) <-chan time.Time {
return done
}

// Observe return channel that closes on clock calls.
// Observe return channel that closes on clock calls. The current implementation
// also closes the channel on Ticker’s ticks.
func (t *Time) Observe() <-chan struct{} {
observer := make(chan struct{})
t.mux.Lock()
Expand All @@ -206,7 +204,7 @@ func (t *Time) Observe() <-chan struct{} {
return observer
}

func (t *Time) observe() {
func (t *Time) observeUnlocked() {
for _, observer := range t.observers {
close(observer)
}
Expand Down
125 changes: 125 additions & 0 deletions time_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package neo

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -171,3 +172,127 @@ func TestTime_Ticker(t *testing.T) {
}
}
}

func TestTime_TickerStop(t *testing.T) {
const interval = time.Second

now := time.Date(2049, 5, 6, 23, 55, 11, 1034, time.UTC)
sim := NewTime(now)

ticker := sim.Ticker(interval)
defer ticker.Stop()

// Tick once and stop ticker.
sim.Travel(interval)
select {
case <-ticker.C():
default:
t.Error("unexpected state")
}
ticker.Stop()

// Advance time by the tick interval and check that the tick was not
// sent on the channel.
sim.Travel(interval)
select {
case <-ticker.C():
t.Error("unexpected state")
default:
}

// Check that we can reset the ticker after stopping it and there are no
// erroneous ticks.
ticker.Reset(interval)
for range [3]struct{}{} {
select {
case <-ticker.C():
t.Error("unexpected done")
default:
}

sim.Travel(interval)

select {
case <-ticker.C():
default:
t.Error("unexpected state")
}
}
}

func TestTime_ObserveTick(t *testing.T) {
const interval = time.Second

now := time.Date(2049, 5, 6, 23, 55, 11, 1034, time.UTC)
sim := NewTime(now)

ticker := sim.Ticker(interval)
defer ticker.Stop()

// Check that we do not break existing users of the Time implementation:
// the observe channel must be closed on each tick.
for range [3]struct{}{} {
observe := sim.Observe()
sim.Travel(interval)
select {
case <-ticker.C():
default:
t.Error("unexpected state")
}
select {
case <-observe:
default:
t.Error("missing observation on tick")
}
}
}

func TestTime_Sleep(t *testing.T) {
const interval = time.Second

now := time.Date(2049, 5, 6, 23, 55, 11, 1034, time.UTC)
sim := NewTime(now)

observe := sim.Observe()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
sim.Sleep(interval)
}()
<-observe
sim.Travel(interval)
wg.Wait()
}

func TestTime_TravelSteps(t *testing.T) {
const (
step = time.Second
n = 5
)

now := time.Date(2049, 5, 6, 23, 55, 11, 1034, time.UTC)
sim := NewTime(now)

timer := sim.Timer(n * step)
defer timer.Stop()

// Make all steps except the last one.
for i := 1; i < n; i++ {
sim.Travel(step)
select {
case <-timer.C():
t.Fatal("unexpected state")
default:
}
}

// Make the last step.
sim.Travel(step)
select {
case <-timer.C():
default:
t.Fatal("unexpected state")
}
}
Loading