From d7567965ef2fa24af7d76bbffb109be67ea9344f Mon Sep 17 00:00:00 2001 From: Abdulkadir DILSIZ Date: Wed, 8 Jun 2022 12:45:29 +0300 Subject: [PATCH] Fix high cpu using in listen function --- client.go | 88 ++++++++++++++++++++++++------------------------------- 1 file changed, 38 insertions(+), 50 deletions(-) diff --git a/client.go b/client.go index ed7b033..b4a6d38 100644 --- a/client.go +++ b/client.go @@ -115,6 +115,7 @@ func NewClient(address string) (Client, error) { c.connected = false c.handshakeTimeout = 3 * time.Second c.dialer = &websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, HandshakeTimeout: c.handshakeTimeout, ReadBufferSize: 512 * 1024, WriteBufferSize: 512 * 1024, @@ -122,7 +123,7 @@ func NewClient(address string) (Client, error) { } _ = c.Start() - + // go func() { _, _ = c.connect(false) }() @@ -142,8 +143,6 @@ func (c *client) setStarted(b bool) { } func (c *client) getStarted() bool { - c.mut.RLock() - defer c.mut.RUnlock() return c.started } @@ -154,9 +153,6 @@ func (c *client) setConnected(b bool) { } func (c *client) isConnected() bool { - c.mut.RLock() - defer c.mut.RUnlock() - return c.connected } @@ -167,23 +163,14 @@ func (c *client) setSubscribed(b bool) { } func (c *client) getSubscribed() bool { - c.mut.RLock() - defer c.mut.RUnlock() - return c.subscribed } func (c *client) getConn() *websocket.Conn { - c.mut.RLock() - defer c.mut.RUnlock() - return c.conn } func (c *client) getSubscribedAddress() map[string]bool { - c.mut.RLock() - defer c.mut.RUnlock() - return c.subscribedAddresses } @@ -326,40 +313,40 @@ func (c *client) listenWrite() { } func (c *client) listen() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for { - select { - case <-c.listenCtx.Done(): - return - case <-ticker.C: - for { - if !c.isConnected() { - continue - } + listening := true + for listening { + go func() { + select { + case <-c.listenCtx.Done(): + listening = false + break + } + }() + if !c.isConnected() { + time.Sleep(time.Second * 1) + continue + } - messageType, readingMessage, err := c.readMessage() - if websocket.IsCloseError(err, websocket.CloseNormalClosure) { - _ = c.Stop() - return - } - if err != nil { - c.closeWS() - continue - } + messageType, readingMessage, err := c.readMessage() + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + _ = c.Stop() + return + } + if err != nil { + c.closeWS() + continue + } - switch messageType { - case websocket.TextMessage: - if json.Valid(readingMessage) { - var transaction Transaction - if err := json.Unmarshal(readingMessage, &transaction); err == nil { - c.listenCallback(transaction) - } - } else { - // - // - } + switch messageType { + case websocket.TextMessage: + if json.Valid(readingMessage) { + var transaction Transaction + if err := json.Unmarshal(readingMessage, &transaction); err == nil { + c.listenCallback(transaction) } + } else { + // + // } } } @@ -394,22 +381,23 @@ func (c *client) ping() { } case <-c.mainCtx.Done(): + c.pingTicker.Stop() return } } } func (c *client) closeWS() { - if c.getConn() != nil { + if conn := c.getConn(); conn != nil { _ = c.write(sendMsg{ typ: mclose, messageTyp: websocket.CloseMessage, msg: websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), }) - _ = c.conn.Close() - c.connected = false - c.conn = nil + _ = conn.Close() } + c.connected = false + c.conn = nil } // SetListenCallback Callback that will be called when the WS client captures a