Skip to content

Commit

Permalink
[fix] [new] [#380] NB Refactor ws state management
Browse files Browse the repository at this point in the history
Includes

  - [fix] [#380] Show correct (false) `:open?` state when Firefox unloading
  - [new] More robust detection of client-unloading state
  - [new] Client `:chsk/state` event now includes a 3rd `open-changed?` element:
      [<old-state-map> <new-state-map> <open-changed?>]
  - [new] Add ChWebSocket socket handle mechanism to ensure that socket events
    from old sockets don't produce noise, or pollute chsk state
  • Loading branch information
ptaoussanis committed Mar 7, 2023
1 parent 57e1416 commit bba7e48
Showing 1 changed file with 149 additions and 108 deletions.
257 changes: 149 additions & 108 deletions src/taoensso/sente.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
* Client-side events:
[:chsk/handshake [<?uid> nil[4] <?handshake-data> <first-handshake?>]]
[:chsk/state [<old-state-map> <new-state-map>]]
[:chsk/recv <ev-as-pushed-from-server>] ; Server>user push
[:chsk/state [<old-state-map> <new-state-map> <open-change?>]]
[:chsk/recv <ev-as-pushed-from-server>] ; Server>user push
[:chsk/ws-ping]
* Server-side events:
Expand All @@ -52,8 +52,9 @@
:last-ws-close - ?{:udt _ :ev <WebSocket-on-close-event>
:clean? _ :code _ :reason _}
:last-close - ?{:udt _ :reason _}, with reason e/o
#{nil :requested-disconnect :requested-reconnect
:downgrading-ws-to-ajax :unexpected}
#{nil :clean :unexpected :requested-disconnect
:requested-reconnect :downgrading-ws-to-ajax
:ws-ping-timeout}
:udt-next-reconnect - Approximate udt of next scheduled auto-reconnect attempt
Notable implementation details:
Expand Down Expand Up @@ -984,41 +985,57 @@
(defn- swap-chsk-state!
"Atomically swaps the value of chk's :state_ atom."
[chsk f]
(let [[old-state new-state]
(let [[old-state new-state] #_(swap-vals! (:state_ chsk) f) ; Clj 1.9+
(swap-in! (:state_ chsk)
(fn [old-state]
(let [new-state (f old-state)
new-state
(if (:first-open? old-state)
(assoc new-state :first-open? false)
new-state)

new-state
(if (:open? new-state)
(dissoc new-state :udt-next-reconnect)
new-state)]

(swapped new-state [old-state new-state]))))]
(let [new-state (f old-state)]
(enc/swapped new-state [old-state new-state]))))]

(when (not= old-state new-state)
(let [output [old-state new-state]]
;; (debugf "Chsk state change: %s" output)
(put! (get-in chsk [:chs :state]) [:chsk/state output])
output))))
(let [old-open? (boolean (:open? old-state))
new-open? (boolean (:open? new-state))

open-changed? (not= new-open? old-open? )
opened? (and new-open? (not old-open?))
closed? (and (not new-open?) old-open?)
first-open? (and opened? (not (:ever-opened? old-state)))

new-state ; Add transient state transitions, not in @state_
(if-not open-changed?
(do new-state)
(enc/assoc-when new-state
:open-changed? true
:opened? opened?
:closed? closed?
:first-open? first-open?))]

(cond
opened? (timbre/infof "Client chsk now open")
closed? (timbre/warnf "Client chsk now closed, reason: %s"
(get-in new-state [:last-close :reason] "unknown")))

(let [output [old-state new-state open-changed?]]
(put! (get-in chsk [:chs :state]) [:chsk/state output])
open-changed?)))))

(defn- chsk-state->closed [state reason]
(have? map? state)
(have? [:el #{:requested-disconnect
:requested-reconnect
:downgrading-ws-to-ajax
:unexpected}] reason)
(if (or (:open? state) (not= reason :unexpected))
(-> state
(dissoc :udt-next-reconnect)
(assoc
:open? false
:last-close {:udt (enc/now-udt) :reason reason}))
state))
(have?
[:el #{:clean :unexpected
:requested-disconnect
:requested-reconnect
:downgrading-ws-to-ajax
:ws-ping-timeout}]
reason)

(let [closing? (:open? state)
m state
m (dissoc m :udt-next-reconnect)
m (assoc m :open? false)]

(if closing?
(assoc m :last-close {:udt (enc/now-udt) :reason reason})
(do m))))

(defn- cb-chan-as-fn
"Experimental, undocumented. Allows a core.async channel to be provided
Expand Down Expand Up @@ -1062,8 +1079,7 @@
:open? true
:ever-opened? true
:uid ?uid
:handshake-data ?handshake-data
:first-open? first-handshake?}
:handshake-data ?handshake-data}

handshake-ev
[:chsk/handshake
Expand All @@ -1075,9 +1091,13 @@
:first-handshake? first-handshake?}]]

