Skip to content

Commit

Permalink
refactor actor
Browse files Browse the repository at this point in the history
  • Loading branch information
dc0d committed Oct 9, 2021
1 parent 5abf573 commit 7c4afe0
Show file tree
Hide file tree
Showing 9 changed files with 1,214 additions and 335 deletions.
116 changes: 116 additions & 0 deletions actor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// see LICENSE file

package spool

import (
"context"
"time"
)

func Start(ctx context.Context, mailbox Mailbox, callbacks Callbacks, options ...Option) {
opts := applyOptions(options...)
start(ctx, mailbox, callbacks, opts)
}

func start(ctx context.Context, mailbox Mailbox, callbacks Callbacks, opts actorOptions) {
go func() {
if started != nil {
started(mailbox)
}
if stopped != nil {
defer stopped(mailbox)
}

var (
absoluteTimeout = opts.absoluteTimeout
idleTimeout = opts.idleTimeout
)

var absoluteTimeoutSignal, idleTimeoutSignal <-chan time.Time
if absoluteTimeout > 0 {
absoluteTimeoutSignal = time.After(absoluteTimeout)
}

var requestCount RequestCount
for {
if requestCount > 0 && opts.respawnAfter > 0 && opts.respawnAfter <= requestCount {
start(ctx, mailbox, callbacks, opts)
return
}

if idleTimeout > 0 {
idleTimeoutSignal = time.After(idleTimeout)
}

select {
case <-absoluteTimeoutSignal:
callbacks.Stopped()
return
case <-idleTimeoutSignal:
if opts.respawnAfter > 0 {
start(ctx, mailbox, callbacks, opts)
return
}
callbacks.Stopped()
return
case <-ctx.Done():
callbacks.Stopped()
return
case v, ok := <-mailbox:
if !ok {
callbacks.Stopped()
return
}
callbacks.Received(v)
requestCount++
}
}
}()
}

type (
Callbacks interface {
Received(T)
Stopped()
}

Mailbox <-chan T

Option func(actorOptions) actorOptions

actorOptions struct {
absoluteTimeout time.Duration
idleTimeout time.Duration
respawnAfter RequestCount
}

RequestCount int
MailboxSize int

T = func()
)

func WithAbsoluteTimeout(timeout time.Duration) Option {
return func(opts actorOptions) actorOptions { opts.absoluteTimeout = timeout; return opts }
}

func WithIdleTimeout(timeout time.Duration) Option {
return func(opts actorOptions) actorOptions { opts.idleTimeout = timeout; return opts }
}

func WithRespawnAfter(respawnAfter RequestCount) Option {
return func(opts actorOptions) actorOptions { opts.respawnAfter = respawnAfter; return opts }
}

func applyOptions(opts ...Option) actorOptions {
var options actorOptions
for _, fn := range opts {
options = fn(options)
}
return options
}

var (
started func(pool Mailbox)
stopped func(pool Mailbox)
)
116 changes: 116 additions & 0 deletions actor/actor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// see LICENSE file

package actor

import (
"context"
"time"
)

func Start(ctx context.Context, mailbox Mailbox, callbacks Callbacks, options ...Option) {
opts := applyOptions(options...)
start(ctx, mailbox, callbacks, opts)
}

func start(ctx context.Context, mailbox Mailbox, callbacks Callbacks, opts actorOptions) {
go func() {
if started != nil {
started(mailbox)
}
if stopped != nil {
defer stopped(mailbox)
}

var (
absoluteTimeout = opts.absoluteTimeout
idleTimeout = opts.idleTimeout
)

var absoluteTimeoutSignal, idleTimeoutSignal <-chan time.Time
if absoluteTimeout > 0 {
absoluteTimeoutSignal = time.After(absoluteTimeout)
}

var requestCount RequestCount
for {
if requestCount > 0 && opts.respawnAfter > 0 && opts.respawnAfter <= requestCount {
start(ctx, mailbox, callbacks, opts)
return
}

if idleTimeout > 0 {
idleTimeoutSignal = time.After(idleTimeout)
}

select {
case <-absoluteTimeoutSignal:
callbacks.Stopped()
return
case <-idleTimeoutSignal:
if opts.respawnAfter > 0 {
start(ctx, mailbox, callbacks, opts)
return
}
callbacks.Stopped()
return
case <-ctx.Done():
callbacks.Stopped()
return
case v, ok := <-mailbox:
if !ok {
callbacks.Stopped()
return
}
callbacks.Received(v)
requestCount++
}
}
}()
}

type (
Callbacks interface {
Received(T)
Stopped()
}

Mailbox <-chan T

Option func(actorOptions) actorOptions

actorOptions struct {
absoluteTimeout time.Duration
idleTimeout time.Duration
respawnAfter RequestCount
}

RequestCount int
MailboxSize int

T = interface{}
)

func WithAbsoluteTimeout(timeout time.Duration) Option {
return func(opts actorOptions) actorOptions { opts.absoluteTimeout = timeout; return opts }
}

func WithIdleTimeout(timeout time.Duration) Option {
return func(opts actorOptions) actorOptions { opts.idleTimeout = timeout; return opts }
}

func WithRespawnAfter(respawnAfter RequestCount) Option {
return func(opts actorOptions) actorOptions { opts.respawnAfter = respawnAfter; return opts }
}

func applyOptions(opts ...Option) actorOptions {
var options actorOptions
for _, fn := range opts {
options = fn(options)
}
return options
}

var (
started func(pool Mailbox)
stopped func(pool Mailbox)
)
Loading

0 comments on commit 7c4afe0

Please sign in to comment.