Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge pull request #6 from tdakkota/feature/add-ticker-and-timer
Browse files Browse the repository at this point in the history
Add Timer and Ticker implementation
  • Loading branch information
ernado authored Apr 7, 2021
2 parents d34a6fb + 911e4b5 commit 90d3dd5
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 30 deletions.
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
28 changes: 28 additions & 0 deletions moment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package neo

import "time"

type moment struct {
when time.Time
do func(time time.Time)
}

type moments []moment

func (m moments) do(t time.Time) {
for _, doer := range m {
doer.do(t)
}
}

func (m moments) Len() int {
return len(m)
}

func (m moments) Less(i, j int) bool {
return m[i].when.Before(m[j].when)
}

func (m moments) Swap(i, j int) {
m[i], m[j] = m[j], m[i]
}
26 changes: 26 additions & 0 deletions ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package neo

import (
"sync/atomic"
"time"
)

type ticker struct {
time *Time
ch chan time.Time
id int
dur int64
}

func (t *ticker) C() <-chan time.Time {
return t.ch
}

func (t *ticker) Stop() {
t.time.stopTimer(t.id)
}

func (t *ticker) Reset(d time.Duration) {
atomic.StoreInt64(&t.dur, int64(d))
t.time.resetTimer(d, t.id, t.ch)
}
131 changes: 101 additions & 30 deletions time.go
Original file line number Diff line number Diff line change
@@ -1,65 +1,136 @@
package neo

import (
"sort"
"sync"
"sync/atomic"
"time"
)

// Timer abstracts a single event.
type Timer interface {
C() <-chan time.Time
Stop() bool
Reset(d time.Duration)
}

// Ticker abstracts a channel that delivers ``ticks'' of a clock at intervals.
type Ticker interface {
C() <-chan time.Time
Stop()
Reset(d time.Duration)
}

// NewTime returns new temporal simulator.
func NewTime(now time.Time) *Time {
return &Time{
now: now,
now: now,
moments: map[int]moment{},
}
}

// Time simulates temporal interactions.
//
// All methods are goroutine-safe.
type Time struct {
mux sync.Mutex
now time.Time
mux sync.Mutex
now time.Time
momentID int

moments []moment
moments map[int]moment
observers []chan struct{}
}

func (t *Time) plan(when time.Time, do func(now time.Time)) {
func (t *Time) Timer(d time.Duration) Timer {
done := make(chan time.Time, 1)

return timer{
time: t,
ch: done,
id: t.plan(t.When(d), func(now time.Time) {
done <- now
}),
}
}

func (t *Time) Ticker(d time.Duration) Ticker {
done := make(chan time.Time, 1)

tick := &ticker{
time: t,
ch: done,
dur: int64(d),
}

var cb func(now time.Time)
cb = func(now time.Time) {
done <- now

dur := time.Duration(atomic.LoadInt64(&tick.dur))
t.plan(now.Add(dur), cb)
}
tick.id = t.plan(t.When(d), cb)

return tick
}

func (t *Time) plan(when time.Time, do func(now time.Time)) int {
t.mux.Lock()
defer t.mux.Unlock()

t.moments = append(t.moments, moment{
id := t.momentID
t.momentID++
t.moments[t.momentID] = moment{
when: when,
do: do,
})
}
t.observe()
return id
}

func (t *Time) stopTimer(id int) bool {
t.mux.Lock()
defer t.mux.Unlock()

_, ok := t.moments[id]
delete(t.moments, id)
return !ok
}

func (t *Time) resetTimer(d time.Duration, id int, ch chan time.Time) {
t.mux.Lock()
defer t.mux.Unlock()

m, ok := t. /* bruh */ moments[id]
if !ok {
m = moment{
do: func(now time.Time) {
ch <- now
},
}
}

delete(t.moments, id)
m.when = t.now.Add(d)
t.moments[id] = m
}

// tick applies all scheduled temporal effects.
//
// The mux lock is expected.
func (t *Time) tick() {
var (
past []moment
future []moment
)
for _, m := range t.moments {
switch {
case m.when.After(t.now):
future = append(future, m)
case m.when.Before(t.now):
past = append(past, m)
}
}
func (t *Time) tick() moments {
var past moments

t.moments = future
for _, m := range past {
m.do(t.now)
for id, m := range t.moments {
if m.when.After(t.now) {
continue
}
delete(t.moments, id)
past = append(past, m)
}
}
sort.Sort(past)

type moment struct {
when time.Time
do func(time time.Time)
return past
}

// Now returns the current time.
Expand All @@ -76,8 +147,8 @@ func (t *Time) Now() time.Time {
func (t *Time) Set(now time.Time) {
t.mux.Lock()
t.now = now
t.tick()
t.mux.Unlock()
t.tick().do(now)
}

// Travel adds duration to current time and returns result.
Expand All @@ -87,8 +158,8 @@ func (t *Time) Travel(d time.Duration) time.Time {
t.mux.Lock()
now := t.now.Add(d)
t.now = now
t.tick()
t.mux.Unlock()
t.tick().do(now)
return now
}

Expand All @@ -99,8 +170,8 @@ func (t *Time) TravelDate(years, months, days int) time.Time {
t.mux.Lock()
now := t.now.AddDate(years, months, days)
t.now = now
t.tick()
t.mux.Unlock()
t.tick().do(now)
return now
}

Expand Down
78 changes: 78 additions & 0 deletions time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,81 @@ func TestTime_Observe(t *testing.T) {
t.Error(err)
}
}

func TestTime_Timer(t *testing.T) {
now := time.Date(2049, 5, 6, 23, 55, 11, 1034, time.UTC)
sim := NewTime(now)

after := sim.Timer(time.Second)
defer after.Stop()

select {
case <-after.C():
t.Error("unexpected done")
default:
}

sim.Travel(time.Second*1 + time.Microsecond)
select {
case <-after.C():
default:
t.Error("unexpected state")
}

after.Reset(time.Second)

select {
case <-after.C():
t.Error("unexpected done")
default:
}

sim.Travel(time.Second*1 + time.Microsecond)
select {
case <-after.C():
default:
t.Error("unexpected state")
}
}

func TestTime_Ticker(t *testing.T) {
now := time.Date(2049, 5, 6, 23, 55, 11, 1034, time.UTC)
sim := NewTime(now)

after := sim.Ticker(2 * time.Second)
defer after.Stop()

// Tick a bit.
for range [3]struct{}{} {
select {
case <-after.C():
t.Error("unexpected done")
default:
}

sim.Travel(2*time.Second + time.Microsecond)
select {
case <-after.C():
default:
t.Error("unexpected state")
}
}

after.Reset(time.Second)

// Tick faster a bit.
for range [3]struct{}{} {
select {
case <-after.C():
t.Error("unexpected done")
default:
}

sim.Travel(time.Second + time.Microsecond)
select {
case <-after.C():
default:
t.Error("unexpected state")
}
}
}
21 changes: 21 additions & 0 deletions timer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package neo

import "time"

type timer struct {
time *Time
ch chan time.Time
id int
}

func (t timer) C() <-chan time.Time {
return t.ch
}

func (t timer) Stop() bool {
return t.time.stopTimer(t.id)
}

func (t timer) Reset(d time.Duration) {
t.time.resetTimer(d, t.id, t.ch)
}

0 comments on commit 90d3dd5

Please sign in to comment.