Skip to content

Commit

Permalink
slots simplified
Browse files Browse the repository at this point in the history
  • Loading branch information
igorcrevar committed Jul 7, 2023
1 parent d29bc8a commit c31c119
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 70 deletions.
7 changes: 4 additions & 3 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,8 @@ func (s *Server) runDial() {
case
peerEvent.PeerFailedToConnect,
peerEvent.PeerDisconnected:
_ = slots.ReturnSlot()
slots.Release()
s.logger.Debug("slot released", "event", event.Type)
}
}); err != nil {
s.logger.Error(
Expand Down Expand Up @@ -399,7 +400,7 @@ func (s *Server) runDial() {

s.logger.Debug("Waiting for a dialing slot", "addr", peerInfo, "local", s.host.ID())

if _, closed := slots.TakeSlot(ctx); closed {
if closed := slots.Take(ctx); closed {
return
}

Expand All @@ -408,7 +409,7 @@ func (s *Server) runDial() {
// the connection process is async because it involves connection (here) +
// the handshake done in the identity service.
if err := s.host.Connect(ctx, *peerInfo); err != nil {
slots.ReturnSlot()
slots.Release()

s.logger.Debug("failed to dial", "addr", peerInfo, "err", err.Error())

Expand Down
69 changes: 18 additions & 51 deletions network/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,71 +2,38 @@ package network

import (
"context"
"sync"
)

// Slots is synchronization structure
// A routine can invoke the TakeSlot method, which will block until at least one slot becomes available
// The ReturnSlot method can be called by other routines to increase the count of available slots by one
type Slots struct {
ch chan struct{}
maximal int64
available int64
// A routine can invoke the Take method, which will block until at least one slot becomes available
// The Release method can be called by other routines to increase the number of available slots by one
type Slots chan struct{}

lock sync.Mutex
}

// NewSlots creates *Slots object with maximal slots available
func NewSlots(maximal int64) *Slots {
ch := make(chan struct{}, maximal)
// NewSlots creates Slots object with maximal slots available
func NewSlots(maximal int64) Slots {
slots := make(Slots, maximal)
// add slots
for i := int64(0); i < maximal; i++ {
ch <- struct{}{}
slots <- struct{}{}
}

return &Slots{
ch: ch,
maximal: maximal,
available: maximal,
lock: sync.Mutex{},
}
return slots
}

// TakeSlot takes slot if available or blocks until slot is available or context is done
func (s *Slots) TakeSlot(ctx context.Context) (int64, bool) {
// Take takes slot if available or blocks until slot is available or context is done
func (s Slots) Take(ctx context.Context) bool {
select {
case <-ctx.Done():
return -1, true
case <-s.ch:
s.lock.Lock()
defer s.lock.Unlock()

s.available--

return s.available, false
return true
case <-s:
return false
}
}

// ReturnSlot returns back one slot. There is guarantee that available slots must be <= than maximal slots
func (s *Slots) ReturnSlot() int64 {
s.lock.Lock()
defer s.lock.Unlock()

// prevent to return more slots than maximal
if s.available == s.maximal {
return s.available
// Release returns back one slot. If all slots are already released, nothing will happen
func (s Slots) Release() {
select {
case s <- struct{}{}:
default: // No slot available to release, do nothing
}

s.ch <- struct{}{}
s.available++

return s.available
}

// GetAvailableCount returns currently available slots count
func (s *Slots) GetAvailableCount() int64 {
s.lock.Lock()
defer s.lock.Unlock()

return s.available
}
30 changes: 14 additions & 16 deletions network/slots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,29 @@ func TestSlots(t *testing.T) {

slots := NewSlots(4)

assert.Equal(t, int64(4), slots.GetAvailableCount())

num := slots.ReturnSlot() // should do nothing

assert.Equal(t, int64(4), num)
assert.Equal(t, int64(4), slots.GetAvailableCount())
for i := 0; i < 4; i++ {
slots.Release() // should do nothing
}

for i := 3; i >= 0; i-- {
num, closed := slots.TakeSlot(context.Background())

closed := slots.Take(context.Background())
assert.False(t, closed)
assert.Equal(t, int64(i), num)
}

go func() {
<-time.After(time.Second * 1)

_ = slots.ReturnSlot() // return one slot after 2 seconds
time.Sleep(time.Millisecond * 500)
slots.Release() // return one slot after 500 milis
time.Sleep(time.Millisecond * 500)
slots.Release() // return another slot after 1 seconds
}()

tm := time.Now().UTC()

num, closed := slots.TakeSlot(context.Background())
closed1 := slots.Take(context.Background())
closed2 := slots.Take(context.Background())

assert.False(t, closed)
assert.Equal(t, int64(0), num)
assert.GreaterOrEqual(t, time.Now().UTC(), tm.Add(time.Second*1))
assert.False(t, closed1)
assert.GreaterOrEqual(t, time.Now().UTC(), tm.Add(time.Millisecond*500))
assert.False(t, closed2)
assert.GreaterOrEqual(t, time.Now().UTC(), tm.Add(time.Millisecond*500*2))
}

0 comments on commit c31c119

Please sign in to comment.