From e759095990f70d1c555de6dbc6c5d1da5e20d25b Mon Sep 17 00:00:00 2001 From: stuart Date: Thu, 16 May 2024 09:22:09 +0200 Subject: [PATCH] Fix connexion problem --- be1-go/internal/popserver/hub.go | 7 ++++--- be1-go/internal/popserver/state/state.go | 12 ++++++++++++ be1-go/internal/popserver/types/subscribers.go | 15 +++++++++++++++ 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/be1-go/internal/popserver/hub.go b/be1-go/internal/popserver/hub.go index 07d0c62cb4..3e22ef69df 100644 --- a/be1-go/internal/popserver/hub.go +++ b/be1-go/internal/popserver/hub.go @@ -56,6 +56,7 @@ func (h *Hub) Start() { go func() { utils.LogInfo("start the Hub") for { + utils.LogInfo("waiting for a new message") select { case incomingMessage := <-h.messageChan: utils.LogInfo("start handling a message") @@ -65,9 +66,9 @@ func (h *Hub) Start() { } else { utils.LogInfo("successfully handled a message") } - case <-h.closedSockets: - utils.LogInfo("stopping the Sockets") - return + case socketID := <-h.closedSockets: + utils.LogInfo("stopping the Socket " + socketID) + state.UnsubscribeFromAll(socketID) case <-h.stop: utils.LogInfo("stopping the Hub") return diff --git a/be1-go/internal/popserver/state/state.go b/be1-go/internal/popserver/state/state.go index 65bed7164e..519a1bddab 100644 --- a/be1-go/internal/popserver/state/state.go +++ b/be1-go/internal/popserver/state/state.go @@ -23,6 +23,7 @@ type Subscriber interface { HasChannel(channel string) bool Subscribe(channel string, socket socket.Socket) *answer.Error Unsubscribe(channel string, socket socket.Socket) *answer.Error + UnsubscribeFromAll(socketID string) SendToAll(buf []byte, channel string) *answer.Error } @@ -104,6 +105,17 @@ func Unsubscribe(socket socket.Socket, channel string) *answer.Error { return subs.Unsubscribe(channel, socket) } +func UnsubscribeFromAll(socketID string) *answer.Error { + subs, errAnswer := getSubs() + if errAnswer != nil { + return errAnswer + } + + subs.UnsubscribeFromAll(socketID) + + return nil +} + func SendToAll(buf []byte, channel string) *answer.Error { subs, errAnswer := getSubs() if errAnswer != nil { diff --git a/be1-go/internal/popserver/types/subscribers.go b/be1-go/internal/popserver/types/subscribers.go index 9845ff8a2d..ab40671222 100644 --- a/be1-go/internal/popserver/types/subscribers.go +++ b/be1-go/internal/popserver/types/subscribers.go @@ -1,6 +1,7 @@ package types import ( + "fmt" "golang.org/x/xerrors" "popstellar/message/answer" "popstellar/network/socket" @@ -65,6 +66,20 @@ func (s *Subscribers) Unsubscribe(channel string, socket socket.Socket) *answer. return nil } +func (s *Subscribers) UnsubscribeFromAll(socketID string) { + s.Lock() + defer s.Unlock() + + for channel, subs := range s.list { + _, ok := subs[socketID] + if !ok { + continue + } + delete(s.list[channel], socketID) + fmt.Println("unsubscribe from " + channel) + } +} + // SendToAll sends a message to all sockets. func (s *Subscribers) SendToAll(buf []byte, channel string) *answer.Error { s.RLock()