Skip to content

Commit

Permalink
Use locks in router with rooms
Browse files Browse the repository at this point in the history
  • Loading branch information
sergystepanov committed Oct 5, 2023
1 parent c1c4731 commit cddf081
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 27 deletions.
1 change: 1 addition & 0 deletions pkg/com/com.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func NewNetMap[K comparable, T NetClient[K]]() NetMap[K, T] {

func (m *NetMap[K, T]) Add(client T) bool { return m.Put(client.Id(), client) }
func (m *NetMap[K, T]) Remove(client T) { m.Map.Remove(client.Id()) }
func (m *NetMap[K, T]) RemoveL(client T) int { return m.Map.RemoveL(client.Id()) }
func (m *NetMap[K, T]) Reset() { m.Map = Map[K, T]{m: make(map[K]T, 10)} }
func (m *NetMap[K, T]) RemoveDisconnect(client T) { client.Disconnect(); m.Remove(client) }

Expand Down
29 changes: 25 additions & 4 deletions pkg/com/map.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com

import "sync"
import (
"fmt"
"sync"
)

// Map defines a concurrent-safe map structure.
// Keep in mind that the underlying map structure will grow indefinitely.
Expand All @@ -9,7 +12,7 @@ type Map[K comparable, V any] struct {
mu sync.Mutex
}

func (m *Map[K, _]) Has(key K) bool { _, ok := m.Find(key); return ok }
func (m *Map[K, _]) Has(key K) bool { _, ok := m.Contains(key); return ok }
func (m *Map[_, _]) Len() int { m.mu.Lock(); defer m.mu.Unlock(); return len(m.m) }
func (m *Map[K, V]) Pop(key K) V {
m.mu.Lock()
Expand All @@ -26,9 +29,22 @@ func (m *Map[K, V]) Put(key K, v V) bool {
return ok
}
func (m *Map[K, _]) Remove(key K) { m.mu.Lock(); delete(m.m, key); m.mu.Unlock() }
func (m *Map[K, _]) RemoveL(key K) int {
m.mu.Lock()
delete(m.m, key)
k := len(m.m)
m.mu.Unlock()
return k
}
func (m *Map[K, V]) String() string {
m.mu.Lock()
s := fmt.Sprintf("%v", m.m)
m.mu.Unlock()
return s
}

// Find returns the first value found and a boolean flag if its found or not.
func (m *Map[K, V]) Find(key K) (v V, ok bool) {
// Contains returns the first value found and a boolean flag if its found or not.
func (m *Map[K, V]) Contains(key K) (v V, ok bool) {
m.mu.Lock()
defer m.mu.Unlock()
if vv, ok := m.m[key]; ok {
Expand All @@ -37,6 +53,11 @@ func (m *Map[K, V]) Find(key K) (v V, ok bool) {
return v, false
}

func (m *Map[K, V]) Find(key K) V {
v, _ := m.Contains(key)
return v
}

// FindBy searches the first key-value with the provided predicate function.
func (m *Map[K, V]) FindBy(fn func(v V) bool) (v V, ok bool) {
m.mu.Lock()
Expand Down
4 changes: 2 additions & 2 deletions pkg/com/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ func TestMap_Base(t *testing.T) {
if !m.Has(k) {
t.Errorf("should have the key %v, %v", k, m.m)
}
v, ok := m.Find(k)
v, ok := m.Contains(k)
if v != 0 && !ok {
t.Errorf("should have the key %v and ok, %v %v", k, ok, m.m)
}
_, ok = m.Find(k + 1)
_, ok = m.Contains(k + 1)
if ok {
t.Errorf("should not find anything, %v %v", ok, m.m)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/coordinator/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type RegionalClient interface {
}

type HasUserRegistry interface {
Find(com.Uid) (*User, bool)
Find(com.Uid) *User
}

func NewWorker(sock *com.Connection, handshake api.ConnectionRequest[com.Uid], log *logger.Logger) *Worker {
Expand Down
2 changes: 1 addition & 1 deletion pkg/coordinator/workerhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func (w *Worker) HandleCloseRoom(rq api.CloseRoomRequest) {
}

func (w *Worker) HandleIceCandidate(rq api.WebrtcIceCandidateRequest[com.Uid], users HasUserRegistry) error {
if usr, ok := users.Find(rq.Id); ok {
if usr := users.Find(rq.Id); usr != nil {
usr.SendWebrtcIceCandidate(rq.Candidate)
} else {
w.log.Warn().Str("id", rq.Id.String()).Msg("unknown session")
Expand Down
12 changes: 6 additions & 6 deletions pkg/worker/coordinatorhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ func (c *coordinator) HandleWebrtcInit(rq api.WebrtcInitRequest[com.Uid], w *Wor
}

user := room.NewGameSession(rq.Id, peer) // use user uid from the coordinator
w.router.AddUser(user)
c.log.Info().Msgf("Peer connection: %s", user.Id())
c.log.Debug().Msgf("Users before add: %v", w.router.Users())
w.router.AddUser(user)
c.log.Debug().Msgf("Users after add: %v", w.router.Users())

return api.Out{Payload: sdp}
}
Expand Down Expand Up @@ -118,11 +120,7 @@ func (c *coordinator) HandleGameStart(rq api.StartGameRequest[com.Uid], w *Worke

// make the room
r = room.NewRoom[*room.GameSession](uid, app, w.router.Users(), m)
r.HandleClose = func() {
w.router.Close()
c.CloseRoom(uid)
w.log.Debug().Msgf("Closed room %v", uid)
}
r.HandleClose = func() { c.CloseRoom(uid) }

w.router.SetRoom(r)
c.log.Info().Str("room", r.Id()).Str("game", game.Name).Msg("New room")
Expand All @@ -148,8 +146,10 @@ func (c *coordinator) HandleTerminateSession(rq api.TerminateSessionRequest[com.

// HandleQuitGame handles cases when a user manually exits the game.
func (c *coordinator) HandleQuitGame(rq api.GameQuitRequest[com.Uid], w *Worker) {
w.log.Debug().Msgf("Users before remove: %v", w.router.Users())
if user := w.router.FindUser(rq.Id); user != nil {
w.router.Remove(user)
w.log.Debug().Msgf("Users after remove: %v", w.router.Users())
}
}

Expand Down
31 changes: 19 additions & 12 deletions pkg/worker/room/room.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package room

import "github.com/giongto35/cloud-game/v3/pkg/worker/caged/app"
import (
"sync"

"github.com/giongto35/cloud-game/v3/pkg/worker/caged/app"
)

type MediaPipe interface {
// Destroy frees all allocated resources.
Expand All @@ -19,10 +23,9 @@ type MediaPipe interface {

type SessionManager[T Session] interface {
Add(T) bool
Find(string) (T, bool)
Find(string) T
ForEach(func(T))
Len() int
Remove(T)
RemoveL(T) int
// Reset used for proper cleanup of the resources if needed.
Reset()
}
Expand Down Expand Up @@ -93,32 +96,36 @@ func (r *Room[T]) Close() {
type Router[T Session] struct {
room *Room[T]
users SessionManager[T]
mu sync.Mutex
}

func (r *Router[T]) AddUser(user T) { r.users.Add(user) }

func (r *Router[T]) Close() {
r.mu.Lock()
if r.room != nil {
r.room.Close()
r.room = nil
}
r.mu.Unlock()
}

func (r *Router[T]) FindRoom(id string) *Room[T] {
r.mu.Lock()
defer r.mu.Unlock()
if r.room != nil && r.room.Id() == id {
return r.room
}
return nil
}
func (r *Router[T]) FindUser(uid Uid) T { sess, _ := r.users.Find(uid.Id()); return sess }

func (r *Router[T]) Remove(user T) {
r.users.Remove(user)
if r.users.Len() == 0 {
if r.room != nil {
r.room.Close()
}
r.users.Reset()
if left := r.users.RemoveL(user); left == 0 {
r.Close()
}
}
func (r *Router[T]) SetRoom(room *Room[T]) { r.room = room }
func (r *Router[T]) FindUser(uid Uid) T { return r.users.Find(uid.Id()) }
func (r *Router[T]) SetRoom(room *Room[T]) { r.mu.Lock(); r.room = room; r.mu.Unlock() }
func (r *Router[T]) Users() SessionManager[T] { return r.users }

type AppSession struct {
Expand Down
6 changes: 5 additions & 1 deletion pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ type Worker struct {
storage cloud.Storage
}

func (w *Worker) Reset() { w.router.Close() }
func (w *Worker) Reset() {
w.log.Debug().Msgf("Users before close: %v", w.router.Users())
w.router.Close()
w.log.Debug().Msgf("Users after close: %v", w.router.Users())
}

const retry = 10 * time.Second

Expand Down

0 comments on commit cddf081

Please sign in to comment.