Skip to content

Commit

Permalink
VKP-128-ws-fix: fix WS messages received only on one of many connecte…
Browse files Browse the repository at this point in the history
…d and authorized from single account devices
  • Loading branch information
PetrMitin committed May 28, 2024
1 parent 57b3c9d commit f5c8c69
Showing 1 changed file with 89 additions and 34 deletions.
123 changes: 89 additions & 34 deletions internal/rest/chat/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"socio/usecase/csrf"
"strconv"
"strings"
"sync"
"time"

"github.com/gorilla/mux"
Expand All @@ -36,6 +37,7 @@ const (

type ChatServer struct {
Service ChatService
wsConns *sync.Map
}

type ChatService interface {
Expand Down Expand Up @@ -63,9 +65,25 @@ var upgrader = &websocket.Upgrader{
func NewChatServer(service ChatService) (chatServer *ChatServer) {
return &ChatServer{
Service: service,
wsConns: &sync.Map{},
}
}

func (c *ChatServer) getWSConns(userID uint) (conns []*websocket.Conn, ok bool) {
untypedConns, ok := c.wsConns.Load(userID)
if !ok {
conns = nil
return
}

conns, ok = untypedConns.([]*websocket.Conn)
if !ok {
return
}

return
}

// HandleGetDialogs godoc
//
// @Summary get user dialogs
Expand Down Expand Up @@ -246,7 +264,18 @@ func (c *ChatServer) ServeWS(w http.ResponseWriter, r *http.Request) {
return
}

go c.listenWrite(r.Context(), conn, client)
conns, ok := c.wsConns.Load(userID)
if !ok {
conns = make([]*websocket.Conn, 0, 1)
conns = append(conns.([]*websocket.Conn), conn)
c.wsConns.Store(userID, conns)

go c.listenWrite(r.Context(), client)
} else {
conns = append(conns.([]*websocket.Conn), conn)
c.wsConns.Store(userID, conns)
}

go c.listenRead(r.Context(), conn, client)
}

Expand Down Expand Up @@ -308,17 +337,25 @@ func (c *ChatServer) listenRead(ctx context.Context, conn *websocket.Conn, clien
}
}

func (c *ChatServer) listenWrite(ctx context.Context, conn *websocket.Conn, client *chat.Client) {
func (c *ChatServer) listenWrite(ctx context.Context, client *chat.Client) {
ticker := time.NewTicker(pingPeriod)

defer func() {
ticker.Stop()
err := conn.Close()
if err != nil {

conns, ok := c.getWSConns(client.UserID)
if !ok {
return
}

err = c.Service.Unregister(client.UserID)
for _, conn := range conns {
err := conn.Close()
if err != nil {
return
}
}

err := c.Service.Unregister(client.UserID)
if err != nil {
return
}
Expand All @@ -327,64 +364,82 @@ func (c *ChatServer) listenWrite(ctx context.Context, conn *websocket.Conn, clie
for {
select {
case message, ok := <-client.Send:

Check failure on line 366 in internal/rest/chat/chat.go

View workflow job for this annotation

GitHub Actions / Lint

ineffectual assignment to ok (ineffassign)
err := conn.SetWriteDeadline(time.Now().Add(writeWait))
messages := make([][]byte, 0, len(client.Send)+1)

messageData, err := easyjson.Marshal(message)
if err != nil {
return
}

if !ok {
err := conn.WriteMessage(websocket.CloseMessage, []byte{})
messages = append(messages, messageData)

n := len(client.Send)
for i := 0; i < n; i++ {
messageData, err = easyjson.Marshal(<-client.Send)
if err != nil {
return
}

return
}

w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}

messageData, err := easyjson.Marshal(message)
if err != nil {
return
messages = append(messages, messageData)
}

_, err = w.Write(messageData)
if err != nil {
conns, ok := c.getWSConns(client.UserID)
if !ok {
return
}

n := len(client.Send)
for i := 0; i < n; i++ {
messageData, err = easyjson.Marshal(<-client.Send)
for _, conn := range conns {
err := conn.SetWriteDeadline(time.Now().Add(writeWait))
if err != nil {
return
}
_, err := w.Write([]byte{newline})
if err != nil {

if !ok {
err := conn.WriteMessage(websocket.CloseMessage, []byte{})
if err != nil {
return
}

return
}

_, err = w.Write(messageData)
w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
}

if err := w.Close(); err != nil {
return
for _, message := range messages {
_, err := w.Write([]byte{newline})
if err != nil {
return
}

_, err = w.Write(message)
if err != nil {
return
}
}

if err := w.Close(); err != nil {
return
}
}

case <-ticker.C:
err := conn.SetWriteDeadline(time.Now().Add(writeWait))
if err != nil {
conns, ok := c.getWSConns(client.UserID)
if !ok {
return
}

if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
for _, conn := range conns {
err := conn.SetWriteDeadline(time.Now().Add(writeWait))
if err != nil {
return
}

if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
Expand Down

0 comments on commit f5c8c69

Please sign in to comment.