(assert-event handshake-ev)
(swap-chsk-state! chsk #(merge % new-state))
(put! (:internal chs) handshake-ev)
(swap-chsk-state! chsk
(fn [m]
(-> m
(dissoc :udt-next-reconnect)
(merge new-state))))

(put! (:internal chs) handshake-ev)
:handled))

#?(:clj
Expand Down Expand Up @@ -1185,6 +1205,11 @@

(comment (get-client-csrf-token-str false "token"))

(def client-unloading?_ (atom false))
#?(:cljs
(.addEventListener goog/global "beforeunload"
(fn [event] (reset! client-unloading?_ true) nil)))

(defrecord ChWebSocket
;; WebSocket-only IChSocket implementation
;; Handles (re)connections, cbs, etc.
Expand All @@ -1195,17 +1220,18 @@
ws-kalive-ms ws-kalive-ping-timeout-ms ws-opts
backoff-ms-fn ; (fn [nattempt]) -> msecs
cbs-waiting_ ; {<cb-uuid> <fn> ...}
socket_
socket_ ; ?[<socket> <socket-id>]
udt-last-comms_
ws-constructor]

IChSocket
(-chsk-disconnect! [chsk reason]
(reset! instance-handle_ nil) ; Disable auto retry
(swap-chsk-state! chsk #(chsk-state->closed % reason))
(when-let [s @socket_]
#?(:clj (.closeBlocking ^WebSocketClient s)
:cljs (.close s 1000 "CLOSE_NORMAL"))))
(let [closed? (swap-chsk-state! chsk #(chsk-state->closed % reason))]
(when-let [[s _sid] @socket_]
#?(:clj (.close ^WebSocketClient s 1000 "CLOSE_NORMAL")
:cljs (.close s 1000 "CLOSE_NORMAL")))
closed?))

(-chsk-reconnect! [chsk]
(-chsk-disconnect! chsk :requested-reconnect)
Expand All @@ -1230,14 +1256,19 @@
(when-let [cb-fn* (pull-unused-cb-fn! cbs-waiting_ ?cb-uuid)]
(cb-fn* :chsk/timeout)))))

(try
#?(:cljs (.send @socket_ ppstr)
:clj (.send ^WebSocketClient @socket_ ^String ppstr))
(or
(when-let [[s _sid] @socket_]
(try
#?(:cljs (.send s ppstr)
:clj (.send ^WebSocketClient s ^String ppstr))

(reset! udt-last-comms_ (enc/now-udt))
:apparent-success
(catch #?(:clj Throwable :cljs :default) t
(errorf t "Chsk send error")
(reset! udt-last-comms_ (enc/now-udt))
:apparent-success
(catch #?(:clj Throwable :cljs :default) t
(errorf t "Chsk send error")
nil)))

(do
(when-let [cb-uuid ?cb-uuid]
(let [cb-fn* (or (pull-unused-cb-fn! cbs-waiting_ cb-uuid)
(have ?cb-fn))]
Expand All @@ -1250,9 +1281,17 @@
connect-fn
(fn connect-fn []
(when (have-handle?)
(let [retry-fn
(let [;; ID for the particular candidate socket to be returned from
;; this particular connect-fn call
this-socket-id (enc/uuid-str)
own-socket?
(fn []
(when-let [[_s sid] @socket_]
(= sid this-socket-id)))

retry-fn
(fn [] ; Backoff then recur
(when (have-handle?)
(when (and (have-handle?) (not @client-unloading?_))
(let [retry-count* (swap! retry-count_ inc)
backoff-ms (backoff-ms-fn retry-count*)
udt-next-reconnect (+ (enc/now-udt) backoff-ms)]
Expand All @@ -1269,22 +1308,24 @@
on-error
#?(:cljs
(fn [ws-ev]
(errorf ; ^:meta {:raw-console? true}
"WebSocket error: %s"
(try
(js->clj ws-ev)
(catch :default _ ws-ev)))
(when (own-socket?)
(errorf ; ^:meta {:raw-console? true}
"WebSocket error: %s"
(try
(js->clj ws-ev)
(catch :default _ ws-ev)))

(swap-chsk-state! chsk
#(assoc % :last-ws-error
{:udt (enc/now-udt), :ev ws-ev})))
(swap-chsk-state! chsk
#(assoc % :last-ws-error
{:udt (enc/now-udt), :ev ws-ev}))))

:clj
(fn [ex]
(errorf ex "WebSocket error")
(swap-chsk-state! chsk
#(assoc % :last-ws-error
{:udt (enc/now-udt), :ex ex}))))
(when (own-socket?)
(errorf ex "WebSocket error")
(swap-chsk-state! chsk
#(assoc % :last-ws-error
{:udt (enc/now-udt), :ex ex})))))

on-message ; Nb receives both push & cb evs!
(fn #?(:cljs [ws-ev] :clj [ppstr])
Expand All @@ -1300,7 +1341,7 @@
(reset! udt-last-comms_ (enc/now-udt))

(or
(when (handshake? clj)
(when (and (own-socket?) (handshake? clj))
(receive-handshake! :ws chsk clj)
(reset! retry-count_ 0)
:handshake)
Expand All @@ -1317,38 +1358,34 @@
(let [buffered-evs clj]
(receive-buffered-evs! chs buffered-evs))))))

;; Fires repeatedly (on each connection attempt) while
;; server is down:
on-close
(fn #?(:cljs [ws-ev] :clj [code reason remote])
(let [last-ws-close
#?(:clj
{:udt (enc/now-udt)
:clean? (= code org.java_websocket.framing.CloseFrame/NORMAL)
:code code
:reason reason}

:cljs
{:udt (enc/now-udt)
:ev ws-ev
:clean? (enc/oget ws-ev "wasClean")
:code (enc/oget ws-ev "code")
:reason (enc/oget ws-ev "reason")})]

;; Firefox calls "onclose" while unloading,
;; Ref. http://goo.gl/G5BYbn:
(if (:clean? last-ws-close)
(do
(debugf "Clean WebSocket close, will not attempt reconnect")
(swap-chsk-state! chsk
#(assoc % :last-ws-close last-ws-close)))
(do
(swap-chsk-state! chsk
#(assoc (chsk-state->closed % :unexpected)
:last-ws-close last-ws-close))
(retry-fn)))))

?socket
;; Fires repeatedly (on each connection attempt) while server down
(fn #?(:cljs [ws-ev] :clj [code reason _remote?])
(when (own-socket?)
(let [;; For codes, Ref. https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.5
last-ws-close ; For advanced debugging, etc.
#?(:clj
{:udt (enc/now-udt)
:code code
:reason reason
:clean? (= code 1000)}

:cljs
{:udt (enc/now-udt)
:code (enc/oget ws-ev "code")
:reason (enc/oget ws-ev "reason")
:clean? (boolean (enc/oget ws-ev "wasClean"))
:ev ws-ev})

reason* (if (:clean? last-ws-close) :clean :unexpected)]

(swap-chsk-state! chsk
#(assoc (chsk-state->closed % reason*)
:last-ws-close last-ws-close))

(retry-fn))))

?new-socket
(try
(ws-constructor
(merge ws-opts
Expand All @@ -1367,15 +1404,20 @@
(errorf t "WebSocket error")
nil))]

(if-not ?socket
(retry-fn) ; Couldn't even get a socket
(if-let [new-socket ?new-socket]
(do
;; Clean up the old socket if any exists
(when-let [old-socket @socket_]
#?(:clj (.close ^WebSocketClient old-socket)
:cljs (.close old-socket)))
(when-let [[old-s _old-sid]
(reset-in! socket_
[new-socket this-socket-id])]

;; Close old socket if one exists
(timbre/tracef "Old client WebSocket will be closed")
#?(:clj (.close ^WebSocketClient old-s 1000 "CLOSE_NORMAL")
:cljs (.close old-s 1000 "CLOSE_NORMAL")))
new-socket)

(reset! socket_ ?socket))))))]
;; Couldn't create a socket
(retry-fn)))))]

(reset! retry-count_ 0)
(connect-fn)
Expand Down Expand Up @@ -1436,8 +1478,9 @@
IChSocket
(-chsk-disconnect! [chsk reason]
(reset! instance-handle_ nil) ; Disable auto retry
(swap-chsk-state! chsk #(chsk-state->closed % reason))
(when-let [x @curr-xhr_] (.abort x)))
(let [closed? (swap-chsk-state! chsk #(chsk-state->closed % reason))]
(when-let [x @curr-xhr_] (.abort x))
closed?))

(-chsk-reconnect! [chsk]
(-chsk-disconnect! chsk :requested-reconnect)
Expand Down Expand Up @@ -1484,8 +1527,7 @@
(if (= ?error :timeout)
(when ?cb-fn (?cb-fn :chsk/timeout))
(do
(swap-chsk-state! chsk
#(chsk-state->closed % :unexpected))
(swap-chsk-state! chsk #(chsk-state->closed % :unexpected))
(when ?cb-fn (?cb-fn :chsk/error))))

(let [content ?content
Expand All @@ -1508,7 +1550,7 @@
(when (have-handle?)
(let [retry-fn
(fn [] ; Backoff then recur
(when (have-handle?)
(when (and (have-handle?) (not @client-unloading?_))
(let [retry-count* (inc retry-count)
backoff-ms (backoff-ms-fn retry-count*)
udt-next-reconnect (+ (enc/now-udt) backoff-ms)]
Expand Down Expand Up @@ -1558,8 +1600,7 @@
;; (= ?error :abort) ; Abort => intentional, not an error
:else
(do
(swap-chsk-state! chsk
#(chsk-state->closed % :unexpected))
(swap-chsk-state! chsk #(chsk-state->closed % :unexpected))
(retry-fn)))

;; The Ajax long-poller is used only for events, never cbs:
Expand Down

0 comments on commit bba7e48

Please sign in to comment.