Skip to content

Commit

Permalink
more effort
Browse files Browse the repository at this point in the history
  • Loading branch information
vladopajic committed Nov 15, 2024
1 parent 81fde6e commit 20b5db1
Showing 1 changed file with 57 additions and 51 deletions.
108 changes: 57 additions & 51 deletions actor/mailbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,44 +113,30 @@ func newMailboxSync[T any](options optionsMailbox) *mailboxSync[T] {
return &mailboxSync[T]{
c: make(chan T, options.Capacity),
sendHandler: newSendHandler[T](),
rg: &runningGuard{},
}
}

type mailboxSync[T any] struct {
c chan T
ctx *context
running bool
lock sync.Mutex
sendHandler *atomic.Value
rg *runningGuard
}

func (m *mailboxSync[T]) Start() {
m.lock.Lock()
defer m.lock.Unlock()

if m.running {
return
}

m.running = true

m.ctx = newContext()
m.sendHandler.Store(createSendHandlerWithCtx(m.ctx, m.c))
m.rg.Start(func() {
m.ctx = newContext()
m.sendHandler.Store(createSendHandlerWithCtx(m.ctx, m.c))
})
}

func (m *mailboxSync[T]) Stop() {
m.lock.Lock()
defer m.lock.Unlock()

if !m.running {
return
}

m.running = false

m.sendHandler.Store(createErrorHandler[T](ErrMailboxStopped))
m.ctx.end()
close(m.c)
m.rg.Stop(func() {
m.sendHandler.Store(createErrorHandler[T](ErrMailboxStopped))
m.ctx.end()
close(m.c)
})
}

func (m *mailboxSync[T]) Send(ctx Context, msg T) error {
Expand All @@ -173,43 +159,30 @@ func newMailboxAsync[T any](options optionsMailbox) *mailbox[T] {
sendC: sendC,
receiveC: receiveC,
sendHandler: newSendHandler[T](),
rg: &runningGuard{},
}
}

type mailbox[T any] struct {
actor Actor
sendC chan<- T
receiveC <-chan T

running bool
lock sync.Mutex
actor Actor
sendC chan<- T
receiveC <-chan T
sendHandler *atomic.Value
rg *runningGuard
}

func (m *mailbox[T]) Start() {
m.lock.Lock()
defer m.lock.Unlock()

if m.running {
return
}

m.running = true
m.sendHandler.Store(createSendHandler(m.sendC))
m.actor.Start()
m.rg.Start(func() {
m.sendHandler.Store(createSendHandler(m.sendC))
m.actor.Start()
})
}

func (m *mailbox[T]) Stop() {
m.lock.Lock()
defer m.lock.Unlock()

if !m.running {
return
}

m.running = false
m.sendHandler.Store(createErrorHandler[T](ErrMailboxStopped))
m.actor.Stop()
m.rg.Stop(func() {
m.sendHandler.Store(createErrorHandler[T](ErrMailboxStopped))
m.actor.Stop()
})
}

func (m *mailbox[T]) Send(ctx Context, msg T) error {
Expand Down Expand Up @@ -328,3 +301,36 @@ func createSendHandlerWithCtx[T any](mbxCtx Context, sendC chan<- T) sendHandler
}
}
}

type runningGuard struct {
running bool
stopped bool
lock sync.Mutex
}

func (s *runningGuard) Start(cb func()) {
s.lock.Lock()
defer s.lock.Unlock()

if s.running || s.stopped {
return
}

s.running = true

cb()
}

func (s *runningGuard) Stop(cb func()) {
s.lock.Lock()
defer s.lock.Unlock()

if !s.running || s.stopped {
return
}

s.running = false
s.stopped = true

cb()
}

0 comments on commit 20b5db1

Please sign in to comment